Browse Source

Allow Bulk Snapshot Deletes to Abort (#56009)

Making use of #55773 to simplify snapshot state machine.
1. Deletes with no in-progress snapshot now add the delete entry to the cluster state right away
instead of doing a second CS update after the first update was a NOOP.
2. If a bulk delete matches in-progress as well as completed snapshots, abort the in-progress snapshot
and then move on to delete from the repository.
Armin Braun 5 years ago
parent
commit
84522d2a67

+ 52 - 52
server/src/main/java/org/elasticsearch/snapshots/SnapshotsService.java

@@ -984,11 +984,7 @@ public class SnapshotsService extends AbstractLifecycleComponent implements Clus
     }
     }
 
 
     /**
     /**
-     * Deletes snapshots from the repository or aborts a running snapshot.
-     * If deleting a single snapshot, first checks if a snapshot is still running and if so cancels the snapshot and then deletes it from
-     * the repository.
-     * If the snapshot is not running or multiple snapshot names are given, moves to trying to find a matching {@link Snapshot}s for the
-     * given names in the repository and deletes them.
+     * Deletes snapshots from the repository. In-progress snapshots matched by the delete will be aborted before deleting them.
      *
      *
      * @param request         delete snapshot request
      * @param request         delete snapshot request
      * @param listener        listener
      * @param listener        listener
@@ -1000,39 +996,46 @@ public class SnapshotsService extends AbstractLifecycleComponent implements Clus
         logger.info(() -> new ParameterizedMessage("deleting snapshots [{}] from repository [{}]",
         logger.info(() -> new ParameterizedMessage("deleting snapshots [{}] from repository [{}]",
                 Strings.arrayToCommaDelimitedString(snapshotNames), repositoryName));
                 Strings.arrayToCommaDelimitedString(snapshotNames), repositoryName));
 
 
-        clusterService.submitStateUpdateTask("delete snapshot", new ClusterStateUpdateTask(Priority.NORMAL) {
+        final Repository repository = repositoriesService.repository(repositoryName);
+        repository.executeConsistentStateUpdate(repositoryData -> new ClusterStateUpdateTask(Priority.NORMAL) {
+
+            private Snapshot runningSnapshot;
 
 
-            Snapshot runningSnapshot;
+            private ClusterStateUpdateTask deleteFromRepoTask;
 
 
-            boolean abortedDuringInit = false;
+            private boolean abortedDuringInit = false;
+
+            private List<SnapshotId> outstandingDeletes;
 
 
             @Override
             @Override
-            public ClusterState execute(ClusterState currentState) {
+            public ClusterState execute(ClusterState currentState) throws Exception {
                 if (snapshotNames.length > 1 && currentState.nodes().getMinNodeVersion().before(MULTI_DELETE_VERSION)) {
                 if (snapshotNames.length > 1 && currentState.nodes().getMinNodeVersion().before(MULTI_DELETE_VERSION)) {
                     throw new IllegalArgumentException("Deleting multiple snapshots in a single request is only supported in version [ "
                     throw new IllegalArgumentException("Deleting multiple snapshots in a single request is only supported in version [ "
                             + MULTI_DELETE_VERSION + "] but cluster contained node of version [" + currentState.nodes().getMinNodeVersion()
                             + MULTI_DELETE_VERSION + "] but cluster contained node of version [" + currentState.nodes().getMinNodeVersion()
                             + "]");
                             + "]");
                 }
                 }
                 final SnapshotsInProgress snapshots = currentState.custom(SnapshotsInProgress.TYPE);
                 final SnapshotsInProgress snapshots = currentState.custom(SnapshotsInProgress.TYPE);
-                final SnapshotsInProgress.Entry snapshotEntry;
-                if (snapshotNames.length == 1) {
-                    final String snapshotName = snapshotNames[0];
-                    if (Regex.isSimpleMatchPattern(snapshotName)) {
-                        snapshotEntry = null;
-                    } else {
-                        snapshotEntry = findInProgressSnapshot(snapshots, snapshotName, repositoryName);
-                    }
-                } else {
-                    snapshotEntry = null;
-                }
+                final SnapshotsInProgress.Entry snapshotEntry = findInProgressSnapshot(snapshots, snapshotNames, repositoryName);
+                final List<SnapshotId> snapshotIds = matchingSnapshotIds(
+                        snapshotEntry == null ? null : snapshotEntry.snapshot().getSnapshotId(),
+                        repositoryData, snapshotNames, repositoryName);
                 if (snapshotEntry == null) {
                 if (snapshotEntry == null) {
-                    return currentState;
+                    deleteFromRepoTask =
+                            createDeleteStateUpdate(snapshotIds, repositoryName, repositoryData.getGenId(), Priority.NORMAL, listener);
+                    return deleteFromRepoTask.execute(currentState);
                 }
                 }
+
                 runningSnapshot = snapshotEntry.snapshot();
                 runningSnapshot = snapshotEntry.snapshot();
                 final ImmutableOpenMap<ShardId, ShardSnapshotStatus> shards;
                 final ImmutableOpenMap<ShardId, ShardSnapshotStatus> shards;
 
 
                 final State state = snapshotEntry.state();
                 final State state = snapshotEntry.state();
                 final String failure;
                 final String failure;
+
+                outstandingDeletes = new ArrayList<>(snapshotIds);
+                if (state != State.INIT) {
+                    // INIT state snapshots won't ever be physically written to the repository but all other states will end up in the repo
+                    outstandingDeletes.add(runningSnapshot.getSnapshotId());
+                }
                 if (state == State.INIT) {
                 if (state == State.INIT) {
                     // snapshot is still initializing, mark it as aborted
                     // snapshot is still initializing, mark it as aborted
                     shards = snapshotEntry.shards();
                     shards = snapshotEntry.shards();
@@ -1091,15 +1094,9 @@ public class SnapshotsService extends AbstractLifecycleComponent implements Clus
 
 
             @Override
             @Override
             public void clusterStateProcessed(String source, ClusterState oldState, ClusterState newState) {
             public void clusterStateProcessed(String source, ClusterState oldState, ClusterState newState) {
-                if (runningSnapshot == null) {
-                    try {
-                        repositoriesService.repository(repositoryName).executeConsistentStateUpdate(repositoryData ->
-                                createDeleteStateUpdate(matchingSnapshotIds(repositoryData, snapshotNames, repositoryName), repositoryName,
-                                        repositoryData.getGenId(), request.masterNodeTimeout(), Priority.NORMAL, listener),
-                                        "delete completed snapshots", listener::onFailure);
-                    } catch (RepositoryMissingException e) {
-                        listener.onFailure(e);
-                    }
+                if (deleteFromRepoTask != null) {
+                    assert outstandingDeletes == null : "Shouldn't have outstanding deletes after already starting delete task";
+                    deleteFromRepoTask.clusterStateProcessed(source, oldState, newState);
                     return;
                     return;
                 }
                 }
                 logger.trace("adding snapshot completion listener to wait for deleted snapshot to finish");
                 logger.trace("adding snapshot completion listener to wait for deleted snapshot to finish");
@@ -1107,13 +1104,19 @@ public class SnapshotsService extends AbstractLifecycleComponent implements Clus
                     result -> {
                     result -> {
                         logger.debug("deleted snapshot completed - deleting files");
                         logger.debug("deleted snapshot completed - deleting files");
                         clusterService.submitStateUpdateTask("delete snapshot",
                         clusterService.submitStateUpdateTask("delete snapshot",
-                                createDeleteStateUpdate(Collections.singletonList(result.v2().snapshotId()), repositoryName,
-                                        result.v1().getGenId(), null, Priority.IMMEDIATE, listener));
+                                createDeleteStateUpdate(outstandingDeletes, repositoryName,
+                                        result.v1().getGenId(), Priority.IMMEDIATE, listener));
                     },
                     },
                     e -> {
                     e -> {
                         if (abortedDuringInit) {
                         if (abortedDuringInit) {
                             logger.info("Successfully aborted snapshot [{}]", runningSnapshot);
                             logger.info("Successfully aborted snapshot [{}]", runningSnapshot);
-                            listener.onResponse(null);
+                            if (outstandingDeletes.isEmpty()) {
+                                listener.onResponse(null);
+                            } else {
+                                clusterService.submitStateUpdateTask("delete snapshot",
+                                        createDeleteStateUpdate(outstandingDeletes, repositoryName, repositoryData.getGenId(),
+                                                Priority.IMMEDIATE, listener));
+                            }
                         } else {
                         } else {
                             if (ExceptionsHelper.unwrap(e, NotMasterException.class, FailedToCommitClusterStateException.class)
                             if (ExceptionsHelper.unwrap(e, NotMasterException.class, FailedToCommitClusterStateException.class)
                                 != null) {
                                 != null) {
@@ -1134,28 +1137,30 @@ public class SnapshotsService extends AbstractLifecycleComponent implements Clus
             public TimeValue timeout() {
             public TimeValue timeout() {
                 return request.masterNodeTimeout();
                 return request.masterNodeTimeout();
             }
             }
-        });
+        }, "delete snapshot", listener::onFailure);
     }
     }
 
 
-    private static List<SnapshotId> matchingSnapshotIds(RepositoryData repositoryData, String[] snapshotsOrPatterns,
-                                                        String repositoryName) {
+    private static List<SnapshotId> matchingSnapshotIds(@Nullable SnapshotId inProgress, RepositoryData repositoryData,
+                                                        String[] snapshotsOrPatterns, String repositoryName) {
         final Map<String, SnapshotId> allSnapshotIds = repositoryData.getSnapshotIds().stream().collect(
         final Map<String, SnapshotId> allSnapshotIds = repositoryData.getSnapshotIds().stream().collect(
                 Collectors.toMap(SnapshotId::getName, Function.identity()));
                 Collectors.toMap(SnapshotId::getName, Function.identity()));
         final Set<SnapshotId> foundSnapshots = new HashSet<>();
         final Set<SnapshotId> foundSnapshots = new HashSet<>();
         for (String snapshotOrPattern : snapshotsOrPatterns) {
         for (String snapshotOrPattern : snapshotsOrPatterns) {
-            if (Regex.isSimpleMatchPattern(snapshotOrPattern) == false) {
-                final SnapshotId foundId = allSnapshotIds.get(snapshotOrPattern);
-                if (foundId == null) {
-                    throw new SnapshotMissingException(repositoryName, snapshotOrPattern);
-                } else {
-                    foundSnapshots.add(allSnapshotIds.get(snapshotOrPattern));
-                }
-            } else {
+            if (Regex.isSimpleMatchPattern(snapshotOrPattern)) {
                 for (Map.Entry<String, SnapshotId> entry : allSnapshotIds.entrySet()) {
                 for (Map.Entry<String, SnapshotId> entry : allSnapshotIds.entrySet()) {
                     if (Regex.simpleMatch(snapshotOrPattern, entry.getKey())) {
                     if (Regex.simpleMatch(snapshotOrPattern, entry.getKey())) {
                         foundSnapshots.add(entry.getValue());
                         foundSnapshots.add(entry.getValue());
                     }
                     }
                 }
                 }
+            } else {
+                final SnapshotId foundId = allSnapshotIds.get(snapshotOrPattern);
+                if (foundId == null) {
+                    if (inProgress == null || inProgress.getName().equals(snapshotOrPattern) == false) {
+                        throw new SnapshotMissingException(repositoryName, snapshotOrPattern);
+                    }
+                } else {
+                    foundSnapshots.add(allSnapshotIds.get(snapshotOrPattern));
+                }
             }
             }
         }
         }
         return List.copyOf(foundSnapshots);
         return List.copyOf(foundSnapshots);
@@ -1163,7 +1168,7 @@ public class SnapshotsService extends AbstractLifecycleComponent implements Clus
 
 
     // Return in-progress snapshot entry by name and repository in the given cluster state or null if none is found
     // Return in-progress snapshot entry by name and repository in the given cluster state or null if none is found
     @Nullable
     @Nullable
-    private static SnapshotsInProgress.Entry findInProgressSnapshot(@Nullable SnapshotsInProgress snapshots, String snapshotName,
+    private static SnapshotsInProgress.Entry findInProgressSnapshot(@Nullable SnapshotsInProgress snapshots, String[] snapshotNames,
                                                                     String repositoryName) {
                                                                     String repositoryName) {
         if (snapshots == null) {
         if (snapshots == null) {
             return null;
             return null;
@@ -1171,7 +1176,7 @@ public class SnapshotsService extends AbstractLifecycleComponent implements Clus
         SnapshotsInProgress.Entry snapshotEntry = null;
         SnapshotsInProgress.Entry snapshotEntry = null;
         for (SnapshotsInProgress.Entry entry : snapshots.entries()) {
         for (SnapshotsInProgress.Entry entry : snapshots.entries()) {
             if (entry.repository().equals(repositoryName)
             if (entry.repository().equals(repositoryName)
-                && entry.snapshot().getSnapshotId().getName().equals(snapshotName)) {
+                && Regex.simpleMatch(snapshotNames, entry.snapshot().getSnapshotId().getName())) {
                 snapshotEntry = entry;
                 snapshotEntry = entry;
                 break;
                 break;
             }
             }
@@ -1180,7 +1185,7 @@ public class SnapshotsService extends AbstractLifecycleComponent implements Clus
     }
     }
 
 
     private ClusterStateUpdateTask createDeleteStateUpdate(List<SnapshotId> snapshotIds, String repoName, long repositoryStateId,
     private ClusterStateUpdateTask createDeleteStateUpdate(List<SnapshotId> snapshotIds, String repoName, long repositoryStateId,
-                                                           @Nullable TimeValue timeout, Priority priority, ActionListener<Void> listener) {
+                                                           Priority priority, ActionListener<Void> listener) {
         // Short circuit to noop state update if there isn't anything to delete
         // Short circuit to noop state update if there isn't anything to delete
         if (snapshotIds.isEmpty()) {
         if (snapshotIds.isEmpty()) {
             return new ClusterStateUpdateTask() {
             return new ClusterStateUpdateTask() {
@@ -1198,11 +1203,6 @@ public class SnapshotsService extends AbstractLifecycleComponent implements Clus
                 public void clusterStateProcessed(String source, ClusterState oldState, ClusterState newState) {
                 public void clusterStateProcessed(String source, ClusterState oldState, ClusterState newState) {
                     listener.onResponse(null);
                     listener.onResponse(null);
                 }
                 }
-
-                @Override
-                public TimeValue timeout() {
-                    return timeout;
-                }
             };
             };
         }
         }
         return new ClusterStateUpdateTask(priority) {
         return new ClusterStateUpdateTask(priority) {

+ 47 - 0
server/src/test/java/org/elasticsearch/snapshots/SnapshotResiliencyTests.java

@@ -576,6 +576,53 @@ public class SnapshotResiliencyTests extends ESTestCase {
         }
         }
     }
     }
 
 
+    public void testBulkSnapshotDeleteWithAbort() {
+        setupTestCluster(randomFrom(1, 3, 5), randomIntBetween(2, 10));
+
+        String repoName = "repo";
+        String snapshotName = "snapshot";
+        final String index = "test";
+        final int shards = randomIntBetween(1, 10);
+
+        TestClusterNodes.TestClusterNode masterNode =
+                testClusterNodes.currentMaster(testClusterNodes.nodes.values().iterator().next().clusterService.state());
+
+        final StepListener<CreateSnapshotResponse> createSnapshotResponseStepListener = new StepListener<>();
+
+        continueOrDie(createRepoAndIndex(repoName, index, shards),
+                createIndexResponse -> client().admin().cluster().prepareCreateSnapshot(repoName, snapshotName)
+                        .setWaitForCompletion(true).execute(createSnapshotResponseStepListener));
+
+        final StepListener<CreateSnapshotResponse> createOtherSnapshotResponseStepListener = new StepListener<>();
+
+        continueOrDie(createSnapshotResponseStepListener,
+                createSnapshotResponse -> client().admin().cluster().prepareCreateSnapshot(repoName, "snapshot-2")
+                        .execute(createOtherSnapshotResponseStepListener));
+
+        final StepListener<AcknowledgedResponse> deleteSnapshotStepListener = new StepListener<>();
+
+        continueOrDie(createOtherSnapshotResponseStepListener,
+                createSnapshotResponse -> client().admin().cluster().deleteSnapshot(
+                        new DeleteSnapshotRequest(repoName, "*"), deleteSnapshotStepListener));
+
+        deterministicTaskQueue.runAllRunnableTasks();
+
+        SnapshotsInProgress finalSnapshotsInProgress = masterNode.clusterService.state().custom(SnapshotsInProgress.TYPE);
+        assertFalse(finalSnapshotsInProgress.entries().stream().anyMatch(entry -> entry.state().completed() == false));
+        final Repository repository = masterNode.repositoriesService.repository(repoName);
+        Collection<SnapshotId> snapshotIds = getRepositoryData(repository).getSnapshotIds();
+        // No snapshots should be left in the repository
+        assertThat(snapshotIds, empty());
+
+        for (SnapshotId snapshotId : snapshotIds) {
+            final SnapshotInfo snapshotInfo = repository.getSnapshotInfo(snapshotId);
+            assertEquals(SnapshotState.SUCCESS, snapshotInfo.state());
+            assertThat(snapshotInfo.indices(), containsInAnyOrder(index));
+            assertEquals(shards, snapshotInfo.successfulShards());
+            assertEquals(0, snapshotInfo.failedShards());
+        }
+    }
+
     public void testConcurrentSnapshotRestoreAndDeleteOther() {
     public void testConcurrentSnapshotRestoreAndDeleteOther() {
         setupTestCluster(randomFrom(1, 3, 5), randomIntBetween(2, 10));
         setupTestCluster(randomFrom(1, 3, 5), randomIntBetween(2, 10));