1
0
Эх сурвалжийг харах

Remove Snapshot INIT Step (#55918)

With #55773 the snapshot INIT state step has become obsolete. We can set up the snapshot directly in one single step to simplify the state machine.

This is a big help for building concurrent snapshots because it allows us to establish a deterministic order of operations between snapshot create and delete operations since all of their entries now contain a repository generation. With this change simple queuing up of snapshot operations can and will be added in a follow-up.
Armin Braun 5 жил өмнө
parent
commit
f4022c027e

+ 1 - 1
server/src/main/java/org/elasticsearch/action/admin/cluster/repositories/cleanup/TransportCleanupRepositoryAction.java

@@ -213,7 +213,7 @@ public final class TransportCleanupRepositoryAction extends TransportMasterNodeA
                             l -> blobStoreRepository.cleanup(
                                 repositoryStateId,
                                 snapshotsService.minCompatibleVersion(
-                                    newState.nodes().getMinNodeVersion(), repositoryName, repositoryData, null),
+                                    newState.nodes().getMinNodeVersion(), repositoryData, null),
                                 ActionListener.wrap(result -> after(null, result), e -> after(e, null)))
                         ));
                     }

+ 90 - 271
server/src/main/java/org/elasticsearch/snapshots/SnapshotsService.java

@@ -28,7 +28,6 @@ import org.elasticsearch.ExceptionsHelper;
 import org.elasticsearch.Version;
 import org.elasticsearch.action.ActionListener;
 import org.elasticsearch.action.ActionRunnable;
-import org.elasticsearch.action.StepListener;
 import org.elasticsearch.action.admin.cluster.snapshots.create.CreateSnapshotRequest;
 import org.elasticsearch.action.admin.cluster.snapshots.delete.DeleteSnapshotRequest;
 import org.elasticsearch.cluster.ClusterChangedEvent;
@@ -123,9 +122,6 @@ public class SnapshotsService extends AbstractLifecycleComponent implements Clus
     private final Map<Snapshot, List<ActionListener<Tuple<RepositoryData, SnapshotInfo>>>> snapshotCompletionListeners =
         new ConcurrentHashMap<>();
 
-    // Set of snapshots that are currently being initialized by this node
-    private final Set<Snapshot> initializingSnapshots = Collections.synchronizedSet(new HashSet<>());
-
     // Set of snapshots that are currently being ended by this node
     private final Set<Snapshot> endingSnapshots = Collections.synchronizedSet(new HashSet<>());
 
@@ -169,15 +165,22 @@ public class SnapshotsService extends AbstractLifecycleComponent implements Clus
         validate(repositoryName, snapshotName);
         final SnapshotId snapshotId = new SnapshotId(snapshotName, UUIDs.randomBase64UUID()); // new UUID for the snapshot
         Repository repository = repositoriesService.repository(request.repository());
+        if (repository.isReadOnly()) {
+            listener.onFailure(
+                    new RepositoryException(repository.getMetadata().name(), "cannot create snapshot in a readonly repository"));
+            return;
+        }
+        final Snapshot snapshot = new Snapshot(repositoryName, snapshotId);
         final Map<String, Object> userMeta = repository.adaptUserMetadata(request.userMetadata());
-        clusterService.submitStateUpdateTask("create_snapshot [" + snapshotName + ']', new ClusterStateUpdateTask() {
-
-            private SnapshotsInProgress.Entry newSnapshot = null;
-
-            private List<String> indices;
+        repository.executeConsistentStateUpdate(repositoryData -> new ClusterStateUpdateTask() {
 
             @Override
             public ClusterState execute(ClusterState currentState) {
+                // check if the snapshot name already exists in the repository
+                if (repositoryData.getSnapshotIds().stream().anyMatch(s -> s.getName().equals(snapshotName))) {
+                    throw new InvalidSnapshotNameException(
+                            repository.getMetadata().name(), snapshotName, "snapshot with the same name already exists");
+                }
                 validate(repositoryName, snapshotName, currentState);
                 SnapshotDeletionsInProgress deletionsInProgress = currentState.custom(SnapshotDeletionsInProgress.TYPE);
                 if (deletionsInProgress != null && deletionsInProgress.hasDeletionsInProgress()) {
@@ -193,66 +196,71 @@ public class SnapshotsService extends AbstractLifecycleComponent implements Clus
                 // Fail if there are any concurrently running snapshots. The only exception to this being a snapshot in INIT state from a
                 // previous master that we can simply ignore and remove from the cluster state because we would clean it up from the
                 // cluster state anyway in #applyClusterState.
-                if (snapshots != null && snapshots.entries().stream().anyMatch(entry ->
-                    (entry.state() == State.INIT && initializingSnapshots.contains(entry.snapshot()) == false) == false)) {
+                if (snapshots != null && snapshots.entries().stream().anyMatch(entry -> entry.state() != State.INIT)) {
                     throw new ConcurrentSnapshotExecutionException(repositoryName, snapshotName, " a snapshot is already running");
                 }
                 // Store newSnapshot here to be processed in clusterStateProcessed
-                indices = Arrays.asList(indexNameExpressionResolver.concreteIndexNames(currentState,
+                List<String> indices = Arrays.asList(indexNameExpressionResolver.concreteIndexNames(currentState,
                     request.indicesOptions(), request.indices()));
                 logger.trace("[{}][{}] creating snapshot for indices [{}]", repositoryName, snapshotName, indices);
-                newSnapshot = new SnapshotsInProgress.Entry(
-                    new Snapshot(repositoryName, snapshotId),
-                    request.includeGlobalState(), request.partial(),
-                    State.INIT,
-                    Collections.emptyList(), // We'll resolve the list of indices when moving to the STARTED state in #beginSnapshot
-                    threadPool.absoluteTimeInMillis(),
-                    RepositoryData.UNKNOWN_REPO_GEN,
-                    null,
-                    userMeta, Version.CURRENT
-                );
-                initializingSnapshots.add(newSnapshot.snapshot());
-                snapshots = new SnapshotsInProgress(newSnapshot);
-                return ClusterState.builder(currentState).putCustom(SnapshotsInProgress.TYPE, snapshots).build();
+
+                final List<IndexId> indexIds = repositoryData.resolveNewIndices(indices);
+                final Version version = minCompatibleVersion(
+                        clusterService.state().nodes().getMinNodeVersion(), repositoryData, null);
+                ImmutableOpenMap<ShardId, ShardSnapshotStatus> shards =
+                        shards(currentState, indexIds, useShardGenerations(version), repositoryData);
+                SnapshotsInProgress.Entry newEntry = null;
+                if (request.partial() == false) {
+                    Tuple<Set<String>, Set<String>> indicesWithMissingShards = indicesWithMissingShards(shards,
+                            currentState.metadata());
+                    Set<String> missing = indicesWithMissingShards.v1();
+                    Set<String> closed = indicesWithMissingShards.v2();
+                    if (missing.isEmpty() == false || closed.isEmpty() == false) {
+                        final StringBuilder failureMessage = new StringBuilder();
+                        if (missing.isEmpty() == false) {
+                            failureMessage.append("Indices don't have primary shards ");
+                            failureMessage.append(missing);
+                        }
+                        if (closed.isEmpty() == false) {
+                            if (failureMessage.length() > 0) {
+                                failureMessage.append("; ");
+                            }
+                            failureMessage.append("Indices are closed ");
+                        }
+                        // TODO: We should just throw here instead of creating a FAILED and hence useless snapshot in the repository
+                        newEntry = new SnapshotsInProgress.Entry(
+                                new Snapshot(repositoryName, snapshotId), request.includeGlobalState(), false,
+                                State.FAILED, indexIds, threadPool.absoluteTimeInMillis(), repositoryData.getGenId(), shards,
+                                failureMessage.toString(), userMeta, version);
+                    }
+                }
+                if (newEntry == null) {
+                    newEntry = new SnapshotsInProgress.Entry(
+                            new Snapshot(repositoryName, snapshotId), request.includeGlobalState(), request.partial(),
+                            State.STARTED, indexIds, threadPool.absoluteTimeInMillis(), repositoryData.getGenId(), shards,
+                            null, userMeta, version);
+                }
+                return ClusterState.builder(currentState).putCustom(SnapshotsInProgress.TYPE,
+                        new SnapshotsInProgress(List.of(newEntry))).build();
             }
 
             @Override
             public void onFailure(String source, Exception e) {
                 logger.warn(() -> new ParameterizedMessage("[{}][{}] failed to create snapshot", repositoryName, snapshotName), e);
-                if (newSnapshot != null) {
-                    initializingSnapshots.remove(newSnapshot.snapshot());
-                }
-                newSnapshot = null;
                 listener.onFailure(e);
             }
 
             @Override
             public void clusterStateProcessed(String source, ClusterState oldState, final ClusterState newState) {
-                if (newSnapshot != null) {
-                    final Snapshot current = newSnapshot.snapshot();
-                    assert initializingSnapshots.contains(current);
-                    assert indices != null;
-                    beginSnapshot(newState, newSnapshot, request.partial(), indices, repository, new ActionListener<>() {
-                        @Override
-                        public void onResponse(final Snapshot snapshot) {
-                            initializingSnapshots.remove(snapshot);
-                            listener.onResponse(snapshot);
-                        }
-
-                        @Override
-                        public void onFailure(final Exception e) {
-                            initializingSnapshots.remove(current);
-                            listener.onFailure(e);
-                        }
-                    });
-                }
+                logger.info("snapshot [{}] started", snapshot);
+                listener.onResponse(snapshot);
             }
 
             @Override
             public TimeValue timeout() {
                 return request.masterNodeTimeout();
             }
-        });
+        }, "create_snapshot [" + snapshotName + ']', listener::onFailure);
     }
 
     /**
@@ -296,172 +304,6 @@ public class SnapshotsService extends AbstractLifecycleComponent implements Clus
         }
     }
 
-    /**
-     * Starts snapshot.
-     * <p>
-     * Creates snapshot in repository and updates snapshot metadata record with list of shards that needs to be processed.
-     *
-     * @param clusterState               cluster state
-     * @param snapshot                   snapshot meta data
-     * @param partial                    allow partial snapshots
-     * @param userCreateSnapshotListener listener
-     */
-    private void beginSnapshot(final ClusterState clusterState,
-                               final SnapshotsInProgress.Entry snapshot,
-                               final boolean partial,
-                               final List<String> indices,
-                               final Repository repository,
-                               final ActionListener<Snapshot> userCreateSnapshotListener) {
-        threadPool.executor(ThreadPool.Names.SNAPSHOT).execute(new AbstractRunnable() {
-
-            boolean hadAbortedInitializations;
-
-            @Override
-            protected void doRun() {
-                assert initializingSnapshots.contains(snapshot.snapshot());
-                if (repository.isReadOnly()) {
-                    throw new RepositoryException(repository.getMetadata().name(), "cannot create snapshot in a readonly repository");
-                }
-                final String snapshotName = snapshot.snapshot().getSnapshotId().getName();
-                final StepListener<RepositoryData> repositoryDataListener = new StepListener<>();
-                repository.getRepositoryData(repositoryDataListener);
-                repositoryDataListener.whenComplete(repositoryData -> {
-                    // check if the snapshot name already exists in the repository
-                    if (repositoryData.getSnapshotIds().stream().anyMatch(s -> s.getName().equals(snapshotName))) {
-                        throw new InvalidSnapshotNameException(
-                            repository.getMetadata().name(), snapshotName, "snapshot with the same name already exists");
-                    }
-
-                    logger.info("snapshot [{}] started", snapshot.snapshot());
-                    final Version version =
-                        minCompatibleVersion(clusterState.nodes().getMinNodeVersion(), snapshot.repository(), repositoryData, null);
-                    if (indices.isEmpty()) {
-                        // No indices in this snapshot - we are done
-                        userCreateSnapshotListener.onResponse(snapshot.snapshot());
-                        endSnapshot(new SnapshotsInProgress.Entry(
-                            snapshot, State.STARTED, Collections.emptyList(), repositoryData.getGenId(), null, version,
-                            null), clusterState.metadata());
-                        return;
-                    }
-                    clusterService.submitStateUpdateTask("update_snapshot [" + snapshot.snapshot() + "]", new ClusterStateUpdateTask() {
-
-                        @Override
-                        public ClusterState execute(ClusterState currentState) {
-                            SnapshotsInProgress snapshots = currentState.custom(SnapshotsInProgress.TYPE);
-                            List<SnapshotsInProgress.Entry> entries = new ArrayList<>();
-                            for (SnapshotsInProgress.Entry entry : snapshots.entries()) {
-                                if (entry.snapshot().equals(snapshot.snapshot()) == false) {
-                                    entries.add(entry);
-                                    continue;
-                                }
-
-                                if (entry.state() == State.ABORTED) {
-                                    entries.add(entry);
-                                    assert entry.shards().isEmpty();
-                                    hadAbortedInitializations = true;
-                                } else {
-                                    final List<IndexId> indexIds = repositoryData.resolveNewIndices(indices);
-                                    // Replace the snapshot that was just initialized
-                                    ImmutableOpenMap<ShardId, ShardSnapshotStatus> shards =
-                                        shards(currentState, indexIds, useShardGenerations(version), repositoryData);
-                                    if (!partial) {
-                                        Tuple<Set<String>, Set<String>> indicesWithMissingShards = indicesWithMissingShards(shards,
-                                            currentState.metadata());
-                                        Set<String> missing = indicesWithMissingShards.v1();
-                                        Set<String> closed = indicesWithMissingShards.v2();
-                                        if (missing.isEmpty() == false || closed.isEmpty() == false) {
-                                            final StringBuilder failureMessage = new StringBuilder();
-                                            if (missing.isEmpty() == false) {
-                                                failureMessage.append("Indices don't have primary shards ");
-                                                failureMessage.append(missing);
-                                            }
-                                            if (closed.isEmpty() == false) {
-                                                if (failureMessage.length() > 0) {
-                                                    failureMessage.append("; ");
-                                                }
-                                                failureMessage.append("Indices are closed ");
-                                                failureMessage.append(closed);
-                                            }
-                                            entries.add(new SnapshotsInProgress.Entry(entry, State.FAILED, indexIds,
-                                                repositoryData.getGenId(), shards, version, failureMessage.toString()));
-                                            continue;
-                                        }
-                                    }
-                                    entries.add(new SnapshotsInProgress.Entry(entry, State.STARTED, indexIds, repositoryData.getGenId(),
-                                        shards, version, null));
-                                }
-                            }
-                            return ClusterState.builder(currentState)
-                                .putCustom(SnapshotsInProgress.TYPE, new SnapshotsInProgress(unmodifiableList(entries)))
-                                .build();
-                        }
-
-                        @Override
-                        public void onFailure(String source, Exception e) {
-                            logger.warn(() -> new ParameterizedMessage("[{}] failed to create snapshot",
-                                snapshot.snapshot().getSnapshotId()), e);
-                            removeSnapshotFromClusterState(snapshot.snapshot(), e,
-                                new CleanupAfterErrorListener(userCreateSnapshotListener, e));
-                        }
-
-                        @Override
-                        public void onNoLongerMaster(String source) {
-                            // We are not longer a master - we shouldn't try to do any cleanup
-                            // The new master will take care of it
-                            logger.warn("[{}] failed to create snapshot - no longer a master", snapshot.snapshot().getSnapshotId());
-                            userCreateSnapshotListener.onFailure(
-                                new SnapshotException(snapshot.snapshot(), "master changed during snapshot initialization"));
-                        }
-
-                        @Override
-                        public void clusterStateProcessed(String source, ClusterState oldState, ClusterState newState) {
-                            // The userCreateSnapshotListener.onResponse() notifies caller that the snapshot was accepted
-                            // for processing. If client wants to wait for the snapshot completion, it can register snapshot
-                            // completion listener in this method. For the snapshot completion to work properly, the snapshot
-                            // should still exist when listener is registered.
-                            userCreateSnapshotListener.onResponse(snapshot.snapshot());
-
-                            if (hadAbortedInitializations) {
-                                final SnapshotsInProgress snapshotsInProgress = newState.custom(SnapshotsInProgress.TYPE);
-                                assert snapshotsInProgress != null;
-                                final SnapshotsInProgress.Entry entry = snapshotsInProgress.snapshot(snapshot.snapshot());
-                                assert entry != null;
-                                endSnapshot(entry, newState.metadata());
-                            }
-                        }
-                    });
-                }, this::onFailure);
-            }
-
-            @Override
-            public void onFailure(Exception e) {
-                logger.warn(() -> new ParameterizedMessage("failed to create snapshot [{}]",
-                    snapshot.snapshot().getSnapshotId()), e);
-                removeSnapshotFromClusterState(snapshot.snapshot(), e,
-                    new CleanupAfterErrorListener(userCreateSnapshotListener, e));
-            }
-        });
-    }
-
-    private static class CleanupAfterErrorListener {
-
-        private final ActionListener<Snapshot> userCreateSnapshotListener;
-        private final Exception e;
-
-        CleanupAfterErrorListener(ActionListener<Snapshot> userCreateSnapshotListener, Exception e) {
-            this.userCreateSnapshotListener = userCreateSnapshotListener;
-            this.e = e;
-        }
-
-        public void onFailure(@Nullable Exception e) {
-            userCreateSnapshotListener.onFailure(ExceptionsHelper.useOrSuppress(e, this.e));
-        }
-
-        public void onNoLongerMaster() {
-            userCreateSnapshotListener.onFailure(e);
-        }
-    }
-
     private static ShardGenerations buildGenerations(SnapshotsInProgress.Entry snapshot, Metadata metadata) {
         ShardGenerations.Builder builder = ShardGenerations.builder();
         final Map<String, IndexId> indexLookup = new HashMap<>();
@@ -569,12 +411,10 @@ public class SnapshotsService extends AbstractLifecycleComponent implements Clus
                     }
                     // Cleanup all snapshots that have no more work left:
                     // 1. Completed snapshots
-                    // 2. Snapshots in state INIT that the previous master failed to start
+                    // 2. Snapshots in state INIT that a previous master of an older version failed to start
                     // 3. Snapshots in any other state that have all their shard tasks completed
                     snapshotsInProgress.entries().stream().filter(
-                        entry -> entry.state().completed()
-                            || initializingSnapshots.contains(entry.snapshot()) == false
-                               && (entry.state() == State.INIT || completed(entry.shards().values()))
+                        entry -> entry.state().completed() || entry.state() == State.INIT || completed(entry.shards().values())
                     ).forEach(entry -> endSnapshot(entry, event.state().metadata()));
                 }
                 if (newMaster) {
@@ -668,7 +508,7 @@ public class SnapshotsService extends AbstractLifecycleComponent implements Clus
                             }
                         }
                         entries.add(updatedSnapshot);
-                    } else if (snapshot.state() == State.INIT && initializingSnapshots.contains(snapshot.snapshot()) == false) {
+                    } else if (snapshot.state() == State.INIT) {
                         changed = true;
                         // A snapshot in INIT state hasn't yet written anything to the repository so we simply remove it
                         // from the cluster state  without any further cleanup
@@ -836,8 +676,9 @@ public class SnapshotsService extends AbstractLifecycleComponent implements Clus
         }
         final Snapshot snapshot = entry.snapshot();
         if (entry.repositoryStateId() == RepositoryData.UNKNOWN_REPO_GEN) {
+            // BwC logic to handle master fail-over from an older version that still used unknown repo generation snapshot entries
             logger.debug("[{}] was aborted before starting", snapshot);
-            removeSnapshotFromClusterState(entry.snapshot(), new SnapshotException(snapshot, "Aborted on initialization"), null);
+            removeSnapshotFromClusterState(entry.snapshot(), new SnapshotException(snapshot, "Aborted on initialization"));
             return;
         }
         threadPool.executor(ThreadPool.Names.SNAPSHOT).execute(new AbstractRunnable() {
@@ -900,7 +741,7 @@ public class SnapshotsService extends AbstractLifecycleComponent implements Clus
                         new SnapshotException(snapshot, "Failed to update cluster state during snapshot finalization", e));
                 } else {
                     logger.warn(() -> new ParameterizedMessage("[{}] failed to finalize snapshot", snapshot), e);
-                    removeSnapshotFromClusterState(snapshot, e, null);
+                    removeSnapshotFromClusterState(snapshot, e);
                 }
             }
         });
@@ -930,10 +771,8 @@ public class SnapshotsService extends AbstractLifecycleComponent implements Clus
      * Removes record of running snapshot from cluster state and notifies the listener when this action is complete
      * @param snapshot   snapshot
      * @param failure    exception if snapshot failed
-     * @param listener   listener to notify when snapshot information is removed from the cluster state
      */
-    private void removeSnapshotFromClusterState(final Snapshot snapshot, Exception failure,
-                                                @Nullable CleanupAfterErrorListener listener) {
+    private void removeSnapshotFromClusterState(final Snapshot snapshot, Exception failure) {
         assert failure != null : "Failure must be supplied";
         clusterService.submitStateUpdateTask("remove snapshot metadata", new ClusterStateUpdateTask() {
 
@@ -947,26 +786,17 @@ public class SnapshotsService extends AbstractLifecycleComponent implements Clus
                 logger.warn(() -> new ParameterizedMessage("[{}] failed to remove snapshot metadata", snapshot), e);
                 failSnapshotCompletionListeners(
                     snapshot, new SnapshotException(snapshot, "Failed to remove snapshot from cluster state", e));
-                if (listener != null) {
-                    listener.onFailure(e);
-                }
             }
 
             @Override
             public void onNoLongerMaster(String source) {
                 failSnapshotCompletionListeners(
                     snapshot, ExceptionsHelper.useOrSuppress(failure, new SnapshotException(snapshot, "no longer master")));
-                if (listener != null) {
-                    listener.onNoLongerMaster();
-                }
             }
 
             @Override
             public void clusterStateProcessed(String source, ClusterState oldState, ClusterState newState) {
                 failSnapshotCompletionListeners(snapshot, failure);
-                if (listener != null) {
-                    listener.onFailure(null);
-                }
             }
         });
     }
@@ -1037,10 +867,10 @@ public class SnapshotsService extends AbstractLifecycleComponent implements Clus
                     outstandingDeletes.add(runningSnapshot.getSnapshotId());
                 }
                 if (state == State.INIT) {
-                    // snapshot is still initializing, mark it as aborted
+                    // snapshot was created by an older version of ES and never moved past INIT, remove it from the cluster state
                     shards = snapshotEntry.shards();
                     assert shards.isEmpty();
-                    failure = "Snapshot was aborted during initialization";
+                    failure = null;
                     abortedDuringInit = true;
                 } else if (state == State.STARTED) {
                     // snapshot is started - mark every non completed shard as aborted
@@ -1079,12 +909,15 @@ public class SnapshotsService extends AbstractLifecycleComponent implements Clus
                     failure = snapshotEntry.failure();
                 }
                 return ClusterState.builder(currentState).putCustom(SnapshotsInProgress.TYPE,
-                    new SnapshotsInProgress(snapshots.entries().stream().map(existing -> {
-                        if (existing.equals(snapshotEntry)) {
-                            return new SnapshotsInProgress.Entry(snapshotEntry, State.ABORTED, shards, failure);
-                        }
-                        return existing;
-                    }).collect(Collectors.toUnmodifiableList()))).build();
+                    new SnapshotsInProgress(snapshots.entries().stream()
+                        // remove init state snapshot we found from a previous master if there was one
+                        .filter(existing -> abortedDuringInit == false || existing.equals(snapshotEntry) == false)
+                        .map(existing -> {
+                            if (existing.equals(snapshotEntry)) {
+                                return new SnapshotsInProgress.Entry(snapshotEntry, State.ABORTED, shards, failure);
+                            }
+                            return existing;
+                        }).collect(Collectors.toUnmodifiableList()))).build();
             }
 
             @Override
@@ -1099,6 +932,18 @@ public class SnapshotsService extends AbstractLifecycleComponent implements Clus
                     deleteFromRepoTask.clusterStateProcessed(source, oldState, newState);
                     return;
                 }
+                if (abortedDuringInit) {
+                    // BwC Path where we removed an outdated INIT state snapshot from the cluster state
+                    logger.info("Successfully aborted snapshot [{}]", runningSnapshot);
+                    if (outstandingDeletes.isEmpty()) {
+                        listener.onResponse(null);
+                    } else {
+                        clusterService.submitStateUpdateTask("delete snapshot",
+                                createDeleteStateUpdate(outstandingDeletes, repositoryName, repositoryData.getGenId(),
+                                        Priority.IMMEDIATE, listener));
+                    }
+                    return;
+                }
                 logger.trace("adding snapshot completion listener to wait for deleted snapshot to finish");
                 addListener(runningSnapshot, ActionListener.wrap(
                     result -> {
@@ -1108,18 +953,7 @@ public class SnapshotsService extends AbstractLifecycleComponent implements Clus
                                         result.v1().getGenId(), Priority.IMMEDIATE, listener));
                     },
                     e -> {
-                        if (abortedDuringInit) {
-                            logger.info("Successfully aborted snapshot [{}]", runningSnapshot);
-                            if (outstandingDeletes.isEmpty()) {
-                                listener.onResponse(null);
-                            } else {
-                                clusterService.submitStateUpdateTask("delete snapshot",
-                                        createDeleteStateUpdate(outstandingDeletes, repositoryName, repositoryData.getGenId(),
-                                                Priority.IMMEDIATE, listener));
-                            }
-                        } else {
-                            if (ExceptionsHelper.unwrap(e, NotMasterException.class, FailedToCommitClusterStateException.class)
-                                != null) {
+                       if (ExceptionsHelper.unwrap(e, NotMasterException.class, FailedToCommitClusterStateException.class) != null) {
                                 logger.warn("master failover before deleted snapshot could complete", e);
                                 // Just pass the exception to the transport handler as is so it is retried on the new master
                                 listener.onFailure(e);
@@ -1129,7 +963,6 @@ public class SnapshotsService extends AbstractLifecycleComponent implements Clus
                                     new SnapshotMissingException(runningSnapshot.getRepository(), runningSnapshot.getSnapshotId(), e));
                             }
                         }
-                    }
                 ));
             }
 
@@ -1270,17 +1103,14 @@ public class SnapshotsService extends AbstractLifecycleComponent implements Clus
      * repository and all nodes in the cluster.
      *
      * @param minNodeVersion minimum node version in the cluster
-     * @param repositoryName name of the repository to modify
      * @param repositoryData current {@link RepositoryData} of that repository
      * @param excluded       snapshot id to ignore when computing the minimum version
      *                       (used to use newer metadata version after a snapshot delete)
      * @return minimum node version that must still be able to read the repository metadata
      */
-    public Version minCompatibleVersion(Version minNodeVersion, String repositoryName, RepositoryData repositoryData,
-                                        @Nullable Collection<SnapshotId> excluded) {
+    public Version minCompatibleVersion(Version minNodeVersion, RepositoryData repositoryData, @Nullable Collection<SnapshotId> excluded) {
         Version minCompatVersion = minNodeVersion;
         final Collection<SnapshotId> snapshotIds = repositoryData.getSnapshotIds();
-        final Repository repository = repositoriesService.repository(repositoryName);
         for (SnapshotId snapshotId : snapshotIds.stream().filter(excluded == null ? sn -> true : Predicate.not(excluded::contains))
                 .collect(Collectors.toList())) {
             final Version known = repositoryData.getVersion(snapshotId);
@@ -1289,18 +1119,7 @@ public class SnapshotsService extends AbstractLifecycleComponent implements Clus
                 assert repositoryData.shardGenerations().totalShards() == 0 :
                     "Saw shard generations [" + repositoryData.shardGenerations() +
                         "] but did not have versions tracked for snapshot [" + snapshotId + "]";
-                try {
-                    final Version foundVersion = repository.getSnapshotInfo(snapshotId).version();
-                    if (useShardGenerations(foundVersion) == false) {
-                        // We don't really care about the exact version if its before 7.6 as the 7.5 metadata is the oldest we are able
-                        // to write out so we stop iterating here and just use 7.5.0 as a placeholder.
-                        return OLD_SNAPSHOT_FORMAT;
-                    }
-                    minCompatVersion = minCompatVersion.before(foundVersion) ? minCompatVersion : foundVersion;
-                } catch (SnapshotMissingException e) {
-                    logger.warn("Failed to load snapshot metadata, assuming repository is in old format", e);
-                    return OLD_SNAPSHOT_FORMAT;
-                }
+                return OLD_SNAPSHOT_FORMAT;
             } else {
                 minCompatVersion = minCompatVersion.before(known) ? minCompatVersion : known;
             }
@@ -1333,7 +1152,7 @@ public class SnapshotsService extends AbstractLifecycleComponent implements Clus
             Repository repository = repositoriesService.repository(repoName);
             repository.getRepositoryData(ActionListener.wrap(repositoryData -> repository.deleteSnapshots(snapshotIds,
                 repositoryStateId,
-                minCompatibleVersion(minNodeVersion, repoName, repositoryData, snapshotIds),
+                minCompatibleVersion(minNodeVersion, repositoryData, snapshotIds),
                 ActionListener.wrap(v -> {
                         logger.info("snapshots {} deleted", snapshotIds);
                         removeSnapshotDeletionFromClusterState(snapshotIds, null, l);

+ 8 - 13
server/src/main/java/org/elasticsearch/snapshots/package-info.java

@@ -35,19 +35,14 @@
  * <h2>Snapshot Creation</h2>
  * <p>Snapshots are created by the following sequence of events:</p>
  * <ol>
- * <li>An invocation of {@link org.elasticsearch.snapshots.SnapshotsService#createSnapshot} enqueues a cluster state update to create
- * a {@link org.elasticsearch.cluster.SnapshotsInProgress.Entry} in the cluster state's {@code SnapshotsInProgress}. This initial snapshot
- * entry has its state set to {@code INIT} and an empty map set for the state of the individual shard's snapshots.</li>
- *
- * <li>After the snapshot's entry with state {@code INIT} is in the cluster state, {@link org.elasticsearch.snapshots.SnapshotsService}
- * determines the primary shards' assignments for all indices that are being snapshotted and updates the existing
- * {@code SnapshotsInProgress.Entry} with state {@code STARTED} and adds the map of {@link org.elasticsearch.index.shard.ShardId} to
- * {@link org.elasticsearch.cluster.SnapshotsInProgress.ShardSnapshotStatus} that tracks the assignment of which node is to snapshot which
- * shard. All shard snapshots are executed on the shard's primary node. Thus all shards for which the primary node was found to have a
- * healthy copy of the shard are marked as being in state {@code INIT} in this map. If the primary for a shard is unassigned, it is marked
- * as {@code MISSING} in this map. In case the primary is initializing at this point, it is marked as in state {@code WAITING}. In case a
- * shard's primary is relocated at any point after its {@code SnapshotsInProgress.Entry} has moved to state {@code STARTED} and thus been
- * assigned to a specific cluster node, that shard's snapshot will fail and move to state {@code FAILED}.</li>
+ * <li>First the {@link org.elasticsearch.snapshots.SnapshotsService} determines the primary shards' assignments for all indices that are
+ * being snapshotted and creates a {@code SnapshotsInProgress.Entry} with state {@code STARTED} and adds the map of
+ * {@link org.elasticsearch.index.shard.ShardId} to {@link org.elasticsearch.cluster.SnapshotsInProgress.ShardSnapshotStatus} that tracks
+ * the assignment of which node is to snapshot which shard. All shard snapshots are executed on the shard's primary node. Thus all shards
+ * for which the primary node was found to have a healthy copy of the shard are marked as being in state {@code INIT} in this map. If the
+ * primary for a shard is unassigned, it is marked as {@code MISSING} in this map. In case the primary is initializing at this point, it is
+ * marked as in state {@code WAITING}. In case a shard's primary is relocated at any point after its {@code SnapshotsInProgress.Entry} was
+ * created and thus been assigned to a specific cluster node, that shard's snapshot will fail and move to state {@code FAILED}.</li>
  *
  * <li>The new {@code SnapshotsInProgress.Entry} is then observed by
  * {@link org.elasticsearch.snapshots.SnapshotShardsService#clusterChanged} on all nodes and since the entry is in state {@code STARTED}

+ 0 - 78
server/src/test/java/org/elasticsearch/discovery/SnapshotDisruptionIT.java

@@ -74,84 +74,6 @@ public class SnapshotDisruptionIT extends ESIntegTestCase {
             .build();
     }
 
-    public void testDisruptionOnSnapshotInitialization() throws Exception {
-        final String idxName = "test";
-        final List<String> allMasterEligibleNodes = internalCluster().startMasterOnlyNodes(3);
-        final String dataNode = internalCluster().startDataOnlyNode();
-        ensureStableCluster(4);
-
-        createRandomIndex(idxName);
-
-        logger.info("-->  creating repository");
-        assertAcked(client().admin().cluster().preparePutRepository("test-repo")
-            .setType("fs").setSettings(Settings.builder()
-                .put("location", randomRepoPath())
-                .put("compress", randomBoolean())
-                .put("chunk_size", randomIntBetween(100, 1000), ByteSizeUnit.BYTES)));
-
-        // Writing incompatible snapshot can cause this test to fail due to a race condition in repo initialization
-        // by the current master and the former master. It is not causing any issues in real life scenario, but
-        // might make this test to fail. We are going to complete initialization of the snapshot to prevent this failures.
-        logger.info("-->  initializing the repository");
-        assertEquals(SnapshotState.SUCCESS, client().admin().cluster().prepareCreateSnapshot("test-repo", "test-snap-1")
-            .setWaitForCompletion(true).setIncludeGlobalState(true).setIndices().get().getSnapshotInfo().state());
-
-        final String masterNode1 = internalCluster().getMasterName();
-        Set<String> otherNodes = new HashSet<>();
-        otherNodes.addAll(allMasterEligibleNodes);
-        otherNodes.remove(masterNode1);
-        otherNodes.add(dataNode);
-
-        NetworkDisruption networkDisruption =
-            new NetworkDisruption(new NetworkDisruption.TwoPartitions(Collections.singleton(masterNode1), otherNodes),
-                new NetworkDisruption.NetworkUnresponsive());
-        internalCluster().setDisruptionScheme(networkDisruption);
-
-        ClusterService clusterService = internalCluster().clusterService(masterNode1);
-        CountDownLatch disruptionStarted = new CountDownLatch(1);
-        clusterService.addListener(new ClusterStateListener() {
-            @Override
-            public void clusterChanged(ClusterChangedEvent event) {
-                SnapshotsInProgress snapshots = event.state().custom(SnapshotsInProgress.TYPE);
-                if (snapshots != null && snapshots.entries().size() > 0) {
-                    if (snapshots.entries().get(0).state() == SnapshotsInProgress.State.INIT) {
-                        // The snapshot started, we can start disruption so the INIT state will arrive to another master node
-                        logger.info("--> starting disruption");
-                        networkDisruption.startDisrupting();
-                        clusterService.removeListener(this);
-                        disruptionStarted.countDown();
-                    }
-                }
-            }
-        });
-
-        logger.info("--> starting snapshot");
-        ActionFuture<CreateSnapshotResponse> future = client(masterNode1).admin().cluster()
-            .prepareCreateSnapshot("test-repo", "test-snap-2").setWaitForCompletion(false).setIndices(idxName).execute();
-
-        logger.info("--> waiting for disruption to start");
-        assertTrue(disruptionStarted.await(1, TimeUnit.MINUTES));
-
-        assertAllSnapshotsCompleted();
-
-        logger.info("--> verify that snapshot was successful or no longer exist");
-        assertBusy(() -> {
-            try {
-                assertSnapshotExists("test-repo", "test-snap-2");
-            } catch (SnapshotMissingException exception) {
-                logger.info("--> done verifying, snapshot doesn't exist");
-            }
-        }, 1, TimeUnit.MINUTES);
-
-        logger.info("--> stopping disrupting");
-        networkDisruption.stopDisrupting();
-        ensureStableCluster(4, masterNode1);
-        logger.info("--> done");
-
-        future.get();
-        assertAllSnapshotsCompleted();
-    }
-
     public void testDisruptionAfterFinalization() throws Exception {
         final String idxName = "test";
         final List<String> allMasterEligibleNodes = internalCluster().startMasterOnlyNodes(3);

+ 2 - 2
server/src/test/java/org/elasticsearch/snapshots/CorruptedBlobStoreRepositoryIT.java

@@ -243,7 +243,7 @@ public class CorruptedBlobStoreRepositoryIT extends AbstractSnapshotIntegTestCas
         final ThreadPool threadPool = internalCluster().getCurrentMasterNodeInstance(ThreadPool.class);
         assertThat(PlainActionFuture.get(f -> threadPool.generic().execute(
             ActionRunnable.supply(f, () ->
-                snapshotsService.minCompatibleVersion(Version.CURRENT, repoName, getRepositoryData(repository), null)))),
+                snapshotsService.minCompatibleVersion(Version.CURRENT, getRepositoryData(repository), null)))),
             is(SnapshotsService.OLD_SNAPSHOT_FORMAT));
 
         logger.info("--> verify that snapshot with missing root level metadata can be deleted");
@@ -252,7 +252,7 @@ public class CorruptedBlobStoreRepositoryIT extends AbstractSnapshotIntegTestCas
         logger.info("--> verify that repository is assumed in new metadata format after removing corrupted snapshot");
         assertThat(PlainActionFuture.get(f -> threadPool.generic().execute(
             ActionRunnable.supply(f, () ->
-                snapshotsService.minCompatibleVersion(Version.CURRENT, repoName, getRepositoryData(repository), null)))),
+                snapshotsService.minCompatibleVersion(Version.CURRENT, getRepositoryData(repository), null)))),
             is(Version.CURRENT));
         final RepositoryData finalRepositoryData = getRepositoryData(repository);
         for (SnapshotId snapshotId : finalRepositoryData.getSnapshotIds()) {

+ 5 - 9
server/src/test/java/org/elasticsearch/snapshots/SnapshotResiliencyTests.java

@@ -867,8 +867,6 @@ public class SnapshotResiliencyTests extends ESTestCase {
         continueOrDie(createRepoAndIndex(repoName, index, shards),
             createIndexResponse -> client().admin().cluster().state(new ClusterStateRequest(), clusterStateResponseStepListener));
 
-        final StepListener<CreateSnapshotResponse> snapshotStartedListener = new StepListener<>();
-
         continueOrDie(clusterStateResponseStepListener, clusterStateResponse -> {
             final ShardRouting shardToRelocate = clusterStateResponse.getState().routingTable().allShards(index).get(0);
             final TestClusterNodes.TestClusterNode currentPrimaryNode = testClusterNodes.nodeById(shardToRelocate.currentNodeId());
@@ -887,7 +885,11 @@ public class SnapshotResiliencyTests extends ESTestCase {
                                 scheduleNow(() -> testClusterNodes.stopNode(masterNode));
                             }
                             testClusterNodes.randomDataNodeSafe().client.admin().cluster().prepareCreateSnapshot(repoName, snapshotName)
-                                .execute(snapshotStartedListener);
+                                    .execute(ActionListener.wrap(() -> {
+                                        createdSnapshot.set(true);
+                                        testClusterNodes.randomDataNodeSafe().client.admin().cluster().deleteSnapshot(
+                                                new DeleteSnapshotRequest(repoName, snapshotName), noopListener());
+                                    }));
                             scheduleNow(
                                 () -> testClusterNodes.randomMasterNodeSafe().client.admin().cluster().reroute(
                                     new ClusterRerouteRequest().add(new AllocateEmptyPrimaryAllocationCommand(
@@ -900,12 +902,6 @@ public class SnapshotResiliencyTests extends ESTestCase {
             });
         });
 
-        continueOrDie(snapshotStartedListener, snapshotResponse -> {
-            createdSnapshot.set(true);
-            testClusterNodes.randomDataNodeSafe().client.admin().cluster().deleteSnapshot(
-                new DeleteSnapshotRequest(repoName, snapshotName), noopListener());
-        });
-
         runUntil(() -> testClusterNodes.randomMasterNode().map(master -> {
             if (createdSnapshot.get() == false) {
                 return false;