Browse Source

Tighten up threading in snapshot finalization (#124403)

Today snapshot finalization does nontrivial work on the calling thread
(often the cluster applier thread) and also in theory may fork back to
the cluster applier thread in `getRepositoryData`, yet it always forks
at least one task (the `SnapshotInfo` write) to the `SNAPSHOT` pool
anyway. With this change we fork to the `SNAPSHOT` pool straight away
and then make sure to stay on this pool throughout.
David Turner 7 tháng trước cách đây
mục cha
commit
f1f2df77ba

+ 9 - 17
server/src/main/java/org/elasticsearch/repositories/blobstore/BlobStoreRepository.java

@@ -78,7 +78,6 @@ import org.elasticsearch.common.unit.ByteSizeValue;
 import org.elasticsearch.common.util.BigArrays;
 import org.elasticsearch.common.util.concurrent.AbstractRunnable;
 import org.elasticsearch.common.util.concurrent.ConcurrentCollections;
-import org.elasticsearch.common.util.concurrent.EsExecutors;
 import org.elasticsearch.common.util.concurrent.ListenableFuture;
 import org.elasticsearch.common.util.concurrent.ThrottledIterator;
 import org.elasticsearch.common.util.concurrent.ThrottledTaskRunner;
@@ -1738,6 +1737,7 @@ public abstract class BlobStoreRepository extends AbstractLifecycleComponent imp
 
     @Override
     public void finalizeSnapshot(final FinalizeSnapshotContext finalizeSnapshotContext) {
+        assert ThreadPool.assertCurrentThreadPool(ThreadPool.Names.SNAPSHOT);
         final long repositoryStateId = finalizeSnapshotContext.repositoryStateId();
         final ShardGenerations shardGenerations = finalizeSnapshotContext.updatedShardGenerations();
         final SnapshotInfo snapshotInfo = finalizeSnapshotContext.snapshotInfo();
@@ -1767,14 +1767,7 @@ public abstract class BlobStoreRepository extends AbstractLifecycleComponent imp
         SubscribableListener
 
             // Get the current RepositoryData
-            .<RepositoryData>newForked(
-                listener -> getRepositoryData(
-                    // TODO we might already be on a SNAPSHOT thread, make it so that we're always on a SNAPSHOT thread here and then we
-                    // can avoid a little more forking below
-                    EsExecutors.DIRECT_EXECUTOR_SERVICE,
-                    listener
-                )
-            )
+            .<RepositoryData>newForked(listener -> getRepositoryData(executor, listener))
 
             // Identify and write the missing metadata
             .<MetadataWriteResult>andThen((l, existingRepositoryData) -> {
@@ -1842,13 +1835,12 @@ public abstract class BlobStoreRepository extends AbstractLifecycleComponent imp
                         }));
                     }
 
-                    // Write the SnapshotInfo blob
-                    executor.execute(
-                        ActionRunnable.run(
-                            allMetaListeners.acquire(),
-                            () -> SNAPSHOT_FORMAT.write(snapshotInfo, blobContainer(), snapshotId.getUUID(), compress)
-                        )
-                    );
+                    // Write the SnapshotInfo blob to the repo (we're already on a SNAPSHOT thread so no need to fork this)
+                    assert ThreadPool.assertCurrentThreadPool(ThreadPool.Names.SNAPSHOT);
+                    ActionListener.completeWith(allMetaListeners.acquire(), () -> {
+                        SNAPSHOT_FORMAT.write(snapshotInfo, blobContainer(), snapshotId.getUUID(), compress);
+                        return null;
+                    });
 
                     // TODO fail fast if any metadata write fails
                     // TODO clean up successful metadata writes on failure (needs care, we must not clobber another node concurrently
@@ -1858,7 +1850,7 @@ public abstract class BlobStoreRepository extends AbstractLifecycleComponent imp
 
             // Update the root blob
             .<RootBlobUpdateResult>andThen((l, metadataWriteResult) -> {
-                // unlikely, but in theory we could still be on the thread which called finalizeSnapshot - TODO must fork to SNAPSHOT here
+                assert ThreadPool.assertCurrentThreadPool(ThreadPool.Names.SNAPSHOT);
                 final var snapshotDetails = SnapshotDetails.fromSnapshotInfo(snapshotInfo);
                 final var existingRepositoryData = metadataWriteResult.existingRepositoryData();
                 writeIndexGen(

+ 49 - 6
server/src/main/java/org/elasticsearch/snapshots/SnapshotsService.java

@@ -71,7 +71,9 @@ import org.elasticsearch.common.settings.Settings;
 import org.elasticsearch.common.unit.ByteSizeValue;
 import org.elasticsearch.common.util.CollectionUtils;
 import org.elasticsearch.common.util.Maps;
+import org.elasticsearch.common.util.concurrent.AbstractRunnable;
 import org.elasticsearch.common.util.concurrent.EsExecutors;
+import org.elasticsearch.common.util.concurrent.EsRejectedExecutionException;
 import org.elasticsearch.common.util.concurrent.ListenableFuture;
 import org.elasticsearch.core.FixForMultiProject;
 import org.elasticsearch.core.Nullable;
@@ -1398,9 +1400,34 @@ public final class SnapshotsService extends AbstractLifecycleComponent implement
     }
 
     private void finalizeSnapshotEntry(Snapshot snapshot, Metadata metadata, RepositoryData repositoryData) {
-        assert currentlyFinalizing.contains(snapshot.getRepository());
-        assert repositoryOperations.assertNotQueued(snapshot);
-        try {
+        threadPool.executor(ThreadPool.Names.SNAPSHOT).execute(new SnapshotFinalization(snapshot, metadata, repositoryData));
+    }
+
+    /**
+     * Implements the finalization process for a snapshot: does some preparatory calculations, builds a {@link SnapshotInfo} and a
+     * {@link FinalizeSnapshotContext}, calls {@link Repository#finalizeSnapshot} and handles the outcome by notifying waiting listeners
+     * and triggering the next snapshot-related activity (another finalization, a batch of deletes, etc.)
+     */
+    // This only really makes sense to run against a BlobStoreRepository, and the division of work between this class and
+    // BlobStoreRepository#finalizeSnapshot is kind of awkward and artificial; TODO consolidate all this stuff into one place and simplify
+    private class SnapshotFinalization extends AbstractRunnable {
+
+        private final Snapshot snapshot;
+        private final Metadata metadata;
+        private final RepositoryData repositoryData;
+
+        SnapshotFinalization(Snapshot snapshot, Metadata metadata, RepositoryData repositoryData) {
+            this.snapshot = snapshot;
+            this.metadata = metadata;
+            this.repositoryData = repositoryData;
+        }
+
+        @Override
+        protected void doRun() {
+            assert ThreadPool.assertCurrentThreadPool(ThreadPool.Names.SNAPSHOT);
+            assert currentlyFinalizing.contains(snapshot.getRepository());
+            assert repositoryOperations.assertNotQueued(snapshot);
+
             SnapshotsInProgress.Entry entry = SnapshotsInProgress.get(clusterService.state()).snapshot(snapshot);
             final String failure = entry.failure();
             logger.trace("[{}] finalizing snapshot in repository, state: [{}], failure[{}]", snapshot, entry.state(), failure);
@@ -1428,7 +1455,9 @@ public final class SnapshotsService extends AbstractLifecycleComponent implement
             final ListenableFuture<Metadata> metadataListener = new ListenableFuture<>();
             final Repository repo = repositoriesService.repository(snapshot.getRepository());
             if (entry.isClone()) {
-                threadPool.executor(ThreadPool.Names.SNAPSHOT).execute(ActionRunnable.supply(metadataListener, () -> {
+                // This listener is kinda unnecessary since we now always complete it synchronously. It's only here to catch exceptions.
+                // TODO simplify this.
+                ActionListener.completeWith(metadataListener, () -> {
                     final Metadata existing = repo.getSnapshotGlobalMetadata(entry.source());
                     final Metadata.Builder metaBuilder = Metadata.builder(existing);
                     final Set<Index> existingIndices = new HashSet<>();
@@ -1450,11 +1479,12 @@ public final class SnapshotsService extends AbstractLifecycleComponent implement
                     );
                     metaBuilder.dataStreams(dataStreamsToCopy, dataStreamAliasesToCopy);
                     return metaBuilder.build();
-                }));
+                });
             } else {
                 metadataListener.onResponse(metadata);
             }
             metadataListener.addListener(ActionListener.wrap(meta -> {
+                assert ThreadPool.assertCurrentThreadPool(ThreadPool.Names.SNAPSHOT);
                 final Metadata metaForSnapshot = metadataForSnapshot(entry, meta);
 
                 final Map<String, SnapshotInfo.IndexSnapshotDetails> indexSnapshotDetails = Maps.newMapWithExpectedSize(
@@ -1554,7 +1584,20 @@ public final class SnapshotsService extends AbstractLifecycleComponent implement
                     shardGenerations
                 )
             ));
-        } catch (Exception e) {
+        }
+
+        @Override
+        public void onRejection(Exception e) {
+            if (e instanceof EsRejectedExecutionException esre && esre.isExecutorShutdown()) {
+                logger.debug("failing finalization of {} due to shutdown", snapshot);
+                handleFinalizationFailure(e, snapshot, repositoryData, ShardGenerations.EMPTY);
+            } else {
+                onFailure(e);
+            }
+        }
+
+        @Override
+        public void onFailure(Exception e) {
             logger.error(Strings.format("unexpected failure finalizing %s", snapshot), e);
             assert false : new AssertionError("unexpected failure finalizing " + snapshot, e);
             handleFinalizationFailure(e, snapshot, repositoryData, ShardGenerations.EMPTY);