Browse Source

Add threadpool for critical operations on system indices (#72625)

* Add new thread pool for critical operations
* Split critical thread pool into read and write
* Add POJO to hold thread pool names
* Add tests for critical thread pools
* Add thread pools to data streams
* Update settings for security plugin
* Retrieve ExecutorSelector from SystemIndices where possible
* Use a singleton ExecutorSelector
William Brafford 4 years ago
parent
commit
1c295a92d8
33 changed files with 592 additions and 137 deletions
  1. 10 0
      docs/reference/modules/threadpool.asciidoc
  2. 1 1
      server/src/internalClusterTest/java/org/elasticsearch/indices/TestSystemIndexDescriptor.java
  3. 3 10
      server/src/main/java/org/elasticsearch/action/bulk/TransportShardBulkAction.java
  4. 5 2
      server/src/main/java/org/elasticsearch/action/get/TransportGetAction.java
  5. 5 2
      server/src/main/java/org/elasticsearch/action/get/TransportShardMultiGetAction.java
  6. 2 10
      server/src/main/java/org/elasticsearch/action/resync/TransportResyncReplicationAction.java
  7. 21 10
      server/src/main/java/org/elasticsearch/action/search/TransportSearchAction.java
  8. 10 6
      server/src/main/java/org/elasticsearch/action/support/replication/TransportWriteAction.java
  9. 2 1
      server/src/main/java/org/elasticsearch/common/io/DiskIoBufferPool.java
  10. 15 11
      server/src/main/java/org/elasticsearch/index/seqno/RetentionLeaseSyncAction.java
  11. 86 0
      server/src/main/java/org/elasticsearch/indices/ExecutorNames.java
  12. 104 0
      server/src/main/java/org/elasticsearch/indices/ExecutorSelector.java
  13. 15 1
      server/src/main/java/org/elasticsearch/indices/SystemDataStreamDescriptor.java
  14. 42 4
      server/src/main/java/org/elasticsearch/indices/SystemIndexDescriptor.java
  15. 7 0
      server/src/main/java/org/elasticsearch/indices/SystemIndices.java
  16. 7 3
      server/src/main/java/org/elasticsearch/node/Node.java
  17. 7 2
      server/src/main/java/org/elasticsearch/search/SearchService.java
  18. 2 1
      server/src/main/java/org/elasticsearch/threadpool/ExecutorBuilder.java
  19. 9 1
      server/src/main/java/org/elasticsearch/threadpool/ThreadPool.java
  20. 1 1
      server/src/test/java/org/elasticsearch/action/get/TransportMultiGetActionTests.java
  21. 2 1
      server/src/test/java/org/elasticsearch/action/resync/TransportResyncReplicationActionTests.java
  22. 10 7
      server/src/test/java/org/elasticsearch/action/support/replication/TransportWriteActionTests.java
  23. 3 2
      server/src/test/java/org/elasticsearch/cluster/metadata/MetadataCreateDataStreamServiceTests.java
  24. 30 27
      server/src/test/java/org/elasticsearch/index/seqno/RetentionLeaseSyncActionTests.java
  25. 149 0
      server/src/test/java/org/elasticsearch/indices/ExecutorSelectorTests.java
  26. 8 4
      server/src/test/java/org/elasticsearch/snapshots/SnapshotResiliencyTests.java
  27. 1 1
      server/src/test/java/org/elasticsearch/threadpool/UpdateThreadPoolSettingsTests.java
  28. 4 3
      test/framework/src/main/java/org/elasticsearch/node/MockNode.java
  29. 4 2
      test/framework/src/main/java/org/elasticsearch/search/MockSearchService.java
  30. 18 22
      x-pack/plugin/ccr/src/main/java/org/elasticsearch/xpack/ccr/action/bulk/TransportBulkShardOperationsAction.java
  31. 3 1
      x-pack/plugin/data-streams/src/internalClusterTest/java/org/elasticsearch/datastreams/SystemDataStreamIT.java
  32. 3 1
      x-pack/plugin/fleet/src/main/java/org/elasticsearch/xpack/fleet/Fleet.java
  33. 3 0
      x-pack/plugin/security/src/main/java/org/elasticsearch/xpack/security/Security.java

+ 10 - 0
docs/reference/modules/threadpool.asciidoc

@@ -96,6 +96,16 @@ There are several thread pools, but the important ones include:
     Thread pool type is `fixed` and a default maximum size of
     `min(5, (`<<node.processors, `# of allocated processors`>>`) / 2)`.
 
+`system_critical_read`::
+    For critical read operations on system indices.
+    Thread pool type is `fixed` and a default maximum size of
+    `min(5, (`<<node.processors, `# of allocated processors`>>`) / 2)`.
+
+`system_critical_write`::
+    For critical write operations on system indices.
+    Thread pool type is `fixed` and a default maximum size of
+    `min(5, (`<<node.processors, `# of allocated processors`>>`) / 2)`.
+
 Changing a specific thread pool can be done by setting its type-specific
 parameters; for example, changing the number of threads in the `write` thread
 pool:

+ 1 - 1
server/src/internalClusterTest/java/org/elasticsearch/indices/TestSystemIndexDescriptor.java

@@ -40,7 +40,7 @@ public class TestSystemIndexDescriptor extends SystemIndexDescriptor {
 
     TestSystemIndexDescriptor() {
         super(INDEX_NAME + "*", PRIMARY_INDEX_NAME, "Test system index", getOldMappings(), SETTINGS, INDEX_NAME, 0, "version", "stack",
-            Version.CURRENT.minimumCompatibilityVersion(), Type.INTERNAL_MANAGED, List.of(), List.of());
+            Version.CURRENT.minimumCompatibilityVersion(), Type.INTERNAL_MANAGED, List.of(), List.of(), null);
     }
 
     @Override

+ 3 - 10
server/src/main/java/org/elasticsearch/action/bulk/TransportShardBulkAction.java

@@ -56,11 +56,11 @@ import org.elasticsearch.index.seqno.SequenceNumbers;
 import org.elasticsearch.index.shard.IndexShard;
 import org.elasticsearch.index.shard.ShardId;
 import org.elasticsearch.index.translog.Translog;
+import org.elasticsearch.indices.ExecutorSelector;
 import org.elasticsearch.indices.IndicesService;
 import org.elasticsearch.indices.SystemIndices;
 import org.elasticsearch.node.NodeClosedException;
 import org.elasticsearch.threadpool.ThreadPool;
-import org.elasticsearch.threadpool.ThreadPool.Names;
 import org.elasticsearch.transport.TransportRequestOptions;
 import org.elasticsearch.transport.TransportService;
 
@@ -68,7 +68,6 @@ import java.io.IOException;
 import java.util.Map;
 import java.util.concurrent.Executor;
 import java.util.function.Consumer;
-import java.util.function.Function;
 import java.util.function.LongSupplier;
 
 /** Performs shard-level bulk (index, delete or update) operations */
@@ -78,13 +77,6 @@ public class TransportShardBulkAction extends TransportWriteAction<BulkShardRequ
     public static final ActionType<BulkShardResponse> TYPE = new ActionType<>(ACTION_NAME, BulkShardResponse::new);
 
     private static final Logger logger = LogManager.getLogger(TransportShardBulkAction.class);
-    private static final Function<IndexShard, String> EXECUTOR_NAME_FUNCTION = shard -> {
-        if (shard.indexSettings().getIndexMetadata().isSystem()) {
-            return Names.SYSTEM_WRITE;
-        } else {
-            return Names.WRITE;
-        }
-    };
 
     private final UpdateHelper updateHelper;
     private final MappingUpdatedAction mappingUpdatedAction;
@@ -95,7 +87,8 @@ public class TransportShardBulkAction extends TransportWriteAction<BulkShardRequ
                                     MappingUpdatedAction mappingUpdatedAction, UpdateHelper updateHelper, ActionFilters actionFilters,
                                     IndexingPressure indexingPressure, SystemIndices systemIndices) {
         super(settings, ACTION_NAME, transportService, clusterService, indicesService, threadPool, shardStateAction, actionFilters,
-            BulkShardRequest::new, BulkShardRequest::new, EXECUTOR_NAME_FUNCTION, false, indexingPressure, systemIndices);
+            BulkShardRequest::new, BulkShardRequest::new, ExecutorSelector::getWriteExecutorForShard, false,
+            indexingPressure, systemIndices);
         this.updateHelper = updateHelper;
         this.mappingUpdatedAction = mappingUpdatedAction;
     }

+ 5 - 2
server/src/main/java/org/elasticsearch/action/get/TransportGetAction.java

@@ -22,6 +22,7 @@ import org.elasticsearch.index.IndexService;
 import org.elasticsearch.index.get.GetResult;
 import org.elasticsearch.index.shard.IndexShard;
 import org.elasticsearch.index.shard.ShardId;
+import org.elasticsearch.indices.ExecutorSelector;
 import org.elasticsearch.indices.IndicesService;
 import org.elasticsearch.threadpool.ThreadPool;
 import org.elasticsearch.transport.TransportService;
@@ -34,14 +35,16 @@ import java.io.IOException;
 public class TransportGetAction extends TransportSingleShardAction<GetRequest, GetResponse> {
 
     private final IndicesService indicesService;
+    private final ExecutorSelector executorSelector;
 
     @Inject
     public TransportGetAction(ClusterService clusterService, TransportService transportService,
                               IndicesService indicesService, ThreadPool threadPool, ActionFilters actionFilters,
-                              IndexNameExpressionResolver indexNameExpressionResolver) {
+                              IndexNameExpressionResolver indexNameExpressionResolver, ExecutorSelector executorSelector) {
         super(GetAction.NAME, threadPool, clusterService, transportService, actionFilters, indexNameExpressionResolver,
                 GetRequest::new, ThreadPool.Names.GET);
         this.indicesService = indicesService;
+        this.executorSelector = executorSelector;
     }
 
     @Override
@@ -106,7 +109,7 @@ public class TransportGetAction extends TransportSingleShardAction<GetRequest, G
     protected String getExecutor(GetRequest request, ShardId shardId) {
         final ClusterState clusterState = clusterService.state();
         if (clusterState.metadata().getIndexSafe(shardId.getIndex()).isSystem()) {
-            return ThreadPool.Names.SYSTEM_READ;
+            return executorSelector.executorForGet(shardId.getIndexName());
         } else if (indicesService.indexServiceSafe(shardId.getIndex()).getIndexSettings().isSearchThrottled()) {
             return ThreadPool.Names.SEARCH_THROTTLED;
         } else {

+ 5 - 2
server/src/main/java/org/elasticsearch/action/get/TransportShardMultiGetAction.java

@@ -24,6 +24,7 @@ import org.elasticsearch.index.IndexService;
 import org.elasticsearch.index.get.GetResult;
 import org.elasticsearch.index.shard.IndexShard;
 import org.elasticsearch.index.shard.ShardId;
+import org.elasticsearch.indices.ExecutorSelector;
 import org.elasticsearch.indices.IndicesService;
 import org.elasticsearch.threadpool.ThreadPool;
 import org.elasticsearch.transport.TransportService;
@@ -36,14 +37,16 @@ public class TransportShardMultiGetAction extends TransportSingleShardAction<Mul
     public static final ActionType<MultiGetShardResponse> TYPE = new ActionType<>(ACTION_NAME, MultiGetShardResponse::new);
 
     private final IndicesService indicesService;
+    private final ExecutorSelector executorSelector;
 
     @Inject
     public TransportShardMultiGetAction(ClusterService clusterService, TransportService transportService,
                                         IndicesService indicesService, ThreadPool threadPool, ActionFilters actionFilters,
-                                        IndexNameExpressionResolver indexNameExpressionResolver) {
+                                        IndexNameExpressionResolver indexNameExpressionResolver, ExecutorSelector executorSelector) {
         super(ACTION_NAME, threadPool, clusterService, transportService, actionFilters, indexNameExpressionResolver,
                 MultiGetShardRequest::new, ThreadPool.Names.GET);
         this.indicesService = indicesService;
+        this.executorSelector = executorSelector;
     }
 
     @Override
@@ -119,7 +122,7 @@ public class TransportShardMultiGetAction extends TransportSingleShardAction<Mul
     protected String getExecutor(MultiGetShardRequest request, ShardId shardId) {
         final ClusterState clusterState = clusterService.state();
         if (clusterState.metadata().index(shardId.getIndex()).isSystem()) {
-            return ThreadPool.Names.SYSTEM_READ;
+            return executorSelector.executorForGet(shardId.getIndexName());
         } else if (indicesService.indexServiceSafe(shardId.getIndex()).getIndexSettings().isSearchThrottled()) {
             return ThreadPool.Names.SEARCH_THROTTLED;
         } else {

+ 2 - 10
server/src/main/java/org/elasticsearch/action/resync/TransportResyncReplicationAction.java

@@ -27,30 +27,22 @@ import org.elasticsearch.index.seqno.SequenceNumbers;
 import org.elasticsearch.index.shard.IndexShard;
 import org.elasticsearch.index.shard.PrimaryReplicaSyncer;
 import org.elasticsearch.index.translog.Translog;
+import org.elasticsearch.indices.ExecutorSelector;
 import org.elasticsearch.indices.IndicesService;
 import org.elasticsearch.indices.SystemIndices;
 import org.elasticsearch.tasks.Task;
 import org.elasticsearch.threadpool.ThreadPool;
-import org.elasticsearch.threadpool.ThreadPool.Names;
 import org.elasticsearch.transport.TransportException;
 import org.elasticsearch.transport.TransportResponseHandler;
 import org.elasticsearch.transport.TransportService;
 
 import java.io.IOException;
-import java.util.function.Function;
 import java.util.stream.Stream;
 
 public class TransportResyncReplicationAction extends TransportWriteAction<ResyncReplicationRequest,
     ResyncReplicationRequest, ResyncReplicationResponse> implements PrimaryReplicaSyncer.SyncAction {
 
     private static final String ACTION_NAME = "internal:index/seq_no/resync";
-    private static final Function<IndexShard, String> EXECUTOR_NAME_FUNCTION = shard -> {
-        if (shard.indexSettings().getIndexMetadata().isSystem()) {
-            return Names.SYSTEM_WRITE;
-        } else {
-            return Names.WRITE;
-        }
-    };
 
     @Inject
     public TransportResyncReplicationAction(Settings settings, TransportService transportService,
@@ -58,7 +50,7 @@ public class TransportResyncReplicationAction extends TransportWriteAction<Resyn
                                             ShardStateAction shardStateAction, ActionFilters actionFilters,
                                             IndexingPressure indexingPressure, SystemIndices systemIndices) {
         super(settings, ACTION_NAME, transportService, clusterService, indicesService, threadPool, shardStateAction, actionFilters,
-            ResyncReplicationRequest::new, ResyncReplicationRequest::new, EXECUTOR_NAME_FUNCTION,
+            ResyncReplicationRequest::new, ResyncReplicationRequest::new, ExecutorSelector::getWriteExecutorForShard,
             true, /* we should never reject resync because of thread pool capacity on primary */
             indexingPressure, systemIndices);
     }

+ 21 - 10
server/src/main/java/org/elasticsearch/action/search/TransportSearchAction.java

@@ -20,7 +20,6 @@ import org.elasticsearch.client.Client;
 import org.elasticsearch.cluster.ClusterState;
 import org.elasticsearch.cluster.block.ClusterBlockException;
 import org.elasticsearch.cluster.block.ClusterBlockLevel;
-import org.elasticsearch.cluster.metadata.IndexMetadata;
 import org.elasticsearch.cluster.metadata.IndexNameExpressionResolver;
 import org.elasticsearch.cluster.node.DiscoveryNode;
 import org.elasticsearch.cluster.node.DiscoveryNodes;
@@ -43,6 +42,7 @@ import org.elasticsearch.common.util.concurrent.CountDown;
 import org.elasticsearch.index.Index;
 import org.elasticsearch.index.query.Rewriteable;
 import org.elasticsearch.index.shard.ShardId;
+import org.elasticsearch.indices.ExecutorSelector;
 import org.elasticsearch.indices.breaker.CircuitBreakerService;
 import org.elasticsearch.search.SearchPhaseResult;
 import org.elasticsearch.search.SearchService;
@@ -87,6 +87,8 @@ import java.util.stream.StreamSupport;
 import static org.elasticsearch.action.search.SearchType.DFS_QUERY_THEN_FETCH;
 import static org.elasticsearch.action.search.SearchType.QUERY_THEN_FETCH;
 import static org.elasticsearch.search.sort.FieldSortBuilder.hasPrimaryFieldSort;
+import static org.elasticsearch.threadpool.ThreadPool.Names.SYSTEM_CRITICAL_READ;
+import static org.elasticsearch.threadpool.ThreadPool.Names.SYSTEM_READ;
 
 public class TransportSearchAction extends HandledTransportAction<SearchRequest, SearchResponse> {
 
@@ -103,6 +105,7 @@ public class TransportSearchAction extends HandledTransportAction<SearchRequest,
     private final IndexNameExpressionResolver indexNameExpressionResolver;
     private final NamedWriteableRegistry namedWriteableRegistry;
     private final CircuitBreaker circuitBreaker;
+    private final ExecutorSelector executorSelector;
 
     @Inject
     public TransportSearchAction(ThreadPool threadPool,
@@ -114,7 +117,8 @@ public class TransportSearchAction extends HandledTransportAction<SearchRequest,
                                  ClusterService clusterService,
                                  ActionFilters actionFilters,
                                  IndexNameExpressionResolver indexNameExpressionResolver,
-                                 NamedWriteableRegistry namedWriteableRegistry) {
+                                 NamedWriteableRegistry namedWriteableRegistry,
+                                 ExecutorSelector executorSelector) {
         super(SearchAction.NAME, transportService, actionFilters, (Writeable.Reader<SearchRequest>) SearchRequest::new);
         this.threadPool = threadPool;
         this.circuitBreaker = circuitBreakerService.getBreaker(CircuitBreaker.REQUEST);
@@ -126,6 +130,7 @@ public class TransportSearchAction extends HandledTransportAction<SearchRequest,
         this.searchService = searchService;
         this.indexNameExpressionResolver = indexNameExpressionResolver;
         this.namedWriteableRegistry = namedWriteableRegistry;
+        this.executorSelector = executorSelector;
     }
 
     private Map<String, AliasFilter> buildPerIndexAliasFilter(SearchRequest request, ClusterState clusterState,
@@ -658,7 +663,7 @@ public class TransportSearchAction extends HandledTransportAction<SearchRequest,
         final DiscoveryNodes nodes = clusterState.nodes();
         BiFunction<String, String, Transport.Connection> connectionLookup = buildConnectionLookup(searchRequest.getLocalClusterAlias(),
             nodes::get, remoteConnections, searchTransportService::getConnection);
-        final Executor asyncSearchExecutor = asyncSearchExecutor(concreteLocalIndices, clusterState);
+        final Executor asyncSearchExecutor = asyncSearchExecutor(concreteLocalIndices);
         final boolean preFilterSearchShards = shouldPreFilterSearchShards(clusterState, searchRequest, concreteLocalIndices,
             localShardIterators.size() + remoteShardIterators.size());
         searchAsyncActionProvider.asyncSearchAction(
@@ -667,13 +672,19 @@ public class TransportSearchAction extends HandledTransportAction<SearchRequest,
             preFilterSearchShards, threadPool, clusters).start();
     }
 
-    Executor asyncSearchExecutor(final String[] indices, final ClusterState clusterState) {
-        final boolean onlySystemIndices = Arrays.stream(indices)
-            .allMatch(index -> {
-                final IndexMetadata indexMetadata = clusterState.metadata().index(index);
-                return indexMetadata != null && indexMetadata.isSystem();
-            });
-        return onlySystemIndices ? threadPool.executor(ThreadPool.Names.SYSTEM_READ) : threadPool.executor(ThreadPool.Names.SEARCH);
+    Executor asyncSearchExecutor(final String[] indices) {
+        final List<String> executorsForIndices = Arrays.stream(indices)
+            .map(executorSelector::executorForSearch)
+            .collect(Collectors.toList());
+        if (executorsForIndices.size() == 1) { // all indices have same executor
+            return threadPool.executor(executorsForIndices.get(0));
+        }
+        if (executorsForIndices.size() == 2
+            && executorsForIndices.contains(SYSTEM_READ)
+            && executorsForIndices.contains(SYSTEM_CRITICAL_READ)) { // mix of critical and non critical system indices
+            return threadPool.executor(SYSTEM_READ);
+        }
+        return threadPool.executor(ThreadPool.Names.SEARCH);
     }
 
     static BiFunction<String, String, Transport.Connection> buildConnectionLookup(String requestClusterAlias,

+ 10 - 6
server/src/main/java/org/elasticsearch/action/support/replication/TransportWriteAction.java

@@ -32,6 +32,7 @@ import org.elasticsearch.index.shard.IndexShard;
 import org.elasticsearch.index.shard.ShardId;
 import org.elasticsearch.index.translog.Translog;
 import org.elasticsearch.index.translog.Translog.Location;
+import org.elasticsearch.indices.ExecutorSelector;
 import org.elasticsearch.indices.IndicesService;
 import org.elasticsearch.indices.SystemIndices;
 import org.elasticsearch.threadpool.ThreadPool;
@@ -40,7 +41,7 @@ import org.elasticsearch.transport.TransportService;
 import java.util.concurrent.atomic.AtomicBoolean;
 import java.util.concurrent.atomic.AtomicInteger;
 import java.util.concurrent.atomic.AtomicReference;
-import java.util.function.Function;
+import java.util.function.BiFunction;
 
 /**
  * Base class for transport actions that modify data in some shard like index, delete, and shardBulk.
@@ -54,13 +55,15 @@ public abstract class TransportWriteAction<
 
     protected final IndexingPressure indexingPressure;
     protected final SystemIndices systemIndices;
+    protected final ExecutorSelector executorSelector;
 
-    private final Function<IndexShard, String> executorFunction;
+    private final BiFunction<ExecutorSelector, IndexShard, String> executorFunction;
 
     protected TransportWriteAction(Settings settings, String actionName, TransportService transportService,
                                    ClusterService clusterService, IndicesService indicesService, ThreadPool threadPool,
                                    ShardStateAction shardStateAction, ActionFilters actionFilters, Writeable.Reader<Request> request,
-                                   Writeable.Reader<ReplicaRequest> replicaRequest, Function<IndexShard, String> executorFunction,
+                                   Writeable.Reader<ReplicaRequest> replicaRequest,
+                                   BiFunction<ExecutorSelector, IndexShard, String> executorFunction,
                                    boolean forceExecutionOnPrimary, IndexingPressure indexingPressure, SystemIndices systemIndices) {
         // We pass ThreadPool.Names.SAME to the super class as we control the dispatching to the
         // ThreadPool.Names.WRITE/ThreadPool.Names.SYSTEM_WRITE thread pools in this class.
@@ -69,10 +72,11 @@ public abstract class TransportWriteAction<
         this.executorFunction = executorFunction;
         this.indexingPressure = indexingPressure;
         this.systemIndices = systemIndices;
+        this.executorSelector = systemIndices.getExecutorSelector();
     }
 
     protected String executor(IndexShard shard) {
-        return executorFunction.apply(shard);
+        return executorFunction.apply(executorSelector, shard);
     }
 
     @Override
@@ -171,7 +175,7 @@ public abstract class TransportWriteAction<
     @Override
     protected void shardOperationOnPrimary(
             Request request, IndexShard primary, ActionListener<PrimaryResult<ReplicaRequest, Response>> listener) {
-        threadPool.executor(executorFunction.apply(primary)).execute(new ActionRunnable<>(listener) {
+        threadPool.executor(executorFunction.apply(executorSelector, primary)).execute(new ActionRunnable<>(listener) {
             @Override
             protected void doRun() {
                 dispatchedShardOperationOnPrimary(request, primary, listener);
@@ -196,7 +200,7 @@ public abstract class TransportWriteAction<
      */
     @Override
     protected void shardOperationOnReplica(ReplicaRequest request, IndexShard replica, ActionListener<ReplicaResult> listener) {
-        threadPool.executor(executorFunction.apply(replica)).execute(new ActionRunnable<>(listener) {
+        threadPool.executor(executorFunction.apply(executorSelector, replica)).execute(new ActionRunnable<>(listener) {
             @Override
             protected void doRun() {
                 dispatchedShardOperationOnReplica(request, replica, listener);

+ 2 - 1
server/src/main/java/org/elasticsearch/common/io/DiskIoBufferPool.java

@@ -39,7 +39,8 @@ public class DiskIoBufferPool {
         for (String s : Arrays.asList(
             "[" + ThreadPool.Names.WRITE + "]",
             "[" + ThreadPool.Names.FLUSH + "]",
-            "[" + ThreadPool.Names.SYSTEM_WRITE + "]")) {
+            "[" + ThreadPool.Names.SYSTEM_WRITE + "]",
+            "[" + ThreadPool.Names.SYSTEM_CRITICAL_WRITE + "]")) {
             if (threadName.contains(s)) {
                 return true;
             }

+ 15 - 11
server/src/main/java/org/elasticsearch/index/seqno/RetentionLeaseSyncAction.java

@@ -73,17 +73,21 @@ public class RetentionLeaseSyncAction extends
         final IndexingPressure indexingPressure,
         final SystemIndices systemIndices) {
         super(
-                settings,
-                ACTION_NAME,
-                transportService,
-                clusterService,
-                indicesService,
-                threadPool,
-                shardStateAction,
-                actionFilters,
-                RetentionLeaseSyncAction.Request::new,
-                RetentionLeaseSyncAction.Request::new,
-                ignore -> ThreadPool.Names.MANAGEMENT, false, indexingPressure, systemIndices);
+            settings,
+            ACTION_NAME,
+            transportService,
+            clusterService,
+            indicesService,
+            threadPool,
+            shardStateAction,
+            actionFilters,
+            RetentionLeaseSyncAction.Request::new,
+            RetentionLeaseSyncAction.Request::new,
+            (service, ignore) -> ThreadPool.Names.MANAGEMENT,
+            false,
+            indexingPressure,
+            systemIndices
+        );
     }
 
     @Override

+ 86 - 0
server/src/main/java/org/elasticsearch/indices/ExecutorNames.java

@@ -0,0 +1,86 @@
+/*
+ * Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
+ * or more contributor license agreements. Licensed under the Elastic License
+ * 2.0 and the Server Side Public License, v 1; you may not use this file except
+ * in compliance with, at your election, the Elastic License 2.0 or the Server
+ * Side Public License, v 1.
+ */
+
+package org.elasticsearch.indices;
+
+import org.elasticsearch.threadpool.ThreadPool;
+
+/**
+ * A class that gathers the names of thread pool executors that should be used for a particular system index or system data stream. This
+ * object is used both by the {@link SystemIndexDescriptor} and the {@link SystemDataStreamDescriptor} classes.
+ */
+public class ExecutorNames {
+    private final String getPoolName;
+    private final String searchPoolName;
+    private final String writePoolName;
+
+    /**
+     * The thread pools for a typical system index.
+     */
+    public static ExecutorNames DEFAULT_SYSTEM_INDEX_THREAD_POOLS = new ExecutorNames(
+        ThreadPool.Names.SYSTEM_READ, ThreadPool.Names.SYSTEM_READ, ThreadPool.Names.SYSTEM_WRITE
+    );
+
+    /**
+     * The thread pools for a typical system data stream. These are also the usual thread pools for non-system indices and data streams.
+     */
+    public static ExecutorNames DEFAULT_SYSTEM_DATA_STREAM_THREAD_POOLS = new ExecutorNames(
+        ThreadPool.Names.GET, ThreadPool.Names.SEARCH, ThreadPool.Names.WRITE
+    );
+
+    /**
+     * The thread pools that should be used for critical system index operations.
+     */
+    public static ExecutorNames CRITICAL_SYSTEM_INDEX_THREAD_POOLS = new ExecutorNames(
+        ThreadPool.Names.SYSTEM_CRITICAL_READ, ThreadPool.Names.SYSTEM_CRITICAL_READ, ThreadPool.Names.SYSTEM_CRITICAL_WRITE
+    );
+
+    /**
+     * Create a new collection of thread pool names for a system descriptor to use.
+     *
+     * @param getPoolName    Name of the thread pool that get operations should use.
+     * @param searchPoolName Name of the thread pool that search operations should use. (In same cases, this is the same as the name of the
+     *                       pool for get operations.)
+     * @param writePoolName  Name of the thread pool that write operations should use.
+     */
+    public ExecutorNames(String getPoolName, String searchPoolName, String writePoolName) {
+        if (ThreadPool.THREAD_POOL_TYPES.containsKey(getPoolName) == false) {
+            throw new IllegalArgumentException(getPoolName + " is not a valid thread pool");
+        }
+        if (ThreadPool.THREAD_POOL_TYPES.containsKey(searchPoolName) == false) {
+            throw new IllegalArgumentException(searchPoolName + " is not a valid thread pool");
+        }
+        if (ThreadPool.THREAD_POOL_TYPES.containsKey(writePoolName) == false) {
+            throw new IllegalArgumentException(writePoolName + " is not a valid thread pool");
+        }
+        this.getPoolName = getPoolName;
+        this.searchPoolName = searchPoolName;
+        this.writePoolName = writePoolName;
+    }
+
+    /**
+     * @return Name of the thread pool that get operations should use
+     */
+    public String threadPoolForGet() {
+        return getPoolName;
+    }
+
+    /**
+     * @return Name of the thread pool that search operations should use
+     */
+    public String threadPoolForSearch() {
+        return searchPoolName;
+    }
+
+    /**
+     * @return Name of the thread pool that write operations should use
+     */
+    public String threadPoolForWrite() {
+        return writePoolName;
+    }
+}

+ 104 - 0
server/src/main/java/org/elasticsearch/indices/ExecutorSelector.java

@@ -0,0 +1,104 @@
+/*
+ * Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
+ * or more contributor license agreements. Licensed under the Elastic License
+ * 2.0 and the Server Side Public License, v 1; you may not use this file except
+ * in compliance with, at your election, the Elastic License 2.0 or the Server
+ * Side Public License, v 1.
+ */
+
+package org.elasticsearch.indices;
+
+import org.elasticsearch.index.shard.IndexShard;
+import org.elasticsearch.threadpool.ThreadPool;
+
+import java.util.Objects;
+
+/**
+ * Some operations need to use different executors for different index patterns.
+ * Specifically, some operations on system indices are considered critical and
+ * should use the "system_critical_read" or "system_critical_write" thread pools
+ * rather than the "system_read" or "system_write" thread pools.
+ */
+public class ExecutorSelector {
+
+    private final SystemIndices systemIndices;
+
+    /**
+     * Package-private constructor; in general it's best to get an ExecutorSelector
+     * from {@link SystemIndices#getExecutorSelector()}.
+     * @param systemIndices A system indices object that this ExecutorSelector will
+     *                      use to match system index names to system index descriptors.
+     */
+    ExecutorSelector(SystemIndices systemIndices) {
+        this.systemIndices = systemIndices;
+    }
+
+    /**
+     * The "get" executor should be used when retrieving documents by ID.
+     * @param indexName Name of the index
+     * @return Name of the executor to use for a get operation.
+     */
+    public String executorForGet(String indexName) {
+        SystemIndexDescriptor indexDescriptor = systemIndices.findMatchingDescriptor(indexName);
+        if (Objects.nonNull(indexDescriptor)) {
+            return indexDescriptor.getThreadPoolNames().threadPoolForGet();
+        }
+
+        SystemDataStreamDescriptor dataStreamDescriptor = systemIndices.findMatchingDataStreamDescriptor(indexName);
+        if (Objects.nonNull(dataStreamDescriptor)) {
+            return dataStreamDescriptor.getThreadPoolNames().threadPoolForGet();
+        }
+
+        return ThreadPool.Names.GET;
+    }
+
+    /**
+     * The "search" executor should be used for search or aggregation operations.
+     * @param indexName Name of the index
+     * @return Name of the executor to use for a search operation.
+     */
+    public String executorForSearch(String indexName) {
+        SystemIndexDescriptor indexDescriptor = systemIndices.findMatchingDescriptor(indexName);
+        if (Objects.nonNull(indexDescriptor)) {
+            return indexDescriptor.getThreadPoolNames().threadPoolForSearch();
+        }
+
+        SystemDataStreamDescriptor dataStreamDescriptor = systemIndices.findMatchingDataStreamDescriptor(indexName);
+        if (Objects.nonNull(dataStreamDescriptor)) {
+            return dataStreamDescriptor.getThreadPoolNames().threadPoolForSearch();
+        }
+
+        return ThreadPool.Names.SEARCH;
+    }
+
+    /**
+     * The "write" executor should be used for operations that write new documents or
+     * update existing ones.
+     * @param indexName Name of the index
+     * @return Name of the executor to use for a search operation.
+     */
+    public String executorForWrite(String indexName) {
+        SystemIndexDescriptor indexDescriptor = systemIndices.findMatchingDescriptor(indexName);
+        if (Objects.nonNull(indexDescriptor)) {
+            return indexDescriptor.getThreadPoolNames().threadPoolForWrite();
+        }
+
+        SystemDataStreamDescriptor dataStreamDescriptor = systemIndices.findMatchingDataStreamDescriptor(indexName);
+        if (Objects.nonNull(dataStreamDescriptor)) {
+            return dataStreamDescriptor.getThreadPoolNames().threadPoolForWrite();
+        }
+
+        return ThreadPool.Names.WRITE;
+    }
+
+    /**
+     * This is a convenience method for the case when we need to find an executor for a shard.
+     * Note that it can be passed to methods as a {@link java.util.function.BiFunction}.
+     * @param executorSelector An executor selector service.
+     * @param shard A shard for which we need to find an executor.
+     * @return Name of the executor that should be used for write operations on this shard.
+     */
+    public static String getWriteExecutorForShard(ExecutorSelector executorSelector, IndexShard shard) {
+        return executorSelector.executorForWrite(shard.shardId().getIndexName());
+    }
+}

+ 15 - 1
server/src/main/java/org/elasticsearch/indices/SystemDataStreamDescriptor.java

@@ -28,6 +28,7 @@ public class SystemDataStreamDescriptor {
     private final ComposableIndexTemplate composableIndexTemplate;
     private final Map<String, ComponentTemplate> componentTemplates;
     private final List<String> allowedElasticProductOrigins;
+    private final ExecutorNames executorNames;
 
     /**
      * Creates a new descriptor for a system data descriptor
@@ -40,10 +41,12 @@ public class SystemDataStreamDescriptor {
      *                           {@link ComposableIndexTemplate}
      * @param allowedElasticProductOrigins a list of product origin values that are allowed to access this data stream if the
      *                                     type is {@link Type#EXTERNAL}. Must not be {@code null}
+     * @param executorNames thread pools that should be used for operations on the system data stream
      */
     public SystemDataStreamDescriptor(String dataStreamName, String description, Type type,
                                       ComposableIndexTemplate composableIndexTemplate, Map<String, ComponentTemplate> componentTemplates,
-                                      List<String> allowedElasticProductOrigins) {
+                                      List<String> allowedElasticProductOrigins,
+                                      ExecutorNames executorNames) {
         this.dataStreamName = Objects.requireNonNull(dataStreamName, "dataStreamName must be specified");
         this.description = Objects.requireNonNull(description, "description must be specified");
         this.type = Objects.requireNonNull(type, "type must be specified");
@@ -54,6 +57,9 @@ public class SystemDataStreamDescriptor {
         if (type == Type.EXTERNAL && allowedElasticProductOrigins.isEmpty()) {
             throw new IllegalArgumentException("External system data stream without allowed products is not a valid combination");
         }
+        this.executorNames = Objects.nonNull(executorNames)
+            ? executorNames
+            : ExecutorNames.DEFAULT_SYSTEM_DATA_STREAM_THREAD_POOLS;
     }
 
     public String getDataStreamName() {
@@ -84,6 +90,14 @@ public class SystemDataStreamDescriptor {
         return componentTemplates;
     }
 
+    /**
+     * Get the names of the thread pools that should be used for operations on this data stream.
+     * @return Names for get, search, and write executors.
+     */
+    public ExecutorNames getThreadPoolNames() {
+        return this.executorNames;
+    }
+
     public enum Type {
         INTERNAL,
         EXTERNAL

+ 42 - 4
server/src/main/java/org/elasticsearch/indices/SystemIndexDescriptor.java

@@ -21,6 +21,7 @@ import org.elasticsearch.common.xcontent.XContentBuilder;
 import org.elasticsearch.common.xcontent.XContentHelper;
 import org.elasticsearch.common.xcontent.XContentType;
 import org.elasticsearch.common.xcontent.json.JsonXContent;
+import org.elasticsearch.threadpool.ThreadPool;
 
 import java.util.ArrayList;
 import java.util.Collections;
@@ -95,6 +96,11 @@ public class SystemIndexDescriptor implements IndexPatternMatcher, Comparable<Sy
      */
     private final List<SystemIndexDescriptor> priorSystemIndexDescriptors;
 
+    /**
+     * The thread pools that actions will use to operate on this descriptor's system indices
+     */
+    private final ExecutorNames executorNames;
+
     /**
      * Creates a descriptor for system indices matching the supplied pattern. These indices will not be managed
      * by Elasticsearch internally.
@@ -103,7 +109,7 @@ public class SystemIndexDescriptor implements IndexPatternMatcher, Comparable<Sy
      */
     public SystemIndexDescriptor(String indexPattern, String description) {
         this(indexPattern, null, description, null, null, null, 0, null, null, Version.CURRENT.minimumCompatibilityVersion(),
-            Type.INTERNAL_UNMANAGED, List.of(), List.of());
+            Type.INTERNAL_UNMANAGED, List.of(), List.of(), null);
     }
 
     /**
@@ -117,7 +123,7 @@ public class SystemIndexDescriptor implements IndexPatternMatcher, Comparable<Sy
      */
     public SystemIndexDescriptor(String indexPattern, String description, Type type, List<String> allowedElasticProductOrigins) {
         this(indexPattern, null, description, null, null, null, 0, null, null, Version.CURRENT.minimumCompatibilityVersion(), type,
-            allowedElasticProductOrigins, List.of());
+            allowedElasticProductOrigins, List.of(), null);
     }
 
     /**
@@ -154,7 +160,8 @@ public class SystemIndexDescriptor implements IndexPatternMatcher, Comparable<Sy
         Version minimumNodeVersion,
         Type type,
         List<String> allowedElasticProductOrigins,
-        List<SystemIndexDescriptor> priorSystemIndexDescriptors
+        List<SystemIndexDescriptor> priorSystemIndexDescriptors,
+        ExecutorNames executorNames
     ) {
         Objects.requireNonNull(indexPattern, "system index pattern must not be null");
         if (indexPattern.length() < 2) {
@@ -247,6 +254,18 @@ public class SystemIndexDescriptor implements IndexPatternMatcher, Comparable<Sy
             }
         }
 
+        if (Objects.nonNull(executorNames)) {
+            if (ThreadPool.THREAD_POOL_TYPES.containsKey(executorNames.threadPoolForGet()) == false) {
+                throw new IllegalArgumentException(executorNames.threadPoolForGet() + " is not a valid thread pool");
+            }
+            if (ThreadPool.THREAD_POOL_TYPES.containsKey(executorNames.threadPoolForSearch()) == false) {
+                throw new IllegalArgumentException(executorNames.threadPoolForGet() + " is not a valid thread pool");
+            }
+            if (ThreadPool.THREAD_POOL_TYPES.containsKey(executorNames.threadPoolForWrite()) == false) {
+                throw new IllegalArgumentException(executorNames.threadPoolForGet() + " is not a valid thread pool");
+            }
+        }
+
         this.indexPattern = indexPattern;
         this.primaryIndex = primaryIndex;
         this.aliasName = aliasName;
@@ -283,6 +302,9 @@ public class SystemIndexDescriptor implements IndexPatternMatcher, Comparable<Sy
             sortedPriorSystemIndexDescriptors = List.copyOf(copy);
         }
         this.priorSystemIndexDescriptors = sortedPriorSystemIndexDescriptors;
+        this.executorNames = Objects.nonNull(executorNames)
+            ? executorNames
+            : ExecutorNames.DEFAULT_SYSTEM_INDEX_THREAD_POOLS;
     }
 
 
@@ -439,6 +461,14 @@ public class SystemIndexDescriptor implements IndexPatternMatcher, Comparable<Sy
         return null;
     }
 
+    /**
+     * @return The names of thread pools that should be used for operations on this
+     *    system index.
+     */
+    public ExecutorNames getThreadPoolNames() {
+        return this.executorNames;
+    }
+
     public static Builder builder() {
         return new Builder();
     }
@@ -506,6 +536,7 @@ public class SystemIndexDescriptor implements IndexPatternMatcher, Comparable<Sy
         private Type type = Type.INTERNAL_MANAGED;
         private List<String> allowedElasticProductOrigins = List.of();
         private List<SystemIndexDescriptor> priorSystemIndexDescriptors = List.of();
+        private ExecutorNames executorNames;
 
         private Builder() {}
 
@@ -579,6 +610,11 @@ public class SystemIndexDescriptor implements IndexPatternMatcher, Comparable<Sy
             return this;
         }
 
+        public Builder setThreadPools(ExecutorNames executorNames) {
+            this.executorNames = executorNames;
+            return this;
+        }
+
         /**
          * Builds a {@link SystemIndexDescriptor} using the fields supplied to this builder.
          * @return a populated descriptor.
@@ -598,7 +634,8 @@ public class SystemIndexDescriptor implements IndexPatternMatcher, Comparable<Sy
                 minimumNodeVersion,
                 type,
                 allowedElasticProductOrigins,
-                priorSystemIndexDescriptors
+                priorSystemIndexDescriptors,
+                executorNames
             );
         }
     }
@@ -685,4 +722,5 @@ public class SystemIndexDescriptor implements IndexPatternMatcher, Comparable<Sy
         }
         return Version.fromString(value);
     }
+
 }

+ 7 - 0
server/src/main/java/org/elasticsearch/indices/SystemIndices.java

@@ -71,6 +71,7 @@ public class SystemIndices {
     private final Predicate<String> systemDataStreamAutomaton;
     private final Map<String, Feature> featureDescriptors;
     private final Map<String, CharacterRunAutomaton> productToSystemIndicesMatcher;
+    private final ExecutorSelector executorSelector;
 
     /**
      * Initialize the SystemIndices object
@@ -85,6 +86,7 @@ public class SystemIndices {
         this.systemDataStreamIndicesAutomaton = buildDataStreamBackingIndicesAutomaton(featureDescriptors);
         this.systemDataStreamAutomaton = buildDataStreamNamePredicate(featureDescriptors);
         this.productToSystemIndicesMatcher = getProductToSystemIndicesMap(featureDescriptors);
+        this.executorSelector = new ExecutorSelector(this);
     }
 
     private static void checkForDuplicateAliases(Collection<SystemIndexDescriptor> descriptors) {
@@ -624,4 +626,9 @@ public class SystemIndices {
             plugin.getAssociatedIndexDescriptors(),
             plugin::cleanUpFeature);
     }
+
+    public ExecutorSelector getExecutorSelector() {
+        return executorSelector;
+    }
+
 }

+ 7 - 3
server/src/main/java/org/elasticsearch/node/Node.java

@@ -96,6 +96,7 @@ import org.elasticsearch.index.IndexSettings;
 import org.elasticsearch.index.IndexingPressure;
 import org.elasticsearch.index.analysis.AnalysisRegistry;
 import org.elasticsearch.index.engine.EngineFactory;
+import org.elasticsearch.indices.ExecutorSelector;
 import org.elasticsearch.indices.IndicesModule;
 import org.elasticsearch.indices.IndicesService;
 import org.elasticsearch.indices.ShardLimitValidator;
@@ -426,6 +427,7 @@ public class Node implements Closeable {
                     plugin -> SystemIndices.pluginToFeature(plugin, settings)
                 ));
             final SystemIndices systemIndices = new SystemIndices(featuresMap);
+            final ExecutorSelector executorSelector = systemIndices.getExecutorSelector();
 
             ModulesBuilder modules = new ModulesBuilder();
             final MonitorService monitorService = new MonitorService(settings, nodeEnvironment, threadPool);
@@ -611,7 +613,7 @@ public class Node implements Closeable {
 
             final SearchService searchService = newSearchService(clusterService, indicesService,
                 threadPool, scriptService, bigArrays, searchModule.getFetchPhase(),
-                responseCollectorService, circuitBreakerService);
+                responseCollectorService, circuitBreakerService, executorSelector);
 
             final List<PersistentTasksExecutor<?>> tasksExecutors = pluginsService
                 .filterPlugins(PersistentTaskPlugin.class).stream()
@@ -687,6 +689,7 @@ public class Node implements Closeable {
                     b.bind(ShardLimitValidator.class).toInstance(shardLimitValidator);
                     b.bind(FsHealthService.class).toInstance(fsHealthService);
                     b.bind(SystemIndices.class).toInstance(systemIndices);
+                    b.bind(ExecutorSelector.class).toInstance(executorSelector);
                 }
             );
             injector = modules.createInjector();
@@ -1143,9 +1146,10 @@ public class Node implements Closeable {
     protected SearchService newSearchService(ClusterService clusterService, IndicesService indicesService,
                                              ThreadPool threadPool, ScriptService scriptService, BigArrays bigArrays,
                                              FetchPhase fetchPhase, ResponseCollectorService responseCollectorService,
-                                             CircuitBreakerService circuitBreakerService) {
+                                             CircuitBreakerService circuitBreakerService, ExecutorSelector executorSelector) {
         return new SearchService(clusterService, indicesService, threadPool,
-            scriptService, bigArrays, fetchPhase, responseCollectorService, circuitBreakerService);
+            scriptService, bigArrays, fetchPhase, responseCollectorService, circuitBreakerService,
+            executorSelector);
     }
 
     /**

+ 7 - 2
server/src/main/java/org/elasticsearch/search/SearchService.java

@@ -61,6 +61,7 @@ import org.elasticsearch.index.shard.IndexEventListener;
 import org.elasticsearch.index.shard.IndexShard;
 import org.elasticsearch.index.shard.SearchOperationListener;
 import org.elasticsearch.index.shard.ShardId;
+import org.elasticsearch.indices.ExecutorSelector;
 import org.elasticsearch.indices.IndicesService;
 import org.elasticsearch.indices.breaker.CircuitBreakerService;
 import org.elasticsearch.indices.cluster.IndicesClusterStateService.AllocatedIndices.IndexRemovalReason;
@@ -184,6 +185,8 @@ public class SearchService extends AbstractLifecycleComponent implements IndexEv
 
     private final ResponseCollectorService responseCollectorService;
 
+    private final ExecutorSelector executorSelector;
+
     private final BigArrays bigArrays;
 
     private final DfsPhase dfsPhase = new DfsPhase();
@@ -219,7 +222,8 @@ public class SearchService extends AbstractLifecycleComponent implements IndexEv
 
     public SearchService(ClusterService clusterService, IndicesService indicesService,
                          ThreadPool threadPool, ScriptService scriptService, BigArrays bigArrays, FetchPhase fetchPhase,
-                         ResponseCollectorService responseCollectorService, CircuitBreakerService circuitBreakerService) {
+                         ResponseCollectorService responseCollectorService, CircuitBreakerService circuitBreakerService,
+                         ExecutorSelector executorSelector) {
         Settings settings = clusterService.getSettings();
         this.threadPool = threadPool;
         this.clusterService = clusterService;
@@ -231,6 +235,7 @@ public class SearchService extends AbstractLifecycleComponent implements IndexEv
         this.fetchPhase = fetchPhase;
         this.multiBucketConsumerService = new MultiBucketConsumerService(clusterService, settings,
             circuitBreakerService.getBreaker(CircuitBreaker.REQUEST));
+        this.executorSelector = executorSelector;
 
         TimeValue keepAliveInterval = KEEPALIVE_INTERVAL_SETTING.get(settings);
         setKeepAlives(DEFAULT_KEEPALIVE_SETTING.get(settings), MAX_KEEPALIVE_SETTING.get(settings));
@@ -539,7 +544,7 @@ public class SearchService extends AbstractLifecycleComponent implements IndexEv
         assert indexShard != null;
         final String executorName;
         if (indexShard.isSystem()) {
-            executorName = Names.SYSTEM_READ;
+            executorName = executorSelector.executorForSearch(indexShard.shardId().getIndexName());
         } else if (indexShard.indexSettings().isSearchThrottled()) {
             executorName = Names.SEARCH_THROTTLED;
         } else {

+ 2 - 1
server/src/main/java/org/elasticsearch/threadpool/ExecutorBuilder.java

@@ -37,7 +37,8 @@ public abstract class ExecutorBuilder<U extends ExecutorBuilder.ExecutorSettings
     }
 
     protected int applyHardSizeLimit(final Settings settings, final String name) {
-        if (name.equals("bulk") || name.equals(ThreadPool.Names.WRITE) || name.equals(ThreadPool.Names.SYSTEM_WRITE)) {
+        if (name.equals("bulk") || name.equals(ThreadPool.Names.WRITE) || name.equals(ThreadPool.Names.SYSTEM_WRITE)
+            || name.equals(ThreadPool.Names.SYSTEM_CRITICAL_WRITE)) {
             return 1 + EsExecutors.allocatedProcessors(settings);
         } else {
             return Integer.MAX_VALUE;

+ 9 - 1
server/src/main/java/org/elasticsearch/threadpool/ThreadPool.java

@@ -75,6 +75,8 @@ public class ThreadPool implements ReportingService<ThreadPoolInfo>, Scheduler {
         public static final String FETCH_SHARD_STORE = "fetch_shard_store";
         public static final String SYSTEM_READ = "system_read";
         public static final String SYSTEM_WRITE = "system_write";
+        public static final String SYSTEM_CRITICAL_READ = "system_critical_read";
+        public static final String SYSTEM_CRITICAL_WRITE = "system_critical_write";
     }
 
     public enum ThreadPoolType {
@@ -123,7 +125,9 @@ public class ThreadPool implements ReportingService<ThreadPoolInfo>, Scheduler {
         entry(Names.FETCH_SHARD_STORE, ThreadPoolType.SCALING),
         entry(Names.SEARCH_THROTTLED, ThreadPoolType.FIXED),
         entry(Names.SYSTEM_READ, ThreadPoolType.FIXED),
-        entry(Names.SYSTEM_WRITE, ThreadPoolType.FIXED));
+        entry(Names.SYSTEM_WRITE, ThreadPoolType.FIXED),
+        entry(Names.SYSTEM_CRITICAL_READ, ThreadPoolType.FIXED),
+        entry(Names.SYSTEM_CRITICAL_WRITE, ThreadPoolType.FIXED));
 
     private final Map<String, ExecutorHolder> executors;
 
@@ -200,6 +204,10 @@ public class ThreadPool implements ReportingService<ThreadPoolInfo>, Scheduler {
                 new ScalingExecutorBuilder(Names.FETCH_SHARD_STORE, 1, 2 * allocatedProcessors, TimeValue.timeValueMinutes(5)));
         builders.put(Names.SYSTEM_READ, new FixedExecutorBuilder(settings, Names.SYSTEM_READ, halfProcMaxAt5, 2000, false));
         builders.put(Names.SYSTEM_WRITE, new FixedExecutorBuilder(settings, Names.SYSTEM_WRITE, halfProcMaxAt5, 1000, false));
+        builders.put(Names.SYSTEM_CRITICAL_READ, new FixedExecutorBuilder(settings, Names.SYSTEM_CRITICAL_READ, halfProcMaxAt5, 2000,
+            false));
+        builders.put(Names.SYSTEM_CRITICAL_WRITE, new FixedExecutorBuilder(settings, Names.SYSTEM_CRITICAL_WRITE, halfProcMaxAt5, 1500,
+            false));
 
         for (final ExecutorBuilder<?> builder : customBuilders) {
             if (builders.containsKey(builder.name())) {

+ 1 - 1
server/src/test/java/org/elasticsearch/action/get/TransportMultiGetActionTests.java

@@ -137,7 +137,7 @@ public class TransportMultiGetActionTests extends ESTestCase {
         when(clusterService.operationRouting()).thenReturn(operationRouting);
 
         shardAction = new TransportShardMultiGetAction(clusterService, transportService, mock(IndicesService.class), threadPool,
-            new ActionFilters(emptySet()), new Resolver()) {
+            new ActionFilters(emptySet()), new Resolver(), EmptySystemIndices.INSTANCE.getExecutorSelector()) {
             @Override
             protected void doExecute(Task task, MultiGetShardRequest request, ActionListener<MultiGetShardResponse> listener) {
             }

+ 2 - 1
server/src/test/java/org/elasticsearch/action/resync/TransportResyncReplicationActionTests.java

@@ -137,7 +137,8 @@ public class TransportResyncReplicationActionTests extends ESTestCase {
 
                 final TransportResyncReplicationAction action = new TransportResyncReplicationAction(Settings.EMPTY, transportService,
                     clusterService, indexServices, threadPool, shardStateAction, new ActionFilters(new HashSet<>()),
-                    new IndexingPressure(Settings.EMPTY), EmptySystemIndices.INSTANCE);
+                    new IndexingPressure(Settings.EMPTY), EmptySystemIndices.INSTANCE
+                );
 
                 assertThat(action.globalBlockLevel(), nullValue());
                 assertThat(action.indexBlockLevel(), nullValue());

+ 10 - 7
server/src/test/java/org/elasticsearch/action/support/replication/TransportWriteActionTests.java

@@ -354,10 +354,11 @@ public class TransportWriteActionTests extends ESTestCase {
 
         protected TestAction(boolean withDocumentFailureOnPrimary, boolean withDocumentFailureOnReplica) {
             super(Settings.EMPTY, "internal:test",
-                    new TransportService(Settings.EMPTY, mock(Transport.class), null, TransportService.NOOP_TRANSPORT_INTERCEPTOR,
-                        x -> null, null, Collections.emptySet()), TransportWriteActionTests.this.clusterService, null, null, null,
-                new ActionFilters(new HashSet<>()), TestRequest::new, TestRequest::new, ignore -> ThreadPool.Names.SAME, false,
-                new IndexingPressure(Settings.EMPTY), EmptySystemIndices.INSTANCE);
+                new TransportService(Settings.EMPTY, mock(Transport.class), null, TransportService.NOOP_TRANSPORT_INTERCEPTOR,
+                    x -> null, null, Collections.emptySet()), TransportWriteActionTests.this.clusterService, null, null, null,
+                new ActionFilters(new HashSet<>()), TestRequest::new, TestRequest::new, (service, ignore) -> ThreadPool.Names.SAME, false,
+                new IndexingPressure(Settings.EMPTY), EmptySystemIndices.INSTANCE
+            );
             this.withDocumentFailureOnPrimary = withDocumentFailureOnPrimary;
             this.withDocumentFailureOnReplica = withDocumentFailureOnReplica;
         }
@@ -365,9 +366,11 @@ public class TransportWriteActionTests extends ESTestCase {
         protected TestAction(Settings settings, String actionName, TransportService transportService,
                              ClusterService clusterService, ShardStateAction shardStateAction, ThreadPool threadPool) {
             super(settings, actionName, transportService, clusterService,
-                    mockIndicesService(clusterService), threadPool, shardStateAction,
-                    new ActionFilters(new HashSet<>()), TestRequest::new, TestRequest::new, ignore -> ThreadPool.Names.SAME, false,
-                    new IndexingPressure(settings), EmptySystemIndices.INSTANCE);
+                mockIndicesService(clusterService), threadPool, shardStateAction,
+                new ActionFilters(new HashSet<>()), TestRequest::new, TestRequest::new, (service, ignore) -> ThreadPool.Names.SAME,
+                false,
+                new IndexingPressure(settings), EmptySystemIndices.INSTANCE
+            );
             this.withDocumentFailureOnPrimary = false;
             this.withDocumentFailureOnReplica = false;
         }

+ 3 - 2
server/src/test/java/org/elasticsearch/cluster/metadata/MetadataCreateDataStreamServiceTests.java

@@ -16,6 +16,7 @@ import org.elasticsearch.cluster.metadata.ComposableIndexTemplate.DataStreamTemp
 import org.elasticsearch.cluster.metadata.MetadataCreateDataStreamService.CreateDataStreamClusterStateUpdateRequest;
 import org.elasticsearch.common.settings.Settings;
 import org.elasticsearch.common.unit.TimeValue;
+import org.elasticsearch.indices.ExecutorNames;
 import org.elasticsearch.indices.SystemDataStreamDescriptor;
 import org.elasticsearch.indices.SystemDataStreamDescriptor.Type;
 import org.elasticsearch.indices.SystemIndices;
@@ -220,7 +221,7 @@ public class MetadataCreateDataStreamServiceTests extends ESTestCase {
             Type.EXTERNAL,
             new ComposableIndexTemplate(List.of(".system-data-stream"), null, null, null, null, null, new DataStreamTemplate()),
             Map.of(),
-            List.of("stack")
-        );
+            List.of("stack"),
+            ExecutorNames.DEFAULT_SYSTEM_DATA_STREAM_THREAD_POOLS);
     }
 }

+ 30 - 27
server/src/test/java/org/elasticsearch/index/seqno/RetentionLeaseSyncActionTests.java

@@ -89,15 +89,16 @@ public class RetentionLeaseSyncActionTests extends ESTestCase {
         when(indexShard.shardId()).thenReturn(shardId);
 
         final RetentionLeaseSyncAction action = new RetentionLeaseSyncAction(
-                Settings.EMPTY,
-                transportService,
-                clusterService,
-                indicesService,
-                threadPool,
-                shardStateAction,
-                new ActionFilters(Collections.emptySet()),
-                new IndexingPressure(Settings.EMPTY),
-                EmptySystemIndices.INSTANCE);
+            Settings.EMPTY,
+            transportService,
+            clusterService,
+            indicesService,
+            threadPool,
+            shardStateAction,
+            new ActionFilters(Collections.emptySet()),
+            new IndexingPressure(Settings.EMPTY),
+            EmptySystemIndices.INSTANCE
+        );
         final RetentionLeases retentionLeases = mock(RetentionLeases.class);
         final RetentionLeaseSyncAction.Request request = new RetentionLeaseSyncAction.Request(indexShard.shardId(), retentionLeases);
         action.dispatchedShardOperationOnPrimary(request, indexShard,
@@ -127,15 +128,16 @@ public class RetentionLeaseSyncActionTests extends ESTestCase {
         when(indexShard.shardId()).thenReturn(shardId);
 
         final RetentionLeaseSyncAction action = new RetentionLeaseSyncAction(
-                Settings.EMPTY,
-                transportService,
-                clusterService,
-                indicesService,
-                threadPool,
-                shardStateAction,
-                new ActionFilters(Collections.emptySet()),
-                new IndexingPressure(Settings.EMPTY),
-                EmptySystemIndices.INSTANCE);
+            Settings.EMPTY,
+            transportService,
+            clusterService,
+            indicesService,
+            threadPool,
+            shardStateAction,
+            new ActionFilters(Collections.emptySet()),
+            new IndexingPressure(Settings.EMPTY),
+            EmptySystemIndices.INSTANCE
+        );
         final RetentionLeases retentionLeases = mock(RetentionLeases.class);
         final RetentionLeaseSyncAction.Request request = new RetentionLeaseSyncAction.Request(indexShard.shardId(), retentionLeases);
 
@@ -167,15 +169,16 @@ public class RetentionLeaseSyncActionTests extends ESTestCase {
         when(indexShard.shardId()).thenReturn(shardId);
 
         final RetentionLeaseSyncAction action = new RetentionLeaseSyncAction(
-                Settings.EMPTY,
-                transportService,
-                clusterService,
-                indicesService,
-                threadPool,
-                shardStateAction,
-                new ActionFilters(Collections.emptySet()),
-                new IndexingPressure(Settings.EMPTY),
-                EmptySystemIndices.INSTANCE);
+            Settings.EMPTY,
+            transportService,
+            clusterService,
+            indicesService,
+            threadPool,
+            shardStateAction,
+            new ActionFilters(Collections.emptySet()),
+            new IndexingPressure(Settings.EMPTY),
+            EmptySystemIndices.INSTANCE
+        );
 
         assertNull(action.indexBlockLevel());
     }

+ 149 - 0
server/src/test/java/org/elasticsearch/indices/ExecutorSelectorTests.java

@@ -0,0 +1,149 @@
+/*
+ * Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
+ * or more contributor license agreements. Licensed under the Elastic License
+ * 2.0 and the Server Side Public License, v 1; you may not use this file except
+ * in compliance with, at your election, the Elastic License 2.0 or the Server
+ * Side Public License, v 1.
+ */
+
+package org.elasticsearch.indices;
+
+import org.elasticsearch.cluster.metadata.ComposableIndexTemplate;
+import org.elasticsearch.test.ESTestCase;
+import org.elasticsearch.threadpool.ThreadPool;
+
+import java.util.Collections;
+import java.util.List;
+import java.util.Map;
+
+import static org.hamcrest.Matchers.containsString;
+import static org.hamcrest.Matchers.equalTo;
+
+public class ExecutorSelectorTests extends ESTestCase {
+
+    public void testNonCriticalSystemIndexThreadPools() {
+        ExecutorSelector service = new ExecutorSelector(new SystemIndices(
+            Map.of(
+                "normal system index",
+                new SystemIndices.Feature( "normal", "normal system index",
+                    Collections.singletonList(new SystemIndexDescriptor( ".non-critical-system-index", "test index"))
+                )
+            )
+        ));
+        String index = ".non-critical-system-index";
+        assertThat(service.executorForGet(index), equalTo(ThreadPool.Names.SYSTEM_READ));
+        assertThat(service.executorForSearch(index), equalTo(ThreadPool.Names.SYSTEM_READ));
+        assertThat(service.executorForWrite(index), equalTo(ThreadPool.Names.SYSTEM_WRITE));
+    }
+
+    public void testCriticalSystemIndexThreadPools() {
+        ExecutorSelector service = new ExecutorSelector(new SystemIndices(
+            Map.of(
+                "critical system index",
+                new SystemIndices.Feature( "critical", "critical system index", Collections.singletonList(
+                    SystemIndexDescriptor.builder()
+                        .setDescription("critical system indices")
+                        .setIndexPattern(".critical-system-*")
+                        .setType(SystemIndexDescriptor.Type.INTERNAL_UNMANAGED)
+                        .setThreadPools(ExecutorNames.CRITICAL_SYSTEM_INDEX_THREAD_POOLS)
+                        .build()
+                ))
+            )
+        ));
+        String index = ".critical-system-index";
+        assertThat(service.executorForGet(index), equalTo(ThreadPool.Names.SYSTEM_CRITICAL_READ));
+        assertThat(service.executorForSearch(index), equalTo(ThreadPool.Names.SYSTEM_CRITICAL_READ));
+        assertThat(service.executorForWrite(index), equalTo(ThreadPool.Names.SYSTEM_CRITICAL_WRITE));
+    }
+
+    public void testDefaultSystemDataStreamThreadPools() {
+        ExecutorSelector service = new ExecutorSelector(new SystemIndices(
+            Map.of(
+            "normal system index",
+                new SystemIndices.Feature( "data stream", "data stream feature with default thread pools", Collections.emptyList(),
+                    Collections.singletonList(
+                        new SystemDataStreamDescriptor( ".test-data-stream", "a data stream for testing",
+                            SystemDataStreamDescriptor.Type.INTERNAL,
+                            new ComposableIndexTemplate(
+                                List.of(".system-data-stream"),
+                                null, null, null, null, null,
+                                new ComposableIndexTemplate.DataStreamTemplate()),
+                            Map.of(),
+                            Collections.singletonList("test"),
+                            null
+                        )
+                    )
+                )
+            )
+        ));
+        String dataStream = ".test-data-stream";
+        assertThat(service.executorForGet(dataStream), equalTo(ThreadPool.Names.GET));
+        assertThat(service.executorForSearch(dataStream), equalTo(ThreadPool.Names.SEARCH));
+        assertThat(service.executorForWrite(dataStream), equalTo(ThreadPool.Names.WRITE));
+    }
+
+    public void testCustomSystemDataStreamThreadPools() {
+        ExecutorSelector service = new ExecutorSelector(new SystemIndices(
+            Map.of(
+                "normal system index",
+                new SystemIndices.Feature( "data stream", "data stream feature with custom thread pools", Collections.emptyList(),
+                    Collections.singletonList(
+                        new SystemDataStreamDescriptor( ".test-data-stream", "a data stream for testing",
+                            SystemDataStreamDescriptor.Type.INTERNAL,
+                            new ComposableIndexTemplate(
+                                List.of(".system-data-stream"),
+                                null, null, null, null, null,
+                                new ComposableIndexTemplate.DataStreamTemplate()),
+                            Map.of(),
+                            Collections.singletonList("test"),
+                            new ExecutorNames(
+                                ThreadPool.Names.SYSTEM_CRITICAL_READ, ThreadPool.Names.SYSTEM_READ, ThreadPool.Names.SYSTEM_WRITE)
+                        )
+                    )
+                )
+            )
+        ));
+        String dataStream = ".test-data-stream";
+        assertThat(service.executorForGet(dataStream), equalTo(ThreadPool.Names.SYSTEM_CRITICAL_READ));
+        assertThat(service.executorForSearch(dataStream), equalTo(ThreadPool.Names.SYSTEM_READ));
+        assertThat(service.executorForWrite(dataStream), equalTo(ThreadPool.Names.SYSTEM_WRITE));
+    }
+
+    public void testCreateThreadPools() {
+        String getThreadPool = randomFrom(ThreadPool.THREAD_POOL_TYPES.keySet());
+        String searchThreadPool = randomFrom(ThreadPool.THREAD_POOL_TYPES.keySet());
+        String writeThreadPool = randomFrom(ThreadPool.THREAD_POOL_TYPES.keySet());
+
+        ExecutorNames executorNames =
+            new ExecutorNames(getThreadPool, searchThreadPool, writeThreadPool);
+
+        assertThat(executorNames.threadPoolForGet(), equalTo(getThreadPool));
+        assertThat(executorNames.threadPoolForSearch(), equalTo(searchThreadPool));
+        assertThat(executorNames.threadPoolForWrite(), equalTo(writeThreadPool));
+    }
+
+    public void testInvalidThreadPoolNames() {
+        String invalidThreadPool = randomValueOtherThanMany(
+            ThreadPool.THREAD_POOL_TYPES::containsKey,
+            () -> randomAlphaOfLength(8));
+
+        {
+            IllegalArgumentException exception = expectThrows(IllegalArgumentException.class,
+                () -> new ExecutorNames(invalidThreadPool, ThreadPool.Names.SEARCH, ThreadPool.Names.WRITE));
+
+            assertThat(exception.getMessage(), containsString(invalidThreadPool + " is not a valid thread pool"));
+        }
+        {
+            IllegalArgumentException exception = expectThrows(IllegalArgumentException.class,
+                () -> new ExecutorNames(ThreadPool.Names.GET, invalidThreadPool, ThreadPool.Names.WRITE));
+
+            assertThat(exception.getMessage(), containsString(invalidThreadPool + " is not a valid thread pool"));
+        }
+        {
+            IllegalArgumentException exception = expectThrows(IllegalArgumentException.class,
+                () -> new ExecutorNames(ThreadPool.Names.GET, ThreadPool.Names.SEARCH, invalidThreadPool));
+
+            assertThat(exception.getMessage(), containsString(invalidThreadPool + " is not a valid thread pool"));
+        }
+    }
+}

+ 8 - 4
server/src/test/java/org/elasticsearch/snapshots/SnapshotResiliencyTests.java

@@ -1524,7 +1524,8 @@ public class SnapshotResiliencyTests extends ESTestCase {
                             shardStateAction,
                             actionFilters,
                             new IndexingPressure(settings),
-                            EmptySystemIndices.INSTANCE)),
+                            EmptySystemIndices.INSTANCE
+                        )),
                     RetentionLeaseSyncer.EMPTY,
                     client);
                 final ShardLimitValidator shardLimitValidator = new ShardLimitValidator(settings, clusterService);
@@ -1553,7 +1554,8 @@ public class SnapshotResiliencyTests extends ESTestCase {
                     ));
                 final TransportShardBulkAction transportShardBulkAction = new TransportShardBulkAction(settings, transportService,
                     clusterService, indicesService, threadPool, shardStateAction, mappingUpdatedAction, new UpdateHelper(scriptService),
-                    actionFilters, indexingMemoryLimits, EmptySystemIndices.INSTANCE);
+                    actionFilters, indexingMemoryLimits, EmptySystemIndices.INSTANCE
+                );
                 actions.put(TransportShardBulkAction.TYPE, transportShardBulkAction);
                 final RestoreService restoreService = new RestoreService(
                     clusterService, repositoriesService, allocationService,
@@ -1577,13 +1579,15 @@ public class SnapshotResiliencyTests extends ESTestCase {
                 final SearchTransportService searchTransportService = new SearchTransportService(transportService, client,
                     SearchExecutionStatsCollector.makeWrapper(responseCollectorService));
                 final SearchService searchService = new SearchService(clusterService, indicesService, threadPool, scriptService,
-                    bigArrays, new FetchPhase(Collections.emptyList()), responseCollectorService, new NoneCircuitBreakerService());
+                    bigArrays, new FetchPhase(Collections.emptyList()), responseCollectorService, new NoneCircuitBreakerService(),
+                    EmptySystemIndices.INSTANCE.getExecutorSelector());
                 SearchPhaseController searchPhaseController = new SearchPhaseController(
                     writableRegistry(), searchService::aggReduceContextBuilder);
                 actions.put(SearchAction.INSTANCE,
                     new TransportSearchAction(threadPool, new NoneCircuitBreakerService(), transportService, searchService,
                         searchTransportService, searchPhaseController, clusterService,
-                        actionFilters, indexNameExpressionResolver, namedWriteableRegistry));
+                        actionFilters, indexNameExpressionResolver, namedWriteableRegistry,
+                        EmptySystemIndices.INSTANCE.getExecutorSelector()));
                 actions.put(RestoreSnapshotAction.INSTANCE,
                     new TransportRestoreSnapshotAction(transportService, clusterService, threadPool, restoreService, actionFilters,
                         indexNameExpressionResolver));

+ 1 - 1
server/src/test/java/org/elasticsearch/threadpool/UpdateThreadPoolSettingsTests.java

@@ -76,7 +76,7 @@ public class UpdateThreadPoolSettingsTests extends ESThreadPoolTestCase {
     }
 
     private static int getExpectedThreadPoolSize(Settings settings, String name, int size) {
-        if (name.equals(ThreadPool.Names.WRITE) || name.equals(Names.SYSTEM_WRITE)) {
+        if (name.equals(ThreadPool.Names.WRITE) || name.equals(Names.SYSTEM_WRITE) || name.equals(Names.SYSTEM_CRITICAL_WRITE)) {
             return Math.min(size, EsExecutors.allocatedProcessors(settings));
         } else {
             return size;

+ 4 - 3
test/framework/src/main/java/org/elasticsearch/node/MockNode.java

@@ -24,6 +24,7 @@ import org.elasticsearch.common.util.MockPageCacheRecycler;
 import org.elasticsearch.common.util.PageCacheRecycler;
 import org.elasticsearch.env.Environment;
 import org.elasticsearch.http.HttpServerTransport;
+import org.elasticsearch.indices.ExecutorSelector;
 import org.elasticsearch.indices.IndicesService;
 import org.elasticsearch.indices.breaker.CircuitBreakerService;
 import org.elasticsearch.indices.recovery.RecoverySettings;
@@ -118,13 +119,13 @@ public class MockNode extends Node {
     protected SearchService newSearchService(ClusterService clusterService, IndicesService indicesService,
                                              ThreadPool threadPool, ScriptService scriptService, BigArrays bigArrays,
                                              FetchPhase fetchPhase, ResponseCollectorService responseCollectorService,
-                                             CircuitBreakerService circuitBreakerService) {
+                                             CircuitBreakerService circuitBreakerService, ExecutorSelector executorSelector) {
         if (getPluginsService().filterPlugins(MockSearchService.TestPlugin.class).isEmpty()) {
             return super.newSearchService(clusterService, indicesService, threadPool, scriptService, bigArrays, fetchPhase,
-                responseCollectorService, circuitBreakerService);
+                responseCollectorService, circuitBreakerService, executorSelector);
         }
         return new MockSearchService(clusterService, indicesService, threadPool, scriptService,
-            bigArrays, fetchPhase, responseCollectorService, circuitBreakerService);
+            bigArrays, fetchPhase, responseCollectorService, circuitBreakerService, executorSelector);
     }
 
     @Override

+ 4 - 2
test/framework/src/main/java/org/elasticsearch/search/MockSearchService.java

@@ -11,6 +11,7 @@ package org.elasticsearch.search;
 import org.elasticsearch.action.search.SearchShardTask;
 import org.elasticsearch.cluster.service.ClusterService;
 import org.elasticsearch.common.util.BigArrays;
+import org.elasticsearch.indices.ExecutorSelector;
 import org.elasticsearch.indices.IndicesService;
 import org.elasticsearch.indices.breaker.CircuitBreakerService;
 import org.elasticsearch.node.MockNode;
@@ -71,9 +72,10 @@ public class MockSearchService extends SearchService {
 
     public MockSearchService(ClusterService clusterService, IndicesService indicesService, ThreadPool threadPool,
                              ScriptService scriptService, BigArrays bigArrays, FetchPhase fetchPhase,
-                             ResponseCollectorService responseCollectorService, CircuitBreakerService circuitBreakerService) {
+                             ResponseCollectorService responseCollectorService, CircuitBreakerService circuitBreakerService,
+                             ExecutorSelector executorSelector) {
         super(clusterService, indicesService, threadPool, scriptService, bigArrays, fetchPhase, responseCollectorService,
-            circuitBreakerService);
+            circuitBreakerService, executorSelector);
     }
 
     @Override

+ 18 - 22
x-pack/plugin/ccr/src/main/java/org/elasticsearch/xpack/ccr/action/bulk/TransportBulkShardOperationsAction.java

@@ -26,30 +26,21 @@ import org.elasticsearch.index.seqno.SequenceNumbers;
 import org.elasticsearch.index.shard.IndexShard;
 import org.elasticsearch.index.shard.ShardId;
 import org.elasticsearch.index.translog.Translog;
+import org.elasticsearch.indices.ExecutorSelector;
 import org.elasticsearch.indices.IndicesService;
 import org.elasticsearch.indices.SystemIndices;
 import org.elasticsearch.tasks.Task;
 import org.elasticsearch.threadpool.ThreadPool;
-import org.elasticsearch.threadpool.ThreadPool.Names;
 import org.elasticsearch.transport.TransportService;
 import org.elasticsearch.xpack.ccr.index.engine.AlreadyProcessedFollowingEngineException;
 
 import java.io.IOException;
 import java.util.ArrayList;
 import java.util.List;
-import java.util.function.Function;
 
 public class TransportBulkShardOperationsAction
         extends TransportWriteAction<BulkShardOperationsRequest, BulkShardOperationsRequest, BulkShardOperationsResponse> {
 
-    private static final Function<IndexShard, String> EXECUTOR_NAME_FUNCTION = shard -> {
-        if (shard.indexSettings().getIndexMetadata().isSystem()) {
-            return Names.SYSTEM_WRITE;
-        } else {
-            return Names.WRITE;
-        }
-    };
-
     @Inject
     public TransportBulkShardOperationsAction(
             final Settings settings,
@@ -60,19 +51,24 @@ public class TransportBulkShardOperationsAction
             final ShardStateAction shardStateAction,
             final ActionFilters actionFilters,
             final IndexingPressure indexingPressure,
-            final SystemIndices systemIndices) {
+            final SystemIndices systemIndices,
+            final ExecutorSelector executorSelector) {
         super(
-                settings,
-                BulkShardOperationsAction.NAME,
-                transportService,
-                clusterService,
-                indicesService,
-                threadPool,
-                shardStateAction,
-                actionFilters,
-                BulkShardOperationsRequest::new,
-                BulkShardOperationsRequest::new,
-                EXECUTOR_NAME_FUNCTION, false, indexingPressure, systemIndices);
+            settings,
+            BulkShardOperationsAction.NAME,
+            transportService,
+            clusterService,
+            indicesService,
+            threadPool,
+            shardStateAction,
+            actionFilters,
+            BulkShardOperationsRequest::new,
+            BulkShardOperationsRequest::new,
+            ExecutorSelector::getWriteExecutorForShard,
+            false,
+            indexingPressure,
+            systemIndices
+        );
     }
 
     @Override

+ 3 - 1
x-pack/plugin/data-streams/src/internalClusterTest/java/org/elasticsearch/datastreams/SystemDataStreamIT.java

@@ -31,6 +31,7 @@ import org.elasticsearch.common.network.NetworkModule;
 import org.elasticsearch.common.settings.Settings;
 import org.elasticsearch.common.xcontent.XContentHelper;
 import org.elasticsearch.common.xcontent.XContentType;
+import org.elasticsearch.indices.ExecutorNames;
 import org.elasticsearch.indices.SystemDataStreamDescriptor;
 import org.elasticsearch.indices.SystemDataStreamDescriptor.Type;
 import org.elasticsearch.plugins.Plugin;
@@ -327,7 +328,8 @@ public class SystemDataStreamIT extends ESIntegTestCase {
                             new DataStreamTemplate()
                         ),
                         Map.of(),
-                        List.of("product")
+                        List.of("product"),
+                        ExecutorNames.DEFAULT_SYSTEM_DATA_STREAM_THREAD_POOLS
                     )
                 );
             } catch (IOException e) {

+ 3 - 1
x-pack/plugin/fleet/src/main/java/org/elasticsearch/xpack/fleet/Fleet.java

@@ -34,6 +34,7 @@ import org.elasticsearch.common.xcontent.XContentParser;
 import org.elasticsearch.common.xcontent.XContentType;
 import org.elasticsearch.env.Environment;
 import org.elasticsearch.env.NodeEnvironment;
+import org.elasticsearch.indices.ExecutorNames;
 import org.elasticsearch.indices.SystemDataStreamDescriptor;
 import org.elasticsearch.indices.SystemIndexDescriptor;
 import org.elasticsearch.indices.SystemIndexDescriptor.Type;
@@ -269,7 +270,8 @@ public class Fleet extends Plugin implements SystemIndexPlugin {
                 SystemDataStreamDescriptor.Type.EXTERNAL,
                 composableIndexTemplate,
                 Map.of(),
-                ALLOWED_PRODUCTS
+                ALLOWED_PRODUCTS,
+                ExecutorNames.DEFAULT_SYSTEM_DATA_STREAM_THREAD_POOLS
             );
         } catch (IOException e) {
             throw new UncheckedIOException(e);

+ 3 - 0
x-pack/plugin/security/src/main/java/org/elasticsearch/xpack/security/Security.java

@@ -45,6 +45,7 @@ import org.elasticsearch.env.Environment;
 import org.elasticsearch.env.NodeEnvironment;
 import org.elasticsearch.http.HttpServerTransport;
 import org.elasticsearch.index.IndexModule;
+import org.elasticsearch.indices.ExecutorNames;
 import org.elasticsearch.indices.SystemIndexDescriptor;
 import org.elasticsearch.indices.breaker.CircuitBreakerService;
 import org.elasticsearch.ingest.Processor;
@@ -1252,6 +1253,7 @@ public class Security extends Plugin implements SystemIndexPlugin, IngestPlugin,
              .setIndexFormat(INTERNAL_MAIN_INDEX_FORMAT)
              .setVersionMetaKey("security-version")
              .setOrigin(SECURITY_ORIGIN)
+             .setThreadPools(ExecutorNames.CRITICAL_SYSTEM_INDEX_THREAD_POOLS)
              .build();
      }
 
@@ -1266,6 +1268,7 @@ public class Security extends Plugin implements SystemIndexPlugin, IngestPlugin,
              .setIndexFormat(INTERNAL_TOKENS_INDEX_FORMAT)
              .setVersionMetaKey(SECURITY_VERSION_STRING)
              .setOrigin(SECURITY_ORIGIN)
+             .setThreadPools(ExecutorNames.CRITICAL_SYSTEM_INDEX_THREAD_POOLS)
              .build();
      }