
Run deleting old index gens after snapshot in the background (#98062)

Bit of a strange way to deal with the test failure, but I think this is a
valid optimisation in isolation. I think it's fine to have that one task
forked to the snapshot pool and executing without waiting for it.

There's no need to run the old index gens cleanup before completing a
snapshot operation (and starting the next operation). This also means the
snapshot listener is resolved on the master thread, giving us a consistent
view of the cluster state when calling clusterService.state() in the
callback. This fixes test failure #97572, which resulted from accessing the
state on the `SNAPSHOT` pool after a master failover.

Fixes #97572
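
For illustration, a minimal, self-contained sketch of the pattern this change adopts: resolve the listener first, still on the calling (master) thread, then fork the best-effort cleanup so finalization never waits on blob deletion. This is plain Java, not the actual BlobStoreRepository code; the class name, finishUpdate signature, and the two-thread pool are made up for the example, with the pool standing in for ThreadPool.Names.SNAPSHOT.

import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.function.LongConsumer;

public class FireAndForgetCleanup {

    // stand-in for the ThreadPool.Names.SNAPSHOT executor
    private final ExecutorService snapshotPool = Executors.newFixedThreadPool(2);

    public void finishUpdate(long newGen, LongConsumer listener, Runnable cleanupOldGens) {
        // 1. Complete the listener right away, so the next snapshot operation can start
        //    and the callback sees a consistent cluster state on the master thread.
        listener.accept(newGen);

        // 2. Fork the old index-N cleanup; failures are only logged and never fail the snapshot.
        snapshotPool.execute(() -> {
            try {
                cleanupOldGens.run();
            } catch (Exception e) {
                System.err.println("failed to clean up old index blobs before [" + newGen + "]: " + e);
            }
        });
    }
}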
Armin Braun 2 years ago
parent
commit
1622df073c

+ 8 - 0
server/src/internalClusterTest/java/org/elasticsearch/snapshots/SharedClusterSnapshotRestoreIT.java

@@ -74,6 +74,7 @@ import java.util.Map;
 import java.util.Set;
 import java.util.concurrent.ExecutionException;
 import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicInteger;
 import java.util.function.Consumer;
 import java.util.function.Predicate;
 import java.util.stream.Collectors;
@@ -913,6 +914,13 @@ public class SharedClusterSnapshotRestoreIT extends AbstractSnapshotIntegTestCas
         assertDocCount("test-idx", 100L);
     }
 
+    private int numberOfFiles(Path dir) throws Exception {
+        awaitMasterFinishRepoOperations(); // wait for potential background blob deletes to complete on master
+        final AtomicInteger count = new AtomicInteger();
+        forEachFileRecursively(dir, ((path, basicFileAttributes) -> count.incrementAndGet()));
+        return count.get();
+    }
+
     public void testDeleteRepositoryWhileSnapshotting() throws Exception {
         disableRepoConsistencyCheck("This test uses a purposely broken repository so it would fail consistency checks");
         Client client = client();

+ 12 - 10
server/src/main/java/org/elasticsearch/repositories/blobstore/BlobStoreRepository.java

@@ -2377,23 +2377,25 @@ public abstract class BlobStoreRepository extends AbstractLifecycleComponent imp
                 public void clusterStateProcessed(ClusterState oldState, ClusterState newState) {
                     logger.trace("[{}] successfully set safe repository generation to [{}]", metadata.name(), newGen);
                     cacheRepositoryData(newRepositoryData, version);
-                    threadPool.executor(ThreadPool.Names.SNAPSHOT).execute(ActionRunnable.supply(delegate, () -> {
-                        // Delete all now outdated index files up to 1000 blobs back from the new generation.
-                        // If there are more than 1000 dangling index-N cleanup functionality on repo delete will take care of them.
-                        // Deleting one older than the current expectedGen is done for BwC reasons as older versions used to keep
-                        // two index-N blobs around.
-                        try {
+                    threadPool.executor(ThreadPool.Names.SNAPSHOT).execute(new AbstractRunnable() {
+                        @Override
+                        public void onFailure(Exception e) {
+                            logger.warn(() -> "Failed to clean up old index blobs from before [" + newGen + "]", e);
+                        }
+
+                        @Override
+                        protected void doRun() throws Exception {
+                            // Delete all now outdated index files up to 1000 blobs back from the new generation.
+                            // If there are more than 1000 dangling index-N cleanup functionality on repo delete will take care of them.
                             deleteFromContainer(
                                 blobContainer(),
                                 LongStream.range(Math.max(Math.max(expectedGen - 1, 0), newGen - 1000), newGen)
                                     .mapToObj(gen -> INDEX_FILE_PREFIX + gen)
                                     .iterator()
                             );
-                        } catch (IOException e) {
-                            logger.warn(() -> "Failed to clean up old index blobs from before [" + newGen + "]", e);
                         }
-                        return newRepositoryData;
-                    }));
+                    });
+                    delegate.onResponse(newRepositoryData);
                 }
             });
         }));
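
As a concrete illustration of the range being deleted above, here is a small standalone snippet; the generation numbers 41 and 42 are made up for the example, and INDEX_FILE_PREFIX is assumed to be "index-", matching the index-N blob naming used by BlobStoreRepository.

import java.util.stream.Collectors;
import java.util.stream.LongStream;

public class OldGenRangeExample {
    private static final String INDEX_FILE_PREFIX = "index-";

    public static void main(String[] args) {
        long expectedGen = 41; // hypothetical previous safe generation
        long newGen = 42;      // hypothetical newly written generation
        // Same range expression as in the diff: from max(expectedGen - 1, newGen - 1000),
        // never below 0, up to but excluding newGen.
        String doomed = LongStream.range(Math.max(Math.max(expectedGen - 1, 0), newGen - 1000), newGen)
            .mapToObj(gen -> INDEX_FILE_PREFIX + gen)
            .collect(Collectors.joining(", "));
        System.out.println(doomed); // prints: index-40, index-41
    }
}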

+ 0 - 7
test/framework/src/main/java/org/elasticsearch/snapshots/AbstractSnapshotIntegTestCase.java

@@ -76,7 +76,6 @@ import java.util.Locale;
 import java.util.Map;
 import java.util.Optional;
 import java.util.concurrent.TimeUnit;
-import java.util.concurrent.atomic.AtomicInteger;
 import java.util.function.BiConsumer;
 import java.util.function.Function;
 import java.util.function.Predicate;
@@ -196,12 +195,6 @@ public abstract class AbstractSnapshotIntegTestCase extends ESIntegTestCase {
         assertEquals("Unexpected file count, found: [" + found + "].", expectedCount, found.size());
     }
 
-    public static int numberOfFiles(Path dir) throws IOException {
-        final AtomicInteger count = new AtomicInteger();
-        forEachFileRecursively(dir, ((path, basicFileAttributes) -> count.incrementAndGet()));
-        return count.get();
-    }
-
     protected void stopNode(final String node) throws IOException {
         logger.info("--> stopping node {}", node);
         internalCluster().stopNode(node);