Browse Source

Fork post-snapshot-delete cleanup off master thread (#122731)

We shouldn't run the post-snapshot-delete cleanup work on the master
thread, since it can be quite expensive and need not block subsequent
cluster state updates. This commit forks it onto a `SNAPSHOT` thread.
David Turner 8 months ago
parent
commit
cd15d09adf

+ 5 - 0
docs/changelog/122731.yaml

@@ -0,0 +1,5 @@
+pr: 122731
+summary: Fork post-snapshot-delete cleanup off master thread
+area: Snapshot/Restore
+type: bug
+issues: []

+ 7 - 0
rest-api-spec/src/yamlRestTest/resources/rest-api-spec/test/snapshot.delete/10_basic.yml

@@ -68,3 +68,10 @@ setup:
         wait_for_completion: false
         wait_for_completion: false
 
 
   - match: { acknowledged: true }
   - match: { acknowledged: true }
+
+  # now create another snapshot just to ensure that the async delete finishes before the test cleanup runs:
+  - do:
+      snapshot.create:
+        repository: test_repo_create_1
+        snapshot: barrier_snapshot
+        wait_for_completion: true

+ 6 - 0
server/src/internalClusterTest/java/org/elasticsearch/snapshots/RepositoriesIT.java

@@ -508,6 +508,12 @@ public class RepositoriesIT extends AbstractSnapshotIntegTestCase {
             .orElseThrow()
             .orElseThrow()
             .queue();
             .queue();
 
 
+        // There is one task in the queue for computing and forking the cleanup work.
+        assertThat(queueLength.getAsInt(), equalTo(1));
+
+        safeAwait(barrier); // unblock the barrier thread and let it process the queue
+        safeAwait(barrier); // wait for the queue to be processed
+
         // There are indexCount (=3*snapshotPoolSize) index-deletion tasks, plus one for cleaning up the root metadata. However, the
         // throttled runner only enqueues one task per SNAPSHOT thread to start with, and then the eager runner adds another one. This shows
         // we are not spamming the threadpool with all the tasks at once, which means that other snapshot activities can run alongside this

+ 2 - 0
server/src/main/java/org/elasticsearch/repositories/RepositoryData.java

@@ -29,6 +29,7 @@ import org.elasticsearch.snapshots.SnapshotId;
 import org.elasticsearch.snapshots.SnapshotInfo;
 import org.elasticsearch.snapshots.SnapshotInfo;
 import org.elasticsearch.snapshots.SnapshotState;
 import org.elasticsearch.snapshots.SnapshotState;
 import org.elasticsearch.snapshots.SnapshotsService;
 import org.elasticsearch.snapshots.SnapshotsService;
+import org.elasticsearch.threadpool.ThreadPool;
 import org.elasticsearch.xcontent.XContentBuilder;
 import org.elasticsearch.xcontent.XContentBuilder;
 import org.elasticsearch.xcontent.XContentParser;
 import org.elasticsearch.xcontent.XContentParser;
 
 
@@ -377,6 +378,7 @@ public final class RepositoryData {
      * @return map of index to index metadata blob id to delete
      * @return map of index to index metadata blob id to delete
      */
      */
     public Map<IndexId, Collection<String>> indexMetaDataToRemoveAfterRemovingSnapshots(Collection<SnapshotId> snapshotIds) {
     public Map<IndexId, Collection<String>> indexMetaDataToRemoveAfterRemovingSnapshots(Collection<SnapshotId> snapshotIds) {
+        assert ThreadPool.assertCurrentThreadPool(ThreadPool.Names.SNAPSHOT);
         Iterator<IndexId> indicesForSnapshot = indicesToUpdateAfterRemovingSnapshot(snapshotIds);
         Iterator<IndexId> indicesForSnapshot = indicesToUpdateAfterRemovingSnapshot(snapshotIds);
         final Set<String> allRemainingIdentifiers = indexMetaDataGenerations.lookup.entrySet()
         final Set<String> allRemainingIdentifiers = indexMetaDataGenerations.lookup.entrySet()
             .stream()
             .stream()

+ 13 - 7
server/src/main/java/org/elasticsearch/repositories/blobstore/BlobStoreRepository.java

@@ -1134,14 +1134,20 @@ public abstract class BlobStoreRepository extends AbstractLifecycleComponent imp
                     );
                     );
                 })
                 })
 
 
-                .<RepositoryData>andThen((l, newRepositoryData) -> {
-                    l.onResponse(newRepositoryData);
-                    // Once we have updated the repository, run the unreferenced blobs cleanup in parallel to shard-level snapshot deletion
-                    try (var refs = new RefCountingRunnable(onCompletion)) {
-                        cleanupUnlinkedRootAndIndicesBlobs(newRepositoryData, refs.acquireListener());
-                        cleanupUnlinkedShardLevelBlobs(refs.acquireListener());
+                .<RepositoryData>andThen(
+                    // writeIndexGen completes on a master-service thread, so we must fork here.
+                    snapshotExecutor,
+                    threadPool.getThreadContext(),
+                    (l, newRepositoryData) -> {
+                        l.onResponse(newRepositoryData);
+                        // Once we have updated the repository, run the unreferenced blobs cleanup in parallel to shard-level snapshot
+                        // deletion
+                        try (var refs = new RefCountingRunnable(onCompletion)) {
+                            cleanupUnlinkedRootAndIndicesBlobs(newRepositoryData, refs.acquireListener());
+                            cleanupUnlinkedShardLevelBlobs(refs.acquireListener());
+                        }
                     }
                     }
-                })
+                )
 
 
                 .addListener(repositoryDataUpdateListener);
                 .addListener(repositoryDataUpdateListener);
         }
         }