浏览代码

Use search threadpool in ESQL (#106050)

The esql and search thread pools share the same characteristics. Merging 
these thread pools would simplify workload tracking and autoscaling
efforts.
Nhat Nguyen 1 年之前
父节点
当前提交
6837d081f9

+ 3 - 2
x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/enrich/EnrichLookupService.java

@@ -52,6 +52,7 @@ import org.elasticsearch.search.internal.ShardSearchRequest;
 import org.elasticsearch.tasks.CancellableTask;
 import org.elasticsearch.tasks.CancellableTask;
 import org.elasticsearch.tasks.Task;
 import org.elasticsearch.tasks.Task;
 import org.elasticsearch.tasks.TaskId;
 import org.elasticsearch.tasks.TaskId;
+import org.elasticsearch.threadpool.ThreadPool;
 import org.elasticsearch.transport.TransportChannel;
 import org.elasticsearch.transport.TransportChannel;
 import org.elasticsearch.transport.TransportRequest;
 import org.elasticsearch.transport.TransportRequest;
 import org.elasticsearch.transport.TransportRequestHandler;
 import org.elasticsearch.transport.TransportRequestHandler;
@@ -128,13 +129,13 @@ public class EnrichLookupService {
         this.clusterService = clusterService;
         this.clusterService = clusterService;
         this.searchService = searchService;
         this.searchService = searchService;
         this.transportService = transportService;
         this.transportService = transportService;
-        this.executor = transportService.getThreadPool().executor(EsqlPlugin.ESQL_THREAD_POOL_NAME);
+        this.executor = transportService.getThreadPool().executor(ThreadPool.Names.SEARCH);
         this.bigArrays = bigArrays;
         this.bigArrays = bigArrays;
         this.blockFactory = blockFactory;
         this.blockFactory = blockFactory;
         this.localBreakerSettings = new LocalCircuitBreaker.SizeSettings(clusterService.getSettings());
         this.localBreakerSettings = new LocalCircuitBreaker.SizeSettings(clusterService.getSettings());
         transportService.registerRequestHandler(
         transportService.registerRequestHandler(
             LOOKUP_ACTION_NAME,
             LOOKUP_ACTION_NAME,
-            this.executor,
+            transportService.getThreadPool().executor(EsqlPlugin.ESQL_WORKER_THREAD_POOL_NAME),
             in -> new LookupRequest(in, blockFactory),
             in -> new LookupRequest(in, blockFactory),
             new TransportHandler()
             new TransportHandler()
         );
         );

+ 3 - 4
x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/enrich/EnrichPolicyResolver.java

@@ -37,7 +37,6 @@ import org.elasticsearch.xpack.core.enrich.EnrichMetadata;
 import org.elasticsearch.xpack.core.enrich.EnrichPolicy;
 import org.elasticsearch.xpack.core.enrich.EnrichPolicy;
 import org.elasticsearch.xpack.esql.analysis.EnrichResolution;
 import org.elasticsearch.xpack.esql.analysis.EnrichResolution;
 import org.elasticsearch.xpack.esql.plan.logical.Enrich;
 import org.elasticsearch.xpack.esql.plan.logical.Enrich;
-import org.elasticsearch.xpack.esql.plugin.EsqlPlugin;
 import org.elasticsearch.xpack.esql.session.EsqlSession;
 import org.elasticsearch.xpack.esql.session.EsqlSession;
 import org.elasticsearch.xpack.esql.type.EsqlDataTypes;
 import org.elasticsearch.xpack.esql.type.EsqlDataTypes;
 import org.elasticsearch.xpack.ql.index.EsIndex;
 import org.elasticsearch.xpack.ql.index.EsIndex;
@@ -80,7 +79,7 @@ public class EnrichPolicyResolver {
         this.threadPool = transportService.getThreadPool();
         this.threadPool = transportService.getThreadPool();
         transportService.registerRequestHandler(
         transportService.registerRequestHandler(
             RESOLVE_ACTION_NAME,
             RESOLVE_ACTION_NAME,
-            threadPool.executor(EsqlPlugin.ESQL_THREAD_POOL_NAME),
+            threadPool.executor(ThreadPool.Names.SEARCH),
             LookupRequest::new,
             LookupRequest::new,
             new RequestHandler()
             new RequestHandler()
         );
         );
@@ -272,7 +271,7 @@ public class EnrichPolicyResolver {
                         new ActionListenerResponseHandler<>(
                         new ActionListenerResponseHandler<>(
                             refs.acquire(resp -> lookupResponses.put(cluster, resp)),
                             refs.acquire(resp -> lookupResponses.put(cluster, resp)),
                             LookupResponse::new,
                             LookupResponse::new,
-                            threadPool.executor(EsqlPlugin.ESQL_THREAD_POOL_NAME)
+                            threadPool.executor(ThreadPool.Names.SEARCH)
                         )
                         )
                     );
                     );
                 }
                 }
@@ -290,7 +289,7 @@ public class EnrichPolicyResolver {
                     new ActionListenerResponseHandler<>(
                     new ActionListenerResponseHandler<>(
                         refs.acquire(resp -> lookupResponses.put(RemoteClusterAware.LOCAL_CLUSTER_GROUP_KEY, resp)),
                         refs.acquire(resp -> lookupResponses.put(RemoteClusterAware.LOCAL_CLUSTER_GROUP_KEY, resp)),
                         LookupResponse::new,
                         LookupResponse::new,
-                        threadPool.executor(EsqlPlugin.ESQL_THREAD_POOL_NAME)
+                        threadPool.executor(ThreadPool.Names.SEARCH)
                     )
                     )
                 );
                 );
             }
             }

+ 4 - 5
x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/plugin/ComputeService.java

@@ -81,7 +81,6 @@ import java.util.concurrent.Executor;
 import java.util.concurrent.atomic.AtomicBoolean;
 import java.util.concurrent.atomic.AtomicBoolean;
 import java.util.function.Supplier;
 import java.util.function.Supplier;
 
 
-import static org.elasticsearch.xpack.esql.plugin.EsqlPlugin.ESQL_THREAD_POOL_NAME;
 import static org.elasticsearch.xpack.esql.plugin.EsqlPlugin.ESQL_WORKER_THREAD_POOL_NAME;
 import static org.elasticsearch.xpack.esql.plugin.EsqlPlugin.ESQL_WORKER_THREAD_POOL_NAME;
 
 
 /**
 /**
@@ -116,7 +115,7 @@ public class ComputeService {
         this.transportService = transportService;
         this.transportService = transportService;
         this.bigArrays = bigArrays.withCircuitBreaking();
         this.bigArrays = bigArrays.withCircuitBreaking();
         this.blockFactory = blockFactory;
         this.blockFactory = blockFactory;
-        this.esqlExecutor = threadPool.executor(ESQL_THREAD_POOL_NAME);
+        this.esqlExecutor = threadPool.executor(ThreadPool.Names.SEARCH);
         transportService.registerRequestHandler(DATA_ACTION_NAME, this.esqlExecutor, DataNodeRequest::new, new DataNodeRequestHandler());
         transportService.registerRequestHandler(DATA_ACTION_NAME, this.esqlExecutor, DataNodeRequest::new, new DataNodeRequestHandler());
         transportService.registerRequestHandler(
         transportService.registerRequestHandler(
             CLUSTER_ACTION_NAME,
             CLUSTER_ACTION_NAME,
@@ -196,7 +195,7 @@ public class ComputeService {
         final List<DriverProfile> collectedProfiles = configuration.profile() ? Collections.synchronizedList(new ArrayList<>()) : List.of();
         final List<DriverProfile> collectedProfiles = configuration.profile() ? Collections.synchronizedList(new ArrayList<>()) : List.of();
         final var exchangeSource = new ExchangeSourceHandler(
         final var exchangeSource = new ExchangeSourceHandler(
             queryPragmas.exchangeBufferSize(),
             queryPragmas.exchangeBufferSize(),
-            transportService.getThreadPool().executor(ESQL_THREAD_POOL_NAME)
+            transportService.getThreadPool().executor(ThreadPool.Names.SEARCH)
         );
         );
         try (
         try (
             Releasable ignored = exchangeSource.addEmptySink();
             Releasable ignored = exchangeSource.addEmptySink();
@@ -628,7 +627,7 @@ public class ComputeService {
             final int endBatchIndex = Math.min(startBatchIndex + maxConcurrentShards, request.shardIds().size());
             final int endBatchIndex = Math.min(startBatchIndex + maxConcurrentShards, request.shardIds().size());
             List<ShardId> shardIds = request.shardIds().subList(startBatchIndex, endBatchIndex);
             List<ShardId> shardIds = request.shardIds().subList(startBatchIndex, endBatchIndex);
             acquireSearchContexts(clusterAlias, shardIds, configuration, request.aliasFilters(), ActionListener.wrap(searchContexts -> {
             acquireSearchContexts(clusterAlias, shardIds, configuration, request.aliasFilters(), ActionListener.wrap(searchContexts -> {
-                assert ThreadPool.assertCurrentThreadPool(ESQL_THREAD_POOL_NAME, ESQL_WORKER_THREAD_POOL_NAME);
+                assert ThreadPool.assertCurrentThreadPool(ThreadPool.Names.SEARCH, ESQL_WORKER_THREAD_POOL_NAME);
                 var computeContext = new ComputeContext(sessionId, clusterAlias, searchContexts, configuration, null, exchangeSink);
                 var computeContext = new ComputeContext(sessionId, clusterAlias, searchContexts, configuration, null, exchangeSink);
                 runCompute(
                 runCompute(
                     parentTask,
                     parentTask,
@@ -734,7 +733,7 @@ public class ComputeService {
         final String localSessionId = clusterAlias + ":" + globalSessionId;
         final String localSessionId = clusterAlias + ":" + globalSessionId;
         var exchangeSource = new ExchangeSourceHandler(
         var exchangeSource = new ExchangeSourceHandler(
             configuration.pragmas().exchangeBufferSize(),
             configuration.pragmas().exchangeBufferSize(),
-            transportService.getThreadPool().executor(ESQL_THREAD_POOL_NAME)
+            transportService.getThreadPool().executor(ThreadPool.Names.SEARCH)
         );
         );
         try (
         try (
             Releasable ignored = exchangeSource.addEmptySink();
             Releasable ignored = exchangeSource.addEmptySink();

+ 1 - 16
x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/plugin/EsqlPlugin.java

@@ -70,7 +70,6 @@ import java.util.stream.Stream;
 
 
 public class EsqlPlugin extends Plugin implements ActionPlugin {
 public class EsqlPlugin extends Plugin implements ActionPlugin {
 
 
-    public static final String ESQL_THREAD_POOL_NAME = "esql";
     public static final String ESQL_WORKER_THREAD_POOL_NAME = "esql_worker";
     public static final String ESQL_WORKER_THREAD_POOL_NAME = "esql_worker";
 
 
     public static final Setting<Integer> QUERY_RESULT_TRUNCATION_MAX_SIZE = Setting.intSetting(
     public static final Setting<Integer> QUERY_RESULT_TRUNCATION_MAX_SIZE = Setting.intSetting(
@@ -112,12 +111,7 @@ public class EsqlPlugin extends Plugin implements ActionPlugin {
                 ),
                 ),
                 new EsqlIndexResolver(services.client(), EsqlDataTypeRegistry.INSTANCE)
                 new EsqlIndexResolver(services.client(), EsqlDataTypeRegistry.INSTANCE)
             ),
             ),
-            new ExchangeService(
-                services.clusterService().getSettings(),
-                services.threadPool(),
-                EsqlPlugin.ESQL_THREAD_POOL_NAME,
-                blockFactory
-            ),
+            new ExchangeService(services.clusterService().getSettings(), services.threadPool(), ThreadPool.Names.SEARCH, blockFactory),
             blockFactory
             blockFactory
         );
         );
     }
     }
@@ -186,18 +180,9 @@ public class EsqlPlugin extends Plugin implements ActionPlugin {
         ).toList();
         ).toList();
     }
     }
 
 
-    @Override
     public List<ExecutorBuilder<?>> getExecutorBuilders(Settings settings) {
     public List<ExecutorBuilder<?>> getExecutorBuilders(Settings settings) {
         final int allocatedProcessors = EsExecutors.allocatedProcessors(settings);
         final int allocatedProcessors = EsExecutors.allocatedProcessors(settings);
         return List.of(
         return List.of(
-            new FixedExecutorBuilder(
-                settings,
-                ESQL_THREAD_POOL_NAME,
-                allocatedProcessors,
-                1000,
-                ESQL_THREAD_POOL_NAME,
-                EsExecutors.TaskTrackingConfig.DEFAULT
-            ),
             // TODO: Maybe have two types of threadpools for workers: one for CPU-bound and one for I/O-bound tasks.
             // TODO: Maybe have two types of threadpools for workers: one for CPU-bound and one for I/O-bound tasks.
             // And we should also reduce the number of threads of the CPU-bound threadpool to allocatedProcessors.
             // And we should also reduce the number of threads of the CPU-bound threadpool to allocatedProcessors.
             new FixedExecutorBuilder(
             new FixedExecutorBuilder(

+ 2 - 2
x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/plugin/TransportEsqlQueryAction.java

@@ -82,7 +82,7 @@ public class TransportEsqlQueryAction extends HandledTransportAction<EsqlQueryRe
         super(EsqlQueryAction.NAME, transportService, actionFilters, EsqlQueryRequest::new, EsExecutors.DIRECT_EXECUTOR_SERVICE);
         super(EsqlQueryAction.NAME, transportService, actionFilters, EsqlQueryRequest::new, EsExecutors.DIRECT_EXECUTOR_SERVICE);
         this.planExecutor = planExecutor;
         this.planExecutor = planExecutor;
         this.clusterService = clusterService;
         this.clusterService = clusterService;
-        this.requestExecutor = threadPool.executor(EsqlPlugin.ESQL_THREAD_POOL_NAME);
+        this.requestExecutor = threadPool.executor(ThreadPool.Names.SEARCH);
         exchangeService.registerTransportHandler(transportService);
         exchangeService.registerTransportHandler(transportService);
         this.exchangeService = exchangeService;
         this.exchangeService = exchangeService;
         this.enrichPolicyResolver = new EnrichPolicyResolver(clusterService, transportService, planExecutor.indexResolver());
         this.enrichPolicyResolver = new EnrichPolicyResolver(clusterService, transportService, planExecutor.indexResolver());
@@ -124,7 +124,7 @@ public class TransportEsqlQueryAction extends HandledTransportAction<EsqlQueryRe
     }
     }
 
 
     private void doExecuteForked(Task task, EsqlQueryRequest request, ActionListener<EsqlQueryResponse> listener) {
     private void doExecuteForked(Task task, EsqlQueryRequest request, ActionListener<EsqlQueryResponse> listener) {
-        assert ThreadPool.assertCurrentThreadPool(EsqlPlugin.ESQL_THREAD_POOL_NAME);
+        assert ThreadPool.assertCurrentThreadPool(ThreadPool.Names.SEARCH);
         if (requestIsAsync(request)) {
         if (requestIsAsync(request)) {
             asyncTaskManagementService.asyncExecute(
             asyncTaskManagementService.asyncExecute(
                 request,
                 request,

+ 15 - 21
x-pack/plugin/esql/src/test/java/org/elasticsearch/xpack/esql/CsvTests.java

@@ -96,6 +96,7 @@ import java.util.List;
 import java.util.Map;
 import java.util.Map;
 import java.util.Set;
 import java.util.Set;
 import java.util.TreeMap;
 import java.util.TreeMap;
+import java.util.concurrent.Executor;
 import java.util.concurrent.TimeUnit;
 import java.util.concurrent.TimeUnit;
 
 
 import static org.elasticsearch.test.ListMatcher.matchesList;
 import static org.elasticsearch.test.ListMatcher.matchesList;
@@ -107,7 +108,6 @@ import static org.elasticsearch.xpack.esql.CsvTestUtils.loadPageFromCsv;
 import static org.elasticsearch.xpack.esql.CsvTestsDataLoader.CSV_DATASET_MAP;
 import static org.elasticsearch.xpack.esql.CsvTestsDataLoader.CSV_DATASET_MAP;
 import static org.elasticsearch.xpack.esql.EsqlTestUtils.TEST_VERIFIER;
 import static org.elasticsearch.xpack.esql.EsqlTestUtils.TEST_VERIFIER;
 import static org.elasticsearch.xpack.esql.EsqlTestUtils.loadMapping;
 import static org.elasticsearch.xpack.esql.EsqlTestUtils.loadMapping;
-import static org.elasticsearch.xpack.esql.plugin.EsqlPlugin.ESQL_THREAD_POOL_NAME;
 import static org.elasticsearch.xpack.ql.CsvSpecReader.specParser;
 import static org.elasticsearch.xpack.ql.CsvSpecReader.specParser;
 import static org.elasticsearch.xpack.ql.TestUtils.classpathResources;
 import static org.elasticsearch.xpack.ql.TestUtils.classpathResources;
 import static org.hamcrest.Matchers.equalTo;
 import static org.hamcrest.Matchers.equalTo;
@@ -161,6 +161,7 @@ public class CsvTests extends ESTestCase {
     private final Mapper mapper = new Mapper(functionRegistry);
     private final Mapper mapper = new Mapper(functionRegistry);
     private final PhysicalPlanOptimizer physicalPlanOptimizer = new TestPhysicalPlanOptimizer(new PhysicalOptimizerContext(configuration));
     private final PhysicalPlanOptimizer physicalPlanOptimizer = new TestPhysicalPlanOptimizer(new PhysicalOptimizerContext(configuration));
     private ThreadPool threadPool;
     private ThreadPool threadPool;
+    private Executor executor;
 
 
     @ParametersFactory(argumentFormatting = "%2$s.%3$s")
     @ParametersFactory(argumentFormatting = "%2$s.%3$s")
     public static List<Object[]> readScriptSpec() throws Exception {
     public static List<Object[]> readScriptSpec() throws Exception {
@@ -174,18 +175,17 @@ public class CsvTests extends ESTestCase {
     @Before
     @Before
     public void setUp() throws Exception {
     public void setUp() throws Exception {
         super.setUp();
         super.setUp();
-        int numThreads = randomBoolean() ? 1 : between(2, 16);
-        threadPool = new TestThreadPool(
-            "CsvTests",
-            new FixedExecutorBuilder(
-                Settings.EMPTY,
-                ESQL_THREAD_POOL_NAME,
-                numThreads,
-                1024,
-                "esql",
-                EsExecutors.TaskTrackingConfig.DEFAULT
-            )
-        );
+        if (randomBoolean()) {
+            int numThreads = randomBoolean() ? 1 : between(2, 16);
+            threadPool = new TestThreadPool(
+                "CsvTests",
+                new FixedExecutorBuilder(Settings.EMPTY, "esql_test", numThreads, 1024, "esql", EsExecutors.TaskTrackingConfig.DEFAULT)
+            );
+            executor = threadPool.executor("esql_test");
+        } else {
+            threadPool = new TestThreadPool(getTestName());
+            executor = threadPool.executor(ThreadPool.Names.SEARCH);
+        }
         HeaderWarning.setThreadContext(threadPool.getThreadContext());
         HeaderWarning.setThreadContext(threadPool.getThreadContext());
     }
     }
 
 
@@ -343,7 +343,7 @@ public class CsvTests extends ESTestCase {
             bigArrays,
             bigArrays,
             ByteSizeValue.ofBytes(randomLongBetween(1, BlockFactory.DEFAULT_MAX_BLOCK_PRIMITIVE_ARRAY_SIZE.getBytes() * 2))
             ByteSizeValue.ofBytes(randomLongBetween(1, BlockFactory.DEFAULT_MAX_BLOCK_PRIMITIVE_ARRAY_SIZE.getBytes() * 2))
         );
         );
-        ExchangeSourceHandler exchangeSource = new ExchangeSourceHandler(between(1, 64), threadPool.executor(ESQL_THREAD_POOL_NAME));
+        ExchangeSourceHandler exchangeSource = new ExchangeSourceHandler(between(1, 64), executor);
         ExchangeSinkHandler exchangeSink = new ExchangeSinkHandler(blockFactory, between(1, 64), threadPool::relativeTimeInMillis);
         ExchangeSinkHandler exchangeSink = new ExchangeSinkHandler(blockFactory, between(1, 64), threadPool::relativeTimeInMillis);
         LocalExecutionPlanner executionPlanner = new LocalExecutionPlanner(
         LocalExecutionPlanner executionPlanner = new LocalExecutionPlanner(
             sessionId,
             sessionId,
@@ -406,13 +406,7 @@ public class CsvTests extends ESTestCase {
             DriverRunner runner = new DriverRunner(threadPool.getThreadContext()) {
             DriverRunner runner = new DriverRunner(threadPool.getThreadContext()) {
                 @Override
                 @Override
                 protected void start(Driver driver, ActionListener<Void> driverListener) {
                 protected void start(Driver driver, ActionListener<Void> driverListener) {
-                    Driver.start(
-                        threadPool.getThreadContext(),
-                        threadPool.executor(ESQL_THREAD_POOL_NAME),
-                        driver,
-                        between(1, 1000),
-                        driverListener
-                    );
+                    Driver.start(threadPool.getThreadContext(), executor, driver, between(1, 1000), driverListener);
                 }
                 }
             };
             };
             PlainActionFuture<ActualResults> future = new PlainActionFuture<>();
             PlainActionFuture<ActualResults> future = new PlainActionFuture<>();