|
@@ -89,6 +89,7 @@ import org.elasticsearch.index.shard.ShardId;
|
|
|
import org.elasticsearch.indices.SystemDataStreamDescriptor;
|
|
|
import org.elasticsearch.indices.SystemIndices;
|
|
|
import org.elasticsearch.repositories.FinalizeSnapshotContext;
|
|
|
+import org.elasticsearch.repositories.FinalizeSnapshotContext.UpdatedShardGenerations;
|
|
|
import org.elasticsearch.repositories.IndexId;
|
|
|
import org.elasticsearch.repositories.RepositoriesService;
|
|
|
import org.elasticsearch.repositories.Repository;
|
|
@@ -469,7 +470,7 @@ public final class SnapshotsService extends AbstractLifecycleComponent implement
|
|
|
endingSnapshots.add(targetSnapshot);
|
|
|
initializingClones.remove(targetSnapshot);
|
|
|
logger.info(() -> "Failed to start snapshot clone [" + cloneEntry + "]", e);
|
|
|
- removeFailedSnapshotFromClusterState(targetSnapshot, e, null, ShardGenerations.EMPTY);
|
|
|
+ removeFailedSnapshotFromClusterState(targetSnapshot, e, null, UpdatedShardGenerations.EMPTY);
|
|
|
};
|
|
|
|
|
|
// 1. step, load SnapshotInfo to make sure that source snapshot was successful for the indices we want to clone
|
|
@@ -748,21 +749,28 @@ public final class SnapshotsService extends AbstractLifecycleComponent implement
|
|
|
}
|
|
|
}
|
|
|
|
|
|
- private static ShardGenerations buildGenerations(SnapshotsInProgress.Entry snapshot, Metadata metadata) {
|
|
|
+ private static UpdatedShardGenerations buildGenerations(SnapshotsInProgress.Entry snapshot, Metadata metadata) {
|
|
|
ShardGenerations.Builder builder = ShardGenerations.builder();
|
|
|
+ ShardGenerations.Builder deletedBuilder = null;
|
|
|
if (snapshot.isClone()) {
|
|
|
snapshot.shardSnapshotStatusByRepoShardId().forEach((key, value) -> builder.put(key.index(), key.shardId(), value));
|
|
|
} else {
|
|
|
- snapshot.shardSnapshotStatusByRepoShardId().forEach((key, value) -> {
|
|
|
+ for (Map.Entry<RepositoryShardId, ShardSnapshotStatus> entry : snapshot.shardSnapshotStatusByRepoShardId().entrySet()) {
|
|
|
+ RepositoryShardId key = entry.getKey();
|
|
|
+ ShardSnapshotStatus value = entry.getValue();
|
|
|
final Index index = snapshot.indexByName(key.indexName());
|
|
|
if (metadata.findIndex(index).isEmpty()) {
|
|
|
assert snapshot.partial() : "Index [" + index + "] was deleted during a snapshot but snapshot was not partial.";
|
|
|
- return;
|
|
|
+ if (deletedBuilder == null) {
|
|
|
+ deletedBuilder = ShardGenerations.builder();
|
|
|
+ }
|
|
|
+ deletedBuilder.put(key.index(), key.shardId(), value);
|
|
|
+ continue;
|
|
|
}
|
|
|
builder.put(key.index(), key.shardId(), value);
|
|
|
- });
|
|
|
+ }
|
|
|
}
|
|
|
- return builder.build();
|
|
|
+ return new UpdatedShardGenerations(builder.build(), deletedBuilder == null ? ShardGenerations.EMPTY : deletedBuilder.build());
|
|
|
}
|
|
|
|
|
|
private static Metadata metadataForSnapshot(SnapshotsInProgress.Entry snapshot, Metadata metadata) {
|
|
@@ -1360,7 +1368,7 @@ public final class SnapshotsService extends AbstractLifecycleComponent implement
|
|
|
snapshot,
|
|
|
new SnapshotException(snapshot, entry.failure()),
|
|
|
null,
|
|
|
- ShardGenerations.EMPTY
|
|
|
+ UpdatedShardGenerations.EMPTY
|
|
|
);
|
|
|
}
|
|
|
return;
|
|
@@ -1454,8 +1462,9 @@ public final class SnapshotsService extends AbstractLifecycleComponent implement
|
|
|
SnapshotsInProgress.Entry entry = SnapshotsInProgress.get(clusterService.state()).snapshot(snapshot);
|
|
|
final String failure = entry.failure();
|
|
|
logger.trace("[{}] finalizing snapshot in repository, state: [{}], failure[{}]", snapshot, entry.state(), failure);
|
|
|
- final ShardGenerations shardGenerations = buildGenerations(entry, metadata);
|
|
|
- final List<String> finalIndices = shardGenerations.indices().stream().map(IndexId::getName).toList();
|
|
|
+ final var updatedShardGenerations = buildGenerations(entry, metadata);
|
|
|
+ final ShardGenerations updatedShardGensForLiveIndices = updatedShardGenerations.liveIndices();
|
|
|
+ final List<String> finalIndices = updatedShardGensForLiveIndices.indices().stream().map(IndexId::getName).toList();
|
|
|
final Set<String> indexNames = new HashSet<>(finalIndices);
|
|
|
ArrayList<SnapshotShardFailure> shardFailures = new ArrayList<>();
|
|
|
for (Map.Entry<RepositoryShardId, ShardSnapshotStatus> shardStatus : entry.shardSnapshotStatusByRepoShardId().entrySet()) {
|
|
@@ -1552,7 +1561,7 @@ public final class SnapshotsService extends AbstractLifecycleComponent implement
|
|
|
entry.partial() ? onlySuccessfulFeatureStates(entry, finalIndices) : entry.featureStates(),
|
|
|
failure,
|
|
|
threadPool.absoluteTimeInMillis(),
|
|
|
- entry.partial() ? shardGenerations.totalShards() : entry.shardSnapshotStatusByRepoShardId().size(),
|
|
|
+ entry.partial() ? updatedShardGensForLiveIndices.totalShards() : entry.shardSnapshotStatusByRepoShardId().size(),
|
|
|
shardFailures,
|
|
|
entry.includeGlobalState(),
|
|
|
entry.userMetadata(),
|
|
@@ -1562,7 +1571,7 @@ public final class SnapshotsService extends AbstractLifecycleComponent implement
|
|
|
final ListenableFuture<List<ActionListener<SnapshotInfo>>> snapshotListeners = new ListenableFuture<>();
|
|
|
repo.finalizeSnapshot(
|
|
|
new FinalizeSnapshotContext(
|
|
|
- shardGenerations,
|
|
|
+ updatedShardGenerations,
|
|
|
repositoryData.getGenId(),
|
|
|
metaForSnapshot,
|
|
|
snapshotInfo,
|
|
@@ -1579,7 +1588,7 @@ public final class SnapshotsService extends AbstractLifecycleComponent implement
|
|
|
snapshot,
|
|
|
repositoryData,
|
|
|
// we might have written the new root blob before failing here, so we must use the updated shardGenerations
|
|
|
- shardGenerations
|
|
|
+ updatedShardGenerations
|
|
|
)
|
|
|
),
|
|
|
() -> snapshotListeners.addListener(new ActionListener<>() {
|
|
@@ -1604,7 +1613,7 @@ public final class SnapshotsService extends AbstractLifecycleComponent implement
|
|
|
repositoryData,
|
|
|
// a failure here means the root blob was not updated, but the updated shard generation blobs are all in place so we can
|
|
|
// use the updated shardGenerations for all pending shard snapshots
|
|
|
- shardGenerations
|
|
|
+ updatedShardGenerations
|
|
|
)
|
|
|
));
|
|
|
}
|
|
@@ -1613,7 +1622,7 @@ public final class SnapshotsService extends AbstractLifecycleComponent implement
|
|
|
public void onRejection(Exception e) {
|
|
|
if (e instanceof EsRejectedExecutionException esre && esre.isExecutorShutdown()) {
|
|
|
logger.debug("failing finalization of {} due to shutdown", snapshot);
|
|
|
- handleFinalizationFailure(e, snapshot, repositoryData, ShardGenerations.EMPTY);
|
|
|
+ handleFinalizationFailure(e, snapshot, repositoryData, UpdatedShardGenerations.EMPTY);
|
|
|
} else {
|
|
|
onFailure(e);
|
|
|
}
|
|
@@ -1623,7 +1632,7 @@ public final class SnapshotsService extends AbstractLifecycleComponent implement
|
|
|
public void onFailure(Exception e) {
|
|
|
logger.error(Strings.format("unexpected failure finalizing %s", snapshot), e);
|
|
|
assert false : new AssertionError("unexpected failure finalizing " + snapshot, e);
|
|
|
- handleFinalizationFailure(e, snapshot, repositoryData, ShardGenerations.EMPTY);
|
|
|
+ handleFinalizationFailure(e, snapshot, repositoryData, UpdatedShardGenerations.EMPTY);
|
|
|
}
|
|
|
}
|
|
|
|
|
@@ -1679,7 +1688,7 @@ public final class SnapshotsService extends AbstractLifecycleComponent implement
|
|
|
Exception e,
|
|
|
Snapshot snapshot,
|
|
|
RepositoryData repositoryData,
|
|
|
- ShardGenerations shardGenerations
|
|
|
+ UpdatedShardGenerations updatedShardGenerations
|
|
|
) {
|
|
|
if (ExceptionsHelper.unwrap(e, NotMasterException.class, FailedToCommitClusterStateException.class) != null) {
|
|
|
// Failure due to not being master any more, don't try to remove snapshot from cluster state the next master
|
|
@@ -1693,7 +1702,7 @@ public final class SnapshotsService extends AbstractLifecycleComponent implement
|
|
|
failAllListenersOnMasterFailOver(e);
|
|
|
} else {
|
|
|
logger.warn(() -> "[" + snapshot + "] failed to finalize snapshot", e);
|
|
|
- removeFailedSnapshotFromClusterState(snapshot, e, repositoryData, shardGenerations);
|
|
|
+ removeFailedSnapshotFromClusterState(snapshot, e, repositoryData, updatedShardGenerations);
|
|
|
}
|
|
|
}
|
|
|
|
|
@@ -1817,7 +1826,11 @@ public final class SnapshotsService extends AbstractLifecycleComponent implement
|
|
|
* @param snapshot snapshot for which to remove the snapshot operation
|
|
|
* @return updated cluster state
|
|
|
*/
|
|
|
- public static ClusterState stateWithoutSnapshot(ClusterState state, Snapshot snapshot, ShardGenerations shardGenerations) {
|
|
|
+ public static ClusterState stateWithoutSnapshot(
|
|
|
+ ClusterState state,
|
|
|
+ Snapshot snapshot,
|
|
|
+ UpdatedShardGenerations updatedShardGenerations
|
|
|
+ ) {
|
|
|
final SnapshotsInProgress inProgressSnapshots = SnapshotsInProgress.get(state);
|
|
|
ClusterState result = state;
|
|
|
int indexOfEntry = -1;
|
|
@@ -1883,7 +1896,7 @@ public final class SnapshotsService extends AbstractLifecycleComponent implement
|
|
|
final RepositoryShardId repositoryShardId = finishedShardEntry.getKey();
|
|
|
if (shardState.state() != ShardState.SUCCESS
|
|
|
|| previousEntry.shardSnapshotStatusByRepoShardId().containsKey(repositoryShardId) == false
|
|
|
- || shardGenerations.hasShardGen(finishedShardEntry.getKey()) == false) {
|
|
|
+ || updatedShardGenerations.hasShardGen(finishedShardEntry.getKey()) == false) {
|
|
|
continue;
|
|
|
}
|
|
|
updatedShardAssignments = maybeAddUpdatedAssignment(
|
|
@@ -1902,7 +1915,7 @@ public final class SnapshotsService extends AbstractLifecycleComponent implement
|
|
|
final ShardSnapshotStatus shardState = finishedShardEntry.getValue();
|
|
|
if (shardState.state() == ShardState.SUCCESS
|
|
|
&& previousEntry.shardSnapshotStatusByRepoShardId().containsKey(finishedShardEntry.getKey())
|
|
|
- && shardGenerations.hasShardGen(finishedShardEntry.getKey())) {
|
|
|
+ && updatedShardGenerations.hasShardGen(finishedShardEntry.getKey())) {
|
|
|
updatedShardAssignments = maybeAddUpdatedAssignment(
|
|
|
updatedShardAssignments,
|
|
|
shardState,
|
|
@@ -1992,14 +2005,14 @@ public final class SnapshotsService extends AbstractLifecycleComponent implement
|
|
|
Snapshot snapshot,
|
|
|
Exception failure,
|
|
|
@Nullable RepositoryData repositoryData,
|
|
|
- ShardGenerations shardGenerations
|
|
|
+ UpdatedShardGenerations updatedShardGenerations
|
|
|
) {
|
|
|
assert failure != null : "Failure must be supplied";
|
|
|
submitUnbatchedTask(REMOVE_SNAPSHOT_METADATA_TASK_SOURCE, new ClusterStateUpdateTask() {
|
|
|
|
|
|
@Override
|
|
|
public ClusterState execute(ClusterState currentState) {
|
|
|
- final ClusterState updatedState = stateWithoutSnapshot(currentState, snapshot, shardGenerations);
|
|
|
+ final ClusterState updatedState = stateWithoutSnapshot(currentState, snapshot, updatedShardGenerations);
|
|
|
assert updatedState == currentState || endingSnapshots.contains(snapshot)
|
|
|
: "did not track [" + snapshot + "] in ending snapshots while removing it from the cluster state";
|
|
|
// now check if there are any delete operations that refer to the just failed snapshot and remove the snapshot from them
|