Ver código fonte

Introduce SNAPSHOT_META Threadpool for Fetching Repository Metadata (#73172)

Adds a new snapshot meta pool that is used to speed up the get snapshots API
by making `SnapshotInfo` load in parallel. Also uses this pool to load
`RepositoryData`.
A follow-up to this would expand the use of this pool to the snapshot status
API and make it run in parallel as well.
Armin Braun 4 anos atrás
pai
commit
da242856fd

+ 5 - 0
docs/reference/modules/threadpool.asciidoc

@@ -41,6 +41,11 @@ There are several thread pools, but the important ones include:
     keep-alive of `5m` and a max of `min(5, (`<<node.processors,
     `# of allocated processors`>>`) / 2)`.
 
+`snapshot_meta`::
+    For snapshot repository metadata read operations. Thread pool type is `scaling` with a
+    keep-alive of `30s` and a max of `min(50, (`<<node.processors,
+    `# of allocated processors`>>` pass:[ * ]3))`.
+
 `warmer`::
     For segment warm-up operations. Thread pool type is `scaling` with a
     keep-alive of `5m` and a max of `min(5, (`<<node.processors,

+ 86 - 20
server/src/main/java/org/elasticsearch/action/admin/cluster/snapshots/get/TransportGetSnapshotsAction.java

@@ -14,7 +14,6 @@ import org.apache.logging.log4j.message.ParameterizedMessage;
 import org.apache.lucene.util.CollectionUtil;
 import org.elasticsearch.ElasticsearchException;
 import org.elasticsearch.action.ActionListener;
-import org.elasticsearch.action.ActionRunnable;
 import org.elasticsearch.action.StepListener;
 import org.elasticsearch.action.admin.cluster.repositories.get.TransportGetRepositoriesAction;
 import org.elasticsearch.action.support.ActionFilters;
@@ -34,6 +33,7 @@ import org.elasticsearch.repositories.IndexId;
 import org.elasticsearch.repositories.RepositoriesService;
 import org.elasticsearch.repositories.Repository;
 import org.elasticsearch.repositories.RepositoryData;
+import org.elasticsearch.repositories.RepositoryMissingException;
 import org.elasticsearch.snapshots.SnapshotException;
 import org.elasticsearch.snapshots.SnapshotId;
 import org.elasticsearch.snapshots.SnapshotInfo;
@@ -46,12 +46,15 @@ import org.elasticsearch.threadpool.ThreadPool;
 import org.elasticsearch.transport.TransportService;
 
 import java.util.ArrayList;
+import java.util.Collection;
 import java.util.Collections;
 import java.util.HashMap;
 import java.util.HashSet;
 import java.util.List;
 import java.util.Map;
 import java.util.Set;
+import java.util.concurrent.BlockingQueue;
+import java.util.concurrent.LinkedBlockingQueue;
 import java.util.stream.Collectors;
 
 import static java.util.Collections.unmodifiableList;
@@ -211,8 +214,7 @@ public class TransportGetSnapshotsAction extends TransportMasterNodeAction<GetSn
         }
 
         if (verbose) {
-            threadPool.generic().execute(ActionRunnable.supply(
-                    listener, () -> snapshots(snapshotsInProgress, repo, new ArrayList<>(toResolve), ignoreUnavailable, task)));
+            snapshots(snapshotsInProgress, repo, toResolve, ignoreUnavailable, task, listener);
         } else {
             final List<SnapshotInfo> snapshotInfos;
             if (repositoryData != null) {
@@ -235,12 +237,16 @@ public class TransportGetSnapshotsAction extends TransportMasterNodeAction<GetSn
      * @param snapshotIds         snapshots for which to fetch snapshot information
      * @param ignoreUnavailable   if true, snapshots that could not be read will only be logged with a warning,
      *                            if false, they will throw an error
-     * @return list of snapshots
      */
-    private List<SnapshotInfo> snapshots(SnapshotsInProgress snapshotsInProgress, String repositoryName,
-                                        List<SnapshotId> snapshotIds, boolean ignoreUnavailable, CancellableTask task) {
+    private void snapshots(SnapshotsInProgress snapshotsInProgress,
+                           String repositoryName,
+                           Collection<SnapshotId> snapshotIds,
+                           boolean ignoreUnavailable,
+                           CancellableTask task,
+                           ActionListener<List<SnapshotInfo>> listener) {
         if (task.isCancelled()) {
-            throw new TaskCancelledException("task cancelled");
+            listener.onFailure(new TaskCancelledException("task cancelled"));
+            return;
         }
         final Set<SnapshotInfo> snapshotSet = new HashSet<>();
         final Set<SnapshotId> snapshotIdsToIterate = new HashSet<>(snapshotIds);
@@ -252,28 +258,88 @@ public class TransportGetSnapshotsAction extends TransportMasterNodeAction<GetSn
                 snapshotSet.add(new SnapshotInfo(entry));
             }
         }
-        // then, look in the repository
-        final Repository repository = repositoriesService.repository(repositoryName);
-        for (SnapshotId snapshotId : snapshotIdsToIterate) {
+        // then, look in the repository if there's any matching snapshots left
+        final List<SnapshotInfo> snapshotInfos;
+        if (snapshotIdsToIterate.isEmpty()) {
+            snapshotInfos = Collections.emptyList();
+        } else {
+            snapshotInfos = Collections.synchronizedList(new ArrayList<>());
+        }
+        final ActionListener<Collection<Void>> allDoneListener = listener.delegateFailure((l, v) -> {
+            final ArrayList<SnapshotInfo> snapshotList = new ArrayList<>(snapshotInfos);
+            snapshotList.addAll(snapshotSet);
+            CollectionUtil.timSort(snapshotList);
+            listener.onResponse(unmodifiableList(snapshotList));
+        });
+        if (snapshotIdsToIterate.isEmpty()) {
+            allDoneListener.onResponse(Collections.emptyList());
+            return;
+        }
+        // put snapshot info downloads into a local task queue instead of submitting them all to the thread pool at once, so that a
+        // single request cannot completely monopolize the snapshot meta pool
+        final int workers = Math.min(threadPool.info(ThreadPool.Names.SNAPSHOT_META).getMax(), snapshotIdsToIterate.size());
+        final BlockingQueue<SnapshotId> queue = new LinkedBlockingQueue<>(snapshotIdsToIterate);
+        final ActionListener<Void> workerDoneListener = new GroupedActionListener<>(allDoneListener, workers).delegateResponse((l, e) -> {
+            queue.clear(); // Stop fetching the remaining snapshots once we've failed fetching one since the response is an error response
+                           // anyway in this case
+            l.onFailure(e);
+        });
+        final Repository repository;
+        try {
+            repository = repositoriesService.repository(repositoryName);
+        } catch (RepositoryMissingException e) {
+            listener.onFailure(e);
+            return;
+        }
+        for (int i = 0; i < workers; i++) {
+            getOneSnapshotInfo(
+                    ignoreUnavailable,
+                    repository,
+                    queue,
+                    snapshotInfos,
+                    task,
+                    workerDoneListener
+            );
+        }
+    }
+
+    /**
+     * Tries to poll a {@link SnapshotId} to load {@link SnapshotInfo} for from the given {@code queue}. If it finds one in the queue,
+     * loads the snapshot info from the repository and adds it to the given {@code snapshotInfos} collection, then invokes itself again to
+     * try and poll another task from the queue.
+     * If the queue is empty, resolves the given {@code listener}.
+     */
+    private void getOneSnapshotInfo(boolean ignoreUnavailable,
+                                    Repository repository,
+                                    BlockingQueue<SnapshotId> queue,
+                                    Collection<SnapshotInfo> snapshotInfos,
+                                    CancellableTask task,
+                                    ActionListener<Void> listener) {
+        final SnapshotId snapshotId = queue.poll();
+        if (snapshotId == null) {
+            listener.onResponse(null);
+            return;
+        }
+        threadPool.executor(ThreadPool.Names.SNAPSHOT_META).execute(() -> {
             if (task.isCancelled()) {
-                throw new TaskCancelledException("task cancelled");
+                listener.onFailure(new TaskCancelledException("task cancelled"));
+                return;
             }
             try {
-                snapshotSet.add(repository.getSnapshotInfo(snapshotId));
+                snapshotInfos.add(repository.getSnapshotInfo(snapshotId));
             } catch (Exception ex) {
                 if (ignoreUnavailable) {
                     logger.warn(() -> new ParameterizedMessage("failed to get snapshot [{}]", snapshotId), ex);
                 } else {
-                    if (ex instanceof SnapshotException) {
-                        throw ex;
-                    }
-                    throw new SnapshotException(repositoryName, snapshotId, "Snapshot could not be read", ex);
+                    listener.onFailure(
+                            ex instanceof SnapshotException
+                                    ? ex
+                                    : new SnapshotException(repository.getMetadata().name(), snapshotId, "Snapshot could not be read", ex)
+                    );
                 }
             }
-        }
-        final ArrayList<SnapshotInfo> snapshotList = new ArrayList<>(snapshotSet);
-        CollectionUtil.timSort(snapshotList);
-        return unmodifiableList(snapshotList);
+            getOneSnapshotInfo(ignoreUnavailable, repository, queue, snapshotInfos, task, listener);
+        });
     }
 
     private boolean isAllSnapshots(String[] snapshots) {

+ 4 - 2
server/src/main/java/org/elasticsearch/repositories/blobstore/BlobStoreRepository.java

@@ -1315,6 +1315,7 @@ public abstract class BlobStoreRepository extends AbstractLifecycleComponent imp
 
     protected void assertSnapshotOrGenericThread() {
         assert Thread.currentThread().getName().contains('[' + ThreadPool.Names.SNAPSHOT + ']')
+            || Thread.currentThread().getName().contains('[' + ThreadPool.Names.SNAPSHOT_META + ']')
             || Thread.currentThread().getName().contains('[' + ThreadPool.Names.GENERIC + ']') :
             "Expected current thread [" + Thread.currentThread() + "] to be the snapshot or generic thread.";
     }
@@ -1428,11 +1429,12 @@ public abstract class BlobStoreRepository extends AbstractLifecycleComponent imp
             // Don't deduplicate repo data loading if we don't have strong consistency guarantees between the repo and the cluster state
             // Also, if we are not caching repository data (for tests) we assume that the contents of the repository data at a given
             // generation may change
+            final Executor executor = threadPool.executor(ThreadPool.Names.SNAPSHOT_META);
             if (bestEffortConsistency || cacheRepositoryData == false) {
-                threadPool.generic().execute(ActionRunnable.wrap(listener, this::doGetRepositoryData));
+                executor.execute(ActionRunnable.wrap(listener, this::doGetRepositoryData));
             } else {
                 repoDataDeduplicator.executeOnce(metadata, listener, (metadata, l) ->
-                        threadPool.generic().execute(ActionRunnable.wrap(l, this::doGetRepositoryData)));
+                        executor.execute(ActionRunnable.wrap(l, this::doGetRepositoryData)));
             }
         }
     }

+ 4 - 0
server/src/main/java/org/elasticsearch/threadpool/ThreadPool.java

@@ -69,6 +69,7 @@ public class ThreadPool implements ReportingService<ThreadPoolInfo>, Scheduler {
         public static final String REFRESH = "refresh";
         public static final String WARMER = "warmer";
         public static final String SNAPSHOT = "snapshot";
+        public static final String SNAPSHOT_META = "snapshot_meta";
         public static final String FORCE_MERGE = "force_merge";
         public static final String FETCH_SHARD_STARTED = "fetch_shard_started";
         public static final String FETCH_SHARD_STORE = "fetch_shard_store";
@@ -116,6 +117,7 @@ public class ThreadPool implements ReportingService<ThreadPoolInfo>, Scheduler {
         entry(Names.REFRESH, ThreadPoolType.SCALING),
         entry(Names.WARMER, ThreadPoolType.SCALING),
         entry(Names.SNAPSHOT, ThreadPoolType.SCALING),
+        entry(Names.SNAPSHOT_META, ThreadPoolType.SCALING),
         entry(Names.FORCE_MERGE, ThreadPoolType.FIXED),
         entry(Names.FETCH_SHARD_STARTED, ThreadPoolType.SCALING),
         entry(Names.FETCH_SHARD_STORE, ThreadPoolType.SCALING),
@@ -189,6 +191,8 @@ public class ThreadPool implements ReportingService<ThreadPoolInfo>, Scheduler {
         builders.put(Names.REFRESH, new ScalingExecutorBuilder(Names.REFRESH, 1, halfProcMaxAt10, TimeValue.timeValueMinutes(5)));
         builders.put(Names.WARMER, new ScalingExecutorBuilder(Names.WARMER, 1, halfProcMaxAt5, TimeValue.timeValueMinutes(5)));
         builders.put(Names.SNAPSHOT, new ScalingExecutorBuilder(Names.SNAPSHOT, 1, halfProcMaxAt5, TimeValue.timeValueMinutes(5)));
+        builders.put(Names.SNAPSHOT_META, new ScalingExecutorBuilder(Names.SNAPSHOT_META, 1, Math.min(allocatedProcessors * 3, 50),
+                TimeValue.timeValueSeconds(30L)));
         builders.put(Names.FETCH_SHARD_STARTED,
                 new ScalingExecutorBuilder(Names.FETCH_SHARD_STARTED, 1, 2 * allocatedProcessors, TimeValue.timeValueMinutes(5)));
         builders.put(Names.FORCE_MERGE, new FixedExecutorBuilder(settings, Names.FORCE_MERGE, 1, -1, false));

+ 3 - 1
server/src/test/java/org/elasticsearch/threadpool/ScalingThreadPoolTests.java

@@ -62,7 +62,8 @@ public class ScalingThreadPoolTests extends ESThreadPoolTestCase {
             keepAlive = randomIntBetween(1, 300);
             builder.put("thread_pool." + threadPoolName + ".keep_alive", keepAlive + "s");
         } else {
-            keepAlive = "generic".equals(threadPoolName) ? 30 : 300; // the defaults
+            keepAlive = "generic".equals(threadPoolName) || ThreadPool.Names.SNAPSHOT_META.equals(threadPoolName)
+                    ? 30 : 300; // the defaults
         }
 
         runScalingThreadPoolTest(builder.build(), (clusterSettings, threadPool) -> {
@@ -96,6 +97,7 @@ public class ScalingThreadPoolTests extends ESThreadPoolTestCase {
         sizes.put(ThreadPool.Names.REFRESH, ThreadPool::halfAllocatedProcessorsMaxTen);
         sizes.put(ThreadPool.Names.WARMER, ThreadPool::halfAllocatedProcessorsMaxFive);
         sizes.put(ThreadPool.Names.SNAPSHOT, ThreadPool::halfAllocatedProcessorsMaxFive);
+        sizes.put(ThreadPool.Names.SNAPSHOT_META, n -> Math.min(n * 3, 50));
         sizes.put(ThreadPool.Names.FETCH_SHARD_STARTED, ThreadPool::twiceAllocatedProcessors);
         sizes.put(ThreadPool.Names.FETCH_SHARD_STORE, ThreadPool::twiceAllocatedProcessors);
         return sizes.get(threadPoolName).apply(numberOfProcessors);