Browse Source

Add ESQL threadpool (ESQL-1202)

This PR introduces a dedicated thread pool for ESQL to avoid competition
with search requests. The new threadpool has the same configuration as
the search threadpool.

Closes ESQL-1150
Nhat Nguyen 2 years ago
parent
commit
8f9b8b1df8

+ 2 - 7
x-pack/plugin/esql/compute/src/main/java/org/elasticsearch/compute/operator/DriverTaskRunner.java

@@ -38,14 +38,9 @@ public class DriverTaskRunner {
     public static final String ACTION_NAME = "internal:data/read/esql/compute";
     private final TransportService transportService;
 
-    public DriverTaskRunner(TransportService transportService, ThreadPool threadPool) {
+    public DriverTaskRunner(TransportService transportService, Executor executor) {
         this.transportService = transportService;
-        transportService.registerRequestHandler(
-            ACTION_NAME,
-            ThreadPool.Names.SAME,
-            DriverRequest::new,
-            new DriverRequestHandler(threadPool.executor(ThreadPool.Names.SEARCH))
-        );
+        transportService.registerRequestHandler(ACTION_NAME, ThreadPool.Names.SAME, DriverRequest::new, new DriverRequestHandler(executor));
     }
 
     public void executeDrivers(Task parentTask, List<Driver> drivers, ActionListener<Void> listener) {

+ 2 - 4
x-pack/plugin/esql/compute/src/main/java/org/elasticsearch/compute/operator/exchange/ExchangeService.java

@@ -33,7 +33,6 @@ import org.elasticsearch.transport.TransportService;
 import java.util.Iterator;
 import java.util.Map;
 import java.util.Queue;
-import java.util.concurrent.Executor;
 import java.util.concurrent.atomic.AtomicBoolean;
 
 /**
@@ -115,9 +114,8 @@ public final class ExchangeService extends AbstractLifecycleComponent {
      *
      * @throws IllegalStateException if a source handler for the given id already exists
      */
-    public ExchangeSourceHandler createSourceHandler(String exchangeId, int maxBufferSize) {
-        Executor fetchExecutor = threadPool.executor(ThreadPool.Names.SEARCH_COORDINATION);
-        ExchangeSourceHandler sourceHandler = new ExchangeSourceHandler(maxBufferSize, fetchExecutor);
+    public ExchangeSourceHandler createSourceHandler(String exchangeId, int maxBufferSize, String fetchExecutor) {
+        ExchangeSourceHandler sourceHandler = new ExchangeSourceHandler(maxBufferSize, threadPool.executor(fetchExecutor));
         if (sources.putIfAbsent(exchangeId, sourceHandler) != null) {
             throw new IllegalStateException("source exchanger for id [" + exchangeId + "] already exists");
         }

+ 5 - 8
x-pack/plugin/esql/compute/src/test/java/org/elasticsearch/compute/OperatorTests.java

@@ -74,6 +74,7 @@ import org.elasticsearch.indices.breaker.NoneCircuitBreakerService;
 import org.elasticsearch.search.aggregations.support.CoreValuesSourceType;
 import org.elasticsearch.search.aggregations.support.ValuesSource;
 import org.elasticsearch.test.ESTestCase;
+import org.elasticsearch.threadpool.FixedExecutorBuilder;
 import org.elasticsearch.threadpool.TestThreadPool;
 import org.elasticsearch.threadpool.ThreadPool;
 import org.elasticsearch.xpack.ql.util.Holder;
@@ -88,7 +89,6 @@ import java.util.HashSet;
 import java.util.List;
 import java.util.Map;
 import java.util.Set;
-import java.util.concurrent.Executor;
 import java.util.concurrent.TimeUnit;
 import java.util.concurrent.atomic.AtomicInteger;
 import java.util.concurrent.atomic.AtomicReference;
@@ -109,7 +109,8 @@ public class OperatorTests extends ESTestCase {
     @Before
     public void setUp() throws Exception {
         super.setUp();
-        threadPool = new TestThreadPool("OperatorTests");
+        int numThreads = randomBoolean() ? 1 : between(2, 16);
+        threadPool = new TestThreadPool("OperatorTests", new FixedExecutorBuilder(Settings.EMPTY, "esql", numThreads, 1024, "esql", false));
     }
 
     @After
@@ -226,7 +227,7 @@ public class OperatorTests extends ESTestCase {
                             )
                         );
                     }
-                    runToCompletion(threadPool.executor(ThreadPool.Names.SEARCH), drivers);
+                    runToCompletion(threadPool.executor("esql"), drivers);
                 } finally {
                     Releasables.close(drivers);
                 }
@@ -283,7 +284,7 @@ public class OperatorTests extends ESTestCase {
                     });
                     drivers.add(new Driver(queryOperator, List.of(), docCollector, () -> {}));
                 }
-                runToCompletion(threadPool.executor(ThreadPool.Names.SEARCH), drivers);
+                runToCompletion(threadPool.executor("esql"), drivers);
                 Set<Integer> expectedDocIds = searchForDocIds(reader, query);
                 assertThat("query=" + query + ", partition=" + partition, actualDocIds, equalTo(expectedDocIds));
             } finally {
@@ -318,10 +319,6 @@ public class OperatorTests extends ESTestCase {
         );
     }
 
-    private Executor randomExecutor() {
-        return threadPool.executor(randomFrom(ThreadPool.Names.SAME, ThreadPool.Names.GENERIC, ThreadPool.Names.SEARCH));
-    }
-
     public void testOperatorsWithLuceneGroupingCount() throws IOException {
         BigArrays bigArrays = bigArrays();
         final String fieldName = "value";

+ 7 - 3
x-pack/plugin/esql/compute/src/test/java/org/elasticsearch/compute/operator/exchange/ExchangeServiceTests.java

@@ -322,7 +322,7 @@ public class ExchangeServiceTests extends ESTestCase {
         try (exchange0; exchange1; node0; node1) {
             String exchangeId = "exchange";
             Task task = new Task(1, "", "", "", null, Collections.emptyMap());
-            ExchangeSourceHandler sourceHandler = exchange0.createSourceHandler(exchangeId, randomExchangeBuffer());
+            ExchangeSourceHandler sourceHandler = exchange0.createSourceHandler(exchangeId, randomExchangeBuffer(), "esql_test_executor");
             ExchangeSinkHandler sinkHandler = exchange1.createSinkHandler(exchangeId, randomExchangeBuffer());
             sourceHandler.addRemoteSink(exchange0.newRemoteSink(task, exchangeId, node0, node1.getLocalNode()), randomIntBetween(1, 5));
             final int maxInputSeqNo = rarely() ? -1 : randomIntBetween(0, 50_000);
@@ -371,7 +371,7 @@ public class ExchangeServiceTests extends ESTestCase {
         try (exchange0; exchange1; node0; node1) {
             String exchangeId = "exchange";
             Task task = new Task(1, "", "", "", null, Collections.emptyMap());
-            ExchangeSourceHandler sourceHandler = exchange0.createSourceHandler(exchangeId, randomIntBetween(1, 128));
+            ExchangeSourceHandler sourceHandler = exchange0.createSourceHandler(exchangeId, randomIntBetween(1, 128), "esql_test_executor");
             ExchangeSinkHandler sinkHandler = exchange1.createSinkHandler(exchangeId, randomIntBetween(1, 128));
             sourceHandler.addRemoteSink(exchange0.newRemoteSink(task, exchangeId, node0, node1.getLocalNode()), randomIntBetween(1, 5));
             Exception err = expectThrows(
@@ -429,7 +429,11 @@ public class ExchangeServiceTests extends ESTestCase {
             {
                 final int maxOutputSeqNo = randomIntBetween(1, 50_000);
                 SeqNoCollector seqNoCollector = new SeqNoCollector(maxOutputSeqNo);
-                ExchangeSourceHandler sourceHandler = exchange0.createSourceHandler(exchangeId, randomIntBetween(1, 128));
+                ExchangeSourceHandler sourceHandler = exchange0.createSourceHandler(
+                    exchangeId,
+                    randomIntBetween(1, 128),
+                    "esql_test_executor"
+                );
                 sourceHandler.addRemoteSink(exchange0.newRemoteSink(task, exchangeId, node0, node1.getLocalNode()), randomIntBetween(1, 5));
                 int numSources = randomIntBetween(1, 10);
                 List<Driver> sourceDrivers = new ArrayList<>(numSources);

+ 2 - 1
x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/planner/LocalExecutionPlanner.java

@@ -62,6 +62,7 @@ import org.elasticsearch.xpack.esql.plan.physical.ProjectExec;
 import org.elasticsearch.xpack.esql.plan.physical.RowExec;
 import org.elasticsearch.xpack.esql.plan.physical.ShowExec;
 import org.elasticsearch.xpack.esql.plan.physical.TopNExec;
+import org.elasticsearch.xpack.esql.plugin.EsqlPlugin;
 import org.elasticsearch.xpack.esql.session.EsqlConfiguration;
 import org.elasticsearch.xpack.ql.expression.Alias;
 import org.elasticsearch.xpack.ql.expression.Attribute;
@@ -275,7 +276,7 @@ public class LocalExecutionPlanner {
 
                 var pragmas = configuration.pragmas();
                 var sinkHandler = new ExchangeSinkHandler(pragmas.exchangeBufferSize());
-                var executor = threadPool.executor(ThreadPool.Names.SEARCH_COORDINATION);
+                var executor = threadPool.executor(EsqlPlugin.ESQL_THREAD_POOL_NAME);
                 var sourceHandler = new ExchangeSourceHandler(pragmas.exchangeBufferSize(), executor);
                 sourceHandler.addRemoteSink(sinkHandler::fetchPageAsync, pragmas.concurrentExchangeClients());
                 PhysicalOperation sinkOperator = source.withSink(

+ 9 - 3
x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/plugin/ComputeService.java

@@ -62,6 +62,8 @@ import java.util.List;
 import java.util.Map;
 import java.util.concurrent.atomic.AtomicBoolean;
 
+import static org.elasticsearch.xpack.esql.plugin.EsqlPlugin.ESQL_THREAD_POOL_NAME;
+
 /**
  * Computes the result of a {@link PhysicalPlan}.
  */
@@ -90,11 +92,11 @@ public class ComputeService {
         this.bigArrays = bigArrays.withCircuitBreaking();
         transportService.registerRequestHandler(
             DATA_ACTION_NAME,
-            ThreadPool.Names.SEARCH,
+            ESQL_THREAD_POOL_NAME,
             DataNodeRequest::new,
             new DataNodeRequestHandler()
         );
-        this.driverRunner = new DriverTaskRunner(transportService, threadPool);
+        this.driverRunner = new DriverTaskRunner(transportService, threadPool.executor(ESQL_THREAD_POOL_NAME));
         this.exchangeService = exchangeService;
     }
 
@@ -125,7 +127,11 @@ public class ComputeService {
         ClusterState clusterState = clusterService.state();
         Map<String, List<ShardId>> targetNodes = computeTargetNodes(clusterState, indexNames);
 
-        final ExchangeSourceHandler sourceHandler = exchangeService.createSourceHandler(sessionId, queryPragmas.exchangeBufferSize());
+        final ExchangeSourceHandler sourceHandler = exchangeService.createSourceHandler(
+            sessionId,
+            queryPragmas.exchangeBufferSize(),
+            ESQL_THREAD_POOL_NAME
+        );
         final ActionListener<Void> listener = ActionListener.releaseAfter(
             outListener.map(unused -> collectedPages),
             () -> exchangeService.completeSourceHandler(sessionId)

+ 20 - 0
x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/plugin/EsqlPlugin.java

@@ -19,6 +19,7 @@ import org.elasticsearch.common.settings.IndexScopedSettings;
 import org.elasticsearch.common.settings.Setting;
 import org.elasticsearch.common.settings.Settings;
 import org.elasticsearch.common.settings.SettingsFilter;
+import org.elasticsearch.common.util.concurrent.EsExecutors;
 import org.elasticsearch.compute.data.Block;
 import org.elasticsearch.compute.lucene.LuceneSourceOperator;
 import org.elasticsearch.compute.lucene.ValuesSourceReaderOperator;
@@ -36,6 +37,8 @@ import org.elasticsearch.repositories.RepositoriesService;
 import org.elasticsearch.rest.RestController;
 import org.elasticsearch.rest.RestHandler;
 import org.elasticsearch.script.ScriptService;
+import org.elasticsearch.threadpool.ExecutorBuilder;
+import org.elasticsearch.threadpool.FixedExecutorBuilder;
 import org.elasticsearch.threadpool.ThreadPool;
 import org.elasticsearch.tracing.Tracer;
 import org.elasticsearch.watcher.ResourceWatcherService;
@@ -58,6 +61,8 @@ import java.util.stream.Stream;
 
 public class EsqlPlugin extends Plugin implements ActionPlugin {
 
+    public static final String ESQL_THREAD_POOL_NAME = "esql";
+
     public static final Setting<Integer> QUERY_RESULT_TRUNCATION_MAX_SIZE = Setting.intSetting(
         "esql.query.result_truncation_max_size",
         10000,
@@ -136,4 +141,19 @@ public class EsqlPlugin extends Plugin implements ActionPlugin {
             Block.getNamedWriteables().stream()
         ).toList();
     }
+
+    @Override
+    public List<ExecutorBuilder<?>> getExecutorBuilders(Settings settings) {
+        final int allocatedProcessors = EsExecutors.allocatedProcessors(settings);
+        return List.of(
+            new FixedExecutorBuilder(
+                settings,
+                ESQL_THREAD_POOL_NAME,
+                ThreadPool.searchOrGetThreadPoolSize(allocatedProcessors),
+                1000,
+                "esql",
+                true
+            )
+        );
+    }
 }

+ 13 - 3
x-pack/plugin/esql/src/test/java/org/elasticsearch/xpack/esql/CsvTests.java

@@ -22,6 +22,7 @@ import org.elasticsearch.logging.LogManager;
 import org.elasticsearch.logging.Logger;
 import org.elasticsearch.search.internal.SearchContext;
 import org.elasticsearch.test.ESTestCase;
+import org.elasticsearch.threadpool.FixedExecutorBuilder;
 import org.elasticsearch.threadpool.TestThreadPool;
 import org.elasticsearch.threadpool.ThreadPool;
 import org.elasticsearch.xpack.esql.CsvTestUtils.ActualResults;
@@ -84,6 +85,7 @@ import static org.elasticsearch.xpack.esql.CsvTestUtils.loadCsvSpecValues;
 import static org.elasticsearch.xpack.esql.CsvTestUtils.loadPageFromCsv;
 import static org.elasticsearch.xpack.esql.CsvTestsDataLoader.CSV_DATASET_MAP;
 import static org.elasticsearch.xpack.esql.EsqlTestUtils.loadMapping;
+import static org.elasticsearch.xpack.esql.plugin.EsqlPlugin.ESQL_THREAD_POOL_NAME;
 import static org.elasticsearch.xpack.ql.CsvSpecReader.specParser;
 import static org.elasticsearch.xpack.ql.TestUtils.classpathResources;
 
@@ -149,7 +151,11 @@ public class CsvTests extends ESTestCase {
     @Before
     public void setUp() throws Exception {
         super.setUp();
-        threadPool = new TestThreadPool("CsvTests");
+        int numThreads = randomBoolean() ? 1 : between(2, 16);
+        threadPool = new TestThreadPool(
+            "CsvTests",
+            new FixedExecutorBuilder(Settings.EMPTY, ESQL_THREAD_POOL_NAME, numThreads, 1024, "esql", false)
+        );
     }
 
     @After
@@ -280,7 +286,11 @@ public class CsvTests extends ESTestCase {
 
         // replace fragment inside the coordinator plan
         try {
-            ExchangeSourceHandler sourceHandler = exchangeService.createSourceHandler(sessionId, randomIntBetween(1, 64));
+            ExchangeSourceHandler sourceHandler = exchangeService.createSourceHandler(
+                sessionId,
+                randomIntBetween(1, 64),
+                ESQL_THREAD_POOL_NAME
+            );
             LocalExecutionPlan coordinatorNodeExecutionPlan = executionPlanner.plan(new OutputExec(coordinatorPlan, collectedPages::add));
             drivers.addAll(coordinatorNodeExecutionPlan.createDrivers(sessionId));
             if (dataNodePlan != null) {
@@ -290,7 +300,7 @@ public class CsvTests extends ESTestCase {
                 LocalExecutionPlan dataNodeExecutionPlan = executionPlanner.plan(csvDataNodePhysicalPlan);
                 drivers.addAll(dataNodeExecutionPlan.createDrivers(sessionId));
             }
-            runToCompletion(threadPool.executor(ThreadPool.Names.SEARCH), drivers);
+            runToCompletion(threadPool.executor(ESQL_THREAD_POOL_NAME), drivers);
         } finally {
             Releasables.close(
                 () -> Releasables.close(drivers),