瀏覽代碼

Remove and inline methods in SnapshotsService.deleteSnapshots() (#76079)

Tanguy Leroux 4 年之前
父節點
當前提交
ee66de9f3a
共有 1 個文件被更改,包括 75 次插入84 次删除
  1. 75 84
      server/src/main/java/org/elasticsearch/snapshots/SnapshotsService.java

+ 75 - 84
server/src/main/java/org/elasticsearch/snapshots/SnapshotsService.java

@@ -302,7 +302,7 @@ public class SnapshotsService extends AbstractLifecycleComponent implements Clus
                     SnapshotDeletionsInProgress.TYPE,
                     SnapshotDeletionsInProgress.EMPTY
                 );
-                ensureNoCleanupInProgress(currentState, repositoryName, snapshotName);
+                ensureNoCleanupInProgress(currentState, repositoryName, snapshotName, "create snapshot");
                 ensureBelowConcurrencyLimit(repositoryName, snapshotName, snapshots, deletionsInProgress);
                 // Store newSnapshot here to be processed in clusterStateProcessed
                 List<String> indices = Arrays.asList(indexNameExpressionResolver.concreteIndexNames(currentState, request));
@@ -461,7 +461,7 @@ public class SnapshotsService extends AbstractLifecycleComponent implements Clus
             public ClusterState execute(ClusterState currentState) {
                 ensureRepositoryExists(repositoryName, currentState);
                 ensureSnapshotNameAvailableInRepo(repositoryData, snapshotName, repository);
-                ensureNoCleanupInProgress(currentState, repositoryName, snapshotName);
+                ensureNoCleanupInProgress(currentState, repositoryName, snapshotName, "clone snapshot");
                 final SnapshotsInProgress snapshots = currentState.custom(SnapshotsInProgress.TYPE, SnapshotsInProgress.EMPTY);
                 final List<SnapshotsInProgress.Entry> runningSnapshots = snapshots.entries();
                 ensureSnapshotNameNotRunning(runningSnapshots, repositoryName, snapshotName);
@@ -534,7 +534,12 @@ public class SnapshotsService extends AbstractLifecycleComponent implements Clus
         }, "clone_snapshot [" + request.source() + "][" + snapshotName + ']', listener::onFailure);
     }
 
-    private static void ensureNoCleanupInProgress(ClusterState currentState, String repositoryName, String snapshotName) {
+    private static void ensureNoCleanupInProgress(
+        final ClusterState currentState,
+        final String repositoryName,
+        final String snapshotName,
+        final String reason
+    ) {
         final RepositoryCleanupInProgress repositoryCleanupInProgress = currentState.custom(
             RepositoryCleanupInProgress.TYPE,
             RepositoryCleanupInProgress.EMPTY
@@ -543,7 +548,13 @@ public class SnapshotsService extends AbstractLifecycleComponent implements Clus
             throw new ConcurrentSnapshotExecutionException(
                 repositoryName,
                 snapshotName,
-                "cannot snapshot while a repository cleanup is in-progress in [" + repositoryCleanupInProgress + "]"
+                "cannot "
+                    + reason
+                    + " while a repository cleanup is in-progress in "
+                    + repositoryCleanupInProgress.entries()
+                        .stream()
+                        .map(RepositoryCleanupInProgress.Entry::repository)
+                        .collect(Collectors.toSet())
             );
         }
     }
@@ -2021,18 +2032,17 @@ public class SnapshotsService extends AbstractLifecycleComponent implements Clus
      * @param listener        listener
      */
     public void deleteSnapshots(final DeleteSnapshotRequest request, final ActionListener<Void> listener) {
-
+        final String repositoryName = request.repository();
         final String[] snapshotNames = request.snapshots();
-        final String repoName = request.repository();
         logger.info(
             () -> new ParameterizedMessage(
                 "deleting snapshots [{}] from repository [{}]",
                 Strings.arrayToCommaDelimitedString(snapshotNames),
-                repoName
+                repositoryName
             )
         );
 
-        final Repository repository = repositoriesService.repository(repoName);
+        final Repository repository = repositoriesService.repository(repositoryName);
         repository.executeConsistentStateUpdate(repositoryData -> new ClusterStateUpdateTask(request.masterNodeTimeout()) {
 
             private SnapshotDeletionsInProgress.Entry newDelete = null;
@@ -2049,19 +2059,46 @@ public class SnapshotsService extends AbstractLifecycleComponent implements Clus
 
             @Override
             public ClusterState execute(ClusterState currentState) {
-                ensureRepositoryExists(repoName, currentState);
-                final SnapshotsInProgress snapshots = currentState.custom(SnapshotsInProgress.TYPE, SnapshotsInProgress.EMPTY);
-                final List<SnapshotsInProgress.Entry> snapshotEntries = findInProgressSnapshots(snapshots, snapshotNames, repoName);
-                final List<SnapshotId> snapshotIds = matchingSnapshotIds(
-                    snapshotEntries.stream().map(e -> e.snapshot().getSnapshotId()).collect(Collectors.toList()),
-                    repositoryData,
-                    snapshotNames,
-                    repoName
-                );
+                ensureRepositoryExists(repositoryName, currentState);
+                final Set<SnapshotId> snapshotIds = new HashSet<>();
+
+                // find in-progress snapshots to delete in cluster state
+                final SnapshotsInProgress snapshotsInProgress = currentState.custom(SnapshotsInProgress.TYPE, SnapshotsInProgress.EMPTY);
+                for (SnapshotsInProgress.Entry entry : snapshotsInProgress.entries()) {
+                    final SnapshotId snapshotId = entry.snapshot().getSnapshotId();
+                    if (entry.repository().equals(repositoryName) && Regex.simpleMatch(snapshotNames, snapshotId.getName())) {
+                        snapshotIds.add(snapshotId);
+                    }
+                }
+
+                // find snapshots to delete in repository data
+                final Map<String, SnapshotId> snapshotsIdsInRepository = repositoryData.getSnapshotIds()
+                    .stream()
+                    .collect(Collectors.toMap(SnapshotId::getName, Function.identity()));
+                for (String snapshotOrPattern : snapshotNames) {
+                    if (Regex.isSimpleMatchPattern(snapshotOrPattern)) {
+                        for (Map.Entry<String, SnapshotId> entry : snapshotsIdsInRepository.entrySet()) {
+                            if (Regex.simpleMatch(snapshotOrPattern, entry.getKey())) {
+                                snapshotIds.add(entry.getValue());
+                            }
+                        }
+                    } else {
+                        final SnapshotId foundId = snapshotsIdsInRepository.get(snapshotOrPattern);
+                        if (foundId == null) {
+                            if (snapshotIds.stream().noneMatch(snapshotId -> snapshotId.getName().equals(snapshotOrPattern))) {
+                                throw new SnapshotMissingException(repositoryName, snapshotOrPattern);
+                            }
+                        } else {
+                            snapshotIds.add(foundId);
+                        }
+                    }
+                }
+
                 if (snapshotIds.isEmpty()) {
                     return currentState;
                 }
-                final Set<SnapshotId> activeCloneSources = snapshots.entries()
+
+                final Set<SnapshotId> activeCloneSources = snapshotsInProgress.entries()
                     .stream()
                     .filter(SnapshotsInProgress.Entry::isClone)
                     .map(SnapshotsInProgress.Entry::source)
@@ -2069,41 +2106,40 @@ public class SnapshotsService extends AbstractLifecycleComponent implements Clus
                 for (SnapshotId snapshotId : snapshotIds) {
                     if (activeCloneSources.contains(snapshotId)) {
                         throw new ConcurrentSnapshotExecutionException(
-                            new Snapshot(repoName, snapshotId),
+                            new Snapshot(repositoryName, snapshotId),
                             "cannot delete snapshot while it is being cloned"
                         );
                     }
                 }
+
+                ensureNoCleanupInProgress(
+                    currentState,
+                    repositoryName,
+                    snapshotIds.stream().findFirst().get().getName(),
+                    "delete snapshot"
+                );
+
                 final SnapshotDeletionsInProgress deletionsInProgress = currentState.custom(
                     SnapshotDeletionsInProgress.TYPE,
                     SnapshotDeletionsInProgress.EMPTY
                 );
-                final RepositoryCleanupInProgress repositoryCleanupInProgress = currentState.custom(
-                    RepositoryCleanupInProgress.TYPE,
-                    RepositoryCleanupInProgress.EMPTY
-                );
-                if (repositoryCleanupInProgress.hasCleanupInProgress()) {
-                    throw new ConcurrentSnapshotExecutionException(
-                        new Snapshot(repoName, snapshotIds.get(0)),
-                        "cannot delete snapshots while a repository cleanup is in-progress in [" + repositoryCleanupInProgress + "]"
-                    );
-                }
+
                 final RestoreInProgress restoreInProgress = currentState.custom(RestoreInProgress.TYPE, RestoreInProgress.EMPTY);
                 // don't allow snapshot deletions while a restore is taking place,
                 // otherwise we could end up deleting a snapshot that is being restored
                 // and the files the restore depends on would all be gone
 
                 for (RestoreInProgress.Entry entry : restoreInProgress) {
-                    if (repoName.equals(entry.snapshot().getRepository()) && snapshotIds.contains(entry.snapshot().getSnapshotId())) {
+                    if (repositoryName.equals(entry.snapshot().getRepository()) && snapshotIds.contains(entry.snapshot().getSnapshotId())) {
                         throw new ConcurrentSnapshotExecutionException(
-                            new Snapshot(repoName, snapshotIds.get(0)),
+                            new Snapshot(repositoryName, snapshotIds.stream().findFirst().get()),
                             "cannot delete snapshot during a restore in progress in [" + restoreInProgress + "]"
                         );
                     }
                 }
                 // Snapshot ids that will have to be physically deleted from the repository
                 final Set<SnapshotId> snapshotIdsRequiringCleanup = new HashSet<>(snapshotIds);
-                final SnapshotsInProgress updatedSnapshots = SnapshotsInProgress.of(snapshots.entries().stream().map(existing -> {
+                final SnapshotsInProgress updatedSnapshots = SnapshotsInProgress.of(snapshotsInProgress.entries().stream().map(existing -> {
                     if (existing.state() == State.STARTED && snapshotIdsRequiringCleanup.contains(existing.snapshot().getSnapshotId())) {
                         // snapshot is started - mark every non completed shard as aborted
                         final SnapshotsInProgress.Entry abortedEntry = existing.abort();
@@ -2130,14 +2166,15 @@ public class SnapshotsService extends AbstractLifecycleComponent implements Clus
                 // add the snapshot deletion to the cluster state
                 final SnapshotDeletionsInProgress.Entry replacedEntry = deletionsInProgress.getEntries()
                     .stream()
-                    .filter(entry -> entry.repository().equals(repoName) && entry.state() == SnapshotDeletionsInProgress.State.WAITING)
+                    .filter(entry -> entry.repository().equals(repositoryName))
+                    .filter(entry -> entry.state() == SnapshotDeletionsInProgress.State.WAITING)
                     .findFirst()
                     .orElse(null);
                 if (replacedEntry == null) {
                     final Optional<SnapshotDeletionsInProgress.Entry> foundDuplicate = deletionsInProgress.getEntries()
                         .stream()
                         .filter(
-                            entry -> entry.repository().equals(repoName)
+                            entry -> entry.repository().equals(repositoryName)
                                 && entry.state() == SnapshotDeletionsInProgress.State.STARTED
                                 && entry.getSnapshots().containsAll(snapshotIds)
                         )
@@ -2149,14 +2186,14 @@ public class SnapshotsService extends AbstractLifecycleComponent implements Clus
                     }
                     newDelete = new SnapshotDeletionsInProgress.Entry(
                         List.copyOf(snapshotIdsRequiringCleanup),
-                        repoName,
+                        repositoryName,
                         threadPool.absoluteTimeInMillis(),
                         repositoryData.getGenId(),
                         updatedSnapshots.entries()
                             .stream()
-                            .filter(entry -> repoName.equals(entry.repository()))
+                            .filter(entry -> repositoryName.equals(entry.repository()))
                             .noneMatch(SnapshotsService::isWritingToRepository)
-                            && deletionsInProgress.hasExecutingDeletion(repoName) == false
+                            && deletionsInProgress.hasExecutingDeletion(repositoryName) == false
                                 ? SnapshotDeletionsInProgress.State.STARTED
                                 : SnapshotDeletionsInProgress.State.WAITING
                     );
@@ -2193,7 +2230,7 @@ public class SnapshotsService extends AbstractLifecycleComponent implements Clus
                         return;
                     }
                     if (newDelete.state() == SnapshotDeletionsInProgress.State.STARTED) {
-                        if (tryEnterRepoLoop(repoName)) {
+                        if (tryEnterRepoLoop(repositoryName)) {
                             deleteSnapshotsFromRepository(newDelete, repositoryData, newState.nodes().getMinNodeVersion());
                         } else {
                             logger.trace("Delete [{}] could not execute directly and was queued", newDelete);
@@ -2208,52 +2245,6 @@ public class SnapshotsService extends AbstractLifecycleComponent implements Clus
         }, "delete snapshot [" + repository + "]" + Arrays.toString(snapshotNames), listener::onFailure);
     }
 
-    private static List<SnapshotId> matchingSnapshotIds(
-        List<SnapshotId> inProgress,
-        RepositoryData repositoryData,
-        String[] snapshotsOrPatterns,
-        String repositoryName
-    ) {
-        final Map<String, SnapshotId> allSnapshotIds = repositoryData.getSnapshotIds()
-            .stream()
-            .collect(Collectors.toMap(SnapshotId::getName, Function.identity()));
-        final Set<SnapshotId> foundSnapshots = new HashSet<>(inProgress);
-        for (String snapshotOrPattern : snapshotsOrPatterns) {
-            if (Regex.isSimpleMatchPattern(snapshotOrPattern)) {
-                for (Map.Entry<String, SnapshotId> entry : allSnapshotIds.entrySet()) {
-                    if (Regex.simpleMatch(snapshotOrPattern, entry.getKey())) {
-                        foundSnapshots.add(entry.getValue());
-                    }
-                }
-            } else {
-                final SnapshotId foundId = allSnapshotIds.get(snapshotOrPattern);
-                if (foundId == null) {
-                    if (inProgress.stream().noneMatch(snapshotId -> snapshotId.getName().equals(snapshotOrPattern))) {
-                        throw new SnapshotMissingException(repositoryName, snapshotOrPattern);
-                    }
-                } else {
-                    foundSnapshots.add(allSnapshotIds.get(snapshotOrPattern));
-                }
-            }
-        }
-        return List.copyOf(foundSnapshots);
-    }
-
-    // Return in-progress snapshot entries by name and repository in the given cluster state or null if none is found
-    private static List<SnapshotsInProgress.Entry> findInProgressSnapshots(
-        SnapshotsInProgress snapshots,
-        String[] snapshotNames,
-        String repositoryName
-    ) {
-        List<SnapshotsInProgress.Entry> entries = new ArrayList<>();
-        for (SnapshotsInProgress.Entry entry : snapshots.entries()) {
-            if (entry.repository().equals(repositoryName) && Regex.simpleMatch(snapshotNames, entry.snapshot().getSnapshotId().getName())) {
-                entries.add(entry);
-            }
-        }
-        return entries;
-    }
-
     /**
      * Checks if the given {@link SnapshotsInProgress.Entry} is currently writing to the repository.
      *