Forráskód Böngészése

ESQL: Calculate concurrent node limit (#124901)

Iván Cea Fontenla 6 hónapja
szülő
commit
bd04d1fe1c

+ 5 - 0
docs/changelog/124901.yaml

@@ -0,0 +1,5 @@
+pr: 124901
+summary: Calculate concurrent node limit
+area: ES|QL
+type: feature
+issues: []

+ 106 - 0
x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/planner/PlanConcurrencyCalculator.java

@@ -0,0 +1,106 @@
+/*
+ * Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
+ * or more contributor license agreements. Licensed under the Elastic License
+ * 2.0; you may not use this file except in compliance with the Elastic License
+ * 2.0.
+ */
+
+package org.elasticsearch.xpack.esql.planner;
+
+import org.elasticsearch.core.Nullable;
+import org.elasticsearch.xpack.esql.core.expression.Literal;
+import org.elasticsearch.xpack.esql.core.util.Holder;
+import org.elasticsearch.xpack.esql.plan.logical.EsRelation;
+import org.elasticsearch.xpack.esql.plan.logical.Limit;
+import org.elasticsearch.xpack.esql.plan.logical.LogicalPlan;
+import org.elasticsearch.xpack.esql.plan.physical.FragmentExec;
+import org.elasticsearch.xpack.esql.plan.physical.PhysicalPlan;
+import org.elasticsearch.xpack.esql.session.Configuration;
+
+/**
+ * Calculates the maximum number of nodes that should be queried concurrently for the given data node plan.
+ * <p>
+ *     Used to avoid overloading the cluster with concurrent requests that may not be needed.
+ * </p>
+ */
+public class PlanConcurrencyCalculator {
+    public static final PlanConcurrencyCalculator INSTANCE = new PlanConcurrencyCalculator();
+
+    private PlanConcurrencyCalculator() {}
+
+    /**
+     * @return {@code null} if there should be no limit, otherwise, the maximum number of nodes that should be queried concurrently.
+     */
+    @Nullable
+    public Integer calculateNodesConcurrency(PhysicalPlan dataNodePlan, Configuration configuration) {
+        // If available, pragma overrides any calculation
+        if (configuration.pragmas().maxConcurrentNodesPerCluster() > 0) {
+            return configuration.pragmas().maxConcurrentNodesPerCluster();
+        }
+        if (dataNodePlan == null) {
+            return null;
+        }
+
+        Integer dataNodeLimit = getDataNodeLimit(dataNodePlan);
+
+        if (dataNodeLimit != null) {
+            return limitToConcurrency(dataNodeLimit);
+        }
+
+        return null;
+    }
+
+    private Integer limitToConcurrency(int limit) {
+        // For high limits, don't limit the concurrency
+        if (limit > 1000) {
+            return null;
+        }
+
+        // At least 2 nodes, otherwise log2(limit). E.g.
+        // Limit | Concurrency
+        // 1 | 2
+        // 10 | 3
+        // 1000 | 9
+        return Math.max(2, (int) (Math.log(limit) / Math.log(2)));
+    }
+
+    @Nullable
+    private Integer getDataNodeLimit(PhysicalPlan dataNodePlan) {
+        LogicalPlan logicalPlan = getFragmentPlan(dataNodePlan);
+
+        // State machine to find:
+        // A relation
+        Holder<Boolean> relationFound = new Holder<>(false);
+        // ...followed by no other node that could break the calculation
+        Holder<Boolean> forbiddenNodeFound = new Holder<>(false);
+        // ...and finally, a limit
+        Holder<Integer> limitValue = new Holder<>(null);
+
+        logicalPlan.forEachUp(node -> {
+            // If a limit or a forbidden command was already found, ignore the rest
+            if (limitValue.get() == null && forbiddenNodeFound.get() == false) {
+                if (node instanceof EsRelation) {
+                    relationFound.set(true);
+                } else if (relationFound.get()) {
+                    if (node instanceof Limit limit && limit.limit() instanceof Literal literalLimit) {
+                        limitValue.set((Integer) literalLimit.value());
+                    } else {
+                        forbiddenNodeFound.set(true);
+                    }
+                }
+            }
+        });
+
+        return limitValue.get();
+    }
+
+    private LogicalPlan getFragmentPlan(PhysicalPlan plan) {
+        Holder<LogicalPlan> foundPlan = new Holder<>();
+        plan.forEachDown(node -> {
+            if (node instanceof FragmentExec fragment) {
+                foundPlan.set(fragment.fragment());
+            }
+        });
+        return foundPlan.get();
+    }
+}

+ 4 - 1
x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/plugin/DataNodeComputeHandler.java

@@ -42,6 +42,7 @@ import org.elasticsearch.transport.TransportService;
 import org.elasticsearch.xpack.esql.core.expression.FoldContext;
 import org.elasticsearch.xpack.esql.plan.physical.ExchangeSinkExec;
 import org.elasticsearch.xpack.esql.plan.physical.PhysicalPlan;
+import org.elasticsearch.xpack.esql.planner.PlanConcurrencyCalculator;
 import org.elasticsearch.xpack.esql.planner.PlannerUtils;
 import org.elasticsearch.xpack.esql.session.Configuration;
 
@@ -98,13 +99,15 @@ final class DataNodeComputeHandler implements TransportRequestHandler<DataNodeRe
         Runnable runOnTaskFailure,
         ActionListener<ComputeResponse> outListener
     ) {
+        Integer maxConcurrentNodesPerCluster = PlanConcurrencyCalculator.INSTANCE.calculateNodesConcurrency(dataNodePlan, configuration);
+
         new DataNodeRequestSender(
             transportService,
             esqlExecutor,
             clusterAlias,
             parentTask,
             configuration.allowPartialResults(),
-            configuration.pragmas().maxConcurrentNodesPerCluster()
+            maxConcurrentNodesPerCluster == null ? -1 : maxConcurrentNodesPerCluster
         ) {
             @Override
             protected void sendRequest(

+ 1 - 1
x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/plugin/QueryPragmas.java

@@ -57,7 +57,7 @@ public final class QueryPragmas implements Writeable {
     public static final Setting<TimeValue> STATUS_INTERVAL = Setting.timeSetting("status_interval", Driver.DEFAULT_STATUS_INTERVAL);
 
     public static final Setting<Integer> MAX_CONCURRENT_NODES_PER_CLUSTER = //
-        Setting.intSetting("max_concurrent_nodes_per_cluster", -1, -1, 100);
+        Setting.intSetting("max_concurrent_nodes_per_cluster", -1, -1);
     public static final Setting<Integer> MAX_CONCURRENT_SHARDS_PER_NODE = //
         Setting.intSetting("max_concurrent_shards_per_node", 10, 1, 100);
 

+ 258 - 0
x-pack/plugin/esql/src/test/java/org/elasticsearch/xpack/esql/planner/PlanConcurrencyCalculatorTests.java

@@ -0,0 +1,258 @@
+/*
+ * Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
+ * or more contributor license agreements. Licensed under the Elastic License
+ * 2.0; you may not use this file except in compliance with the Elastic License
+ * 2.0.
+ */
+
+package org.elasticsearch.xpack.esql.planner;
+
+import org.elasticsearch.common.settings.Settings;
+import org.elasticsearch.test.ESTestCase;
+import org.elasticsearch.xpack.esql.analysis.Analyzer;
+import org.elasticsearch.xpack.esql.analysis.AnalyzerTestUtils;
+import org.elasticsearch.xpack.esql.core.expression.FoldContext;
+import org.elasticsearch.xpack.esql.optimizer.LogicalOptimizerContext;
+import org.elasticsearch.xpack.esql.optimizer.LogicalPlanOptimizer;
+import org.elasticsearch.xpack.esql.optimizer.PhysicalOptimizerContext;
+import org.elasticsearch.xpack.esql.optimizer.PhysicalPlanOptimizer;
+import org.elasticsearch.xpack.esql.plan.logical.LogicalPlan;
+import org.elasticsearch.xpack.esql.plan.physical.PhysicalPlan;
+import org.elasticsearch.xpack.esql.planner.mapper.Mapper;
+import org.elasticsearch.xpack.esql.plugin.QueryPragmas;
+import org.elasticsearch.xpack.esql.session.Configuration;
+
+import java.util.List;
+
+import static org.elasticsearch.xpack.esql.EsqlTestUtils.TEST_VERIFIER;
+import static org.elasticsearch.xpack.esql.EsqlTestUtils.configuration;
+import static org.elasticsearch.xpack.esql.EsqlTestUtils.withDefaultLimitWarning;
+import static org.elasticsearch.xpack.esql.analysis.AnalyzerTestUtils.analyzer;
+import static org.elasticsearch.xpack.esql.analysis.AnalyzerTestUtils.analyzerDefaultMapping;
+import static org.hamcrest.Matchers.equalTo;
+
+/**
+ * Tests for {@link PlanConcurrencyCalculator}.
+ * <p>
+ *     Each test plans an ES|QL query down to its data node plan and checks the calculated node concurrency.
+ *     An expected value of {@code null} means "no concurrency limit". For a limit of 512 the calculator
+ *     yields 9, i.e. {@code log2(512)}.
+ * </p>
+ */
+public class PlanConcurrencyCalculatorTests extends ESTestCase {
+    public void testSimpleLimit() {
+        assertConcurrency("""
+            FROM x
+            | LIMIT 512
+            """, 9);
+    }
+
+    public void testLimitZero() {
+        assertConcurrency("FROM x | LIMIT 0", null);
+    }
+
+    // A positive pragma always wins over the calculated value, whatever its size
+    public void testBiggestPragmaOverride() {
+        assertConcurrency("""
+            FROM x
+            | LIMIT 512
+            """, Integer.MAX_VALUE, Integer.MAX_VALUE);
+    }
+
+    public void testSmallestPragmaOverride() {
+        assertConcurrency("""
+            FROM x
+            | LIMIT 512
+            """, 1, 1);
+    }
+
+    // The pragma also applies to plans for which no concurrency would be calculated
+    public void testPragmaOverrideWithUnsupportedCommands() {
+        assertConcurrency("""
+            FROM x
+            | WHERE salary * 2 > 5
+            | LIMIT 512
+            """, 1, 1);
+    }
+
+    // No explicit LIMIT: the expectation relies on the limit that is implicitly added to the query
+    public void testImplicitLimit() {
+        assertConcurrency("""
+            FROM x
+            """, 9);
+    }
+
+    public void testStats() {
+        assertConcurrency("""
+            FROM x
+            | STATS COUNT(salary)
+            """, null);
+    }
+
+    public void testStatsWithLimit() {
+        assertConcurrency("""
+            FROM x
+            | LIMIT 512
+            | STATS COUNT(salary)
+            """, 9);
+    }
+
+    public void testSortBeforeLimit() {
+        assertConcurrency("""
+            FROM x
+            | SORT salary
+            """, null);
+    }
+
+    public void testSortAfterLimit() {
+        assertConcurrency("""
+            FROM x
+            | LIMIT 512
+            | SORT salary
+            """, 9);
+    }
+
+    public void testStatsWithSortBeforeLimit() {
+        assertConcurrency("""
+            FROM x
+            | SORT salary
+            | LIMIT 512
+            | STATS COUNT(salary)
+            """, null);
+    }
+
+    // The limit directly follows the relation, so the later SORT and STATS must not affect the result
+    public void testStatsWithSortAfterLimit() {
+        assertConcurrency("""
+            FROM x
+            | LIMIT 512
+            | SORT salary
+            | STATS COUNT(salary)
+            """, 9);
+    }
+
+    public void testWhereBeforeLimit() {
+        assertConcurrency("""
+            FROM x
+            | WHERE salary * 2 > 5
+            | LIMIT 512
+            """, null);
+    }
+
+    public void testWhereAfterLimit() {
+        assertConcurrency("""
+            FROM x
+            | LIMIT 512
+            | WHERE salary * 2 > 5
+            """, 9);
+    }
+
+    public void testWherePushedToLuceneQueryBeforeLimit() {
+        assertConcurrency("""
+            FROM x
+            | WHERE first_name LIKE "A%"
+            | LIMIT 512
+            """, null);
+    }
+
+    public void testWherePushedToLuceneQueryAfterLimit() {
+        assertConcurrency("""
+            FROM x
+            | LIMIT 512
+            | WHERE first_name LIKE "A%"
+            """, 9);
+    }
+
+    public void testExpand() {
+        assertConcurrency("""
+            FROM x
+            | LIMIT 2048
+            | MV_EXPAND salary
+            | LIMIT 512
+            """, 9);
+    }
+
+    public void testEval() {
+        assertConcurrency("""
+            FROM x
+            | EVAL x=salary*2
+            | LIMIT 512
+            """, 9);
+    }
+
+    public void testRename() {
+        assertConcurrency("""
+            FROM x
+            | RENAME salary as x
+            | LIMIT 512
+            """, 9);
+    }
+
+    public void testKeep() {
+        assertConcurrency("""
+            FROM x
+            | KEEP salary
+            | LIMIT 512
+            """, 9);
+    }
+
+    public void testDrop() {
+        assertConcurrency("""
+            FROM x
+            | DROP salary
+            | LIMIT 512
+            """, 9);
+    }
+
+    public void testDissect() {
+        assertConcurrency("""
+            FROM x
+            | DISSECT first_name "%{a} %{b}"
+            | LIMIT 512
+            """, 9);
+    }
+
+    public void testGrok() {
+        assertConcurrency("""
+            FROM x
+            | GROK first_name "%{EMAILADDRESS:email}"
+            | LIMIT 512
+            """, 9);
+    }
+
+    public void testEnrich() {
+        assertConcurrency("""
+            FROM x
+            | ENRICH languages ON first_name
+            | LIMIT 512
+            """, 9);
+    }
+
+    public void testLookup() {
+        assertConcurrency("""
+            FROM x
+            | RENAME salary as language_code
+            | LOOKUP JOIN languages_lookup on language_code
+            | LIMIT 512
+            """, 9);
+    }
+
+    /**
+     * Asserts the calculated concurrency of a query that sets no concurrency pragma.
+     */
+    private void assertConcurrency(String query, Integer expectedConcurrency) {
+        assertConcurrency(query, null, expectedConcurrency);
+    }
+
+    /**
+     * Plans the query down to its data node plan and asserts the concurrency calculated for it.
+     *
+     * @param query the ES|QL query to plan
+     * @param concurrencyPragmaValue value for the {@code max_concurrent_nodes_per_cluster} pragma, or {@code null} to leave it unset
+     * @param expectedConcurrency the expected result, where {@code null} means no limit
+     */
+    private void assertConcurrency(String query, Integer concurrencyPragmaValue, Integer expectedConcurrency) {
+        Configuration configuration = concurrencyPragmaValue == null
+            ? configuration(query)
+            : configuration(
+                new QueryPragmas(Settings.builder().put("max_concurrent_nodes_per_cluster", concurrencyPragmaValue).build()),
+                query
+            );
+
+        // Analyze and optimize the logical plan
+        Analyzer analyzer = analyzer(analyzerDefaultMapping(), TEST_VERIFIER, configuration);
+        LogicalPlan logicalPlan = AnalyzerTestUtils.analyze(query, analyzer);
+        logicalPlan = new LogicalPlanOptimizer(new LogicalOptimizerContext(configuration, FoldContext.small())).optimize(logicalPlan);
+
+        // Map it to a physical plan and optimize it
+        PhysicalPlan physicalPlan = new Mapper().map(logicalPlan);
+        physicalPlan = new PhysicalPlanOptimizer(new PhysicalOptimizerContext(configuration)).optimize(physicalPlan);
+
+        // Only the data node side of the plan is given to the calculator
+        PhysicalPlan dataNodePlan = PlannerUtils.breakPlanBetweenCoordinatorAndDataNode(physicalPlan, configuration).v2();
+
+        Integer actualConcurrency = PlanConcurrencyCalculator.INSTANCE.calculateNodesConcurrency(dataNodePlan, configuration);
+
+        assertThat(actualConcurrency, equalTo(expectedConcurrency));
+    }
+
+    @Override
+    protected List<String> filteredWarnings() {
+        return withDefaultLimitWarning(super.filteredWarnings());
+    }
+}