Browse Source

ESQL: perform a reduction on the data node (#106516)

* Introduce node-level reduction (instead of the coordinator level one) behind a pragma
Andrei Stefan 1 year ago
parent
commit
f4613d0248
18 changed files with 224 additions and 38 deletions
  1. 5 0
      docs/changelog/106516.yaml
  2. 1 0
      server/src/main/java/org/elasticsearch/TransportVersions.java
  3. 3 0
      x-pack/plugin/esql/qa/security/src/javaRestTest/java/org/elasticsearch/xpack/esql/EsqlSecurityIT.java
  4. 3 3
      x-pack/plugin/esql/qa/testFixtures/src/main/resources/ip.csv-spec
  5. 3 0
      x-pack/plugin/esql/src/internalClusterTest/java/org/elasticsearch/xpack/esql/action/AbstractEsqlIntegTestCase.java
  6. 86 14
      x-pack/plugin/esql/src/internalClusterTest/java/org/elasticsearch/xpack/esql/action/EsqlActionTaskIT.java
  7. 5 1
      x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/io/stream/PlanNamedTypes.java
  8. 4 0
      x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/io/stream/PlanStreamInput.java
  9. 9 0
      x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/io/stream/PlanStreamOutput.java
  10. 2 1
      x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/optimizer/PhysicalPlanOptimizer.java
  11. 24 7
      x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/plan/physical/FragmentExec.java
  12. 5 5
      x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/planner/LocalExecutionPlanner.java
  13. 1 1
      x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/planner/Mapper.java
  14. 38 0
      x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/planner/PlannerUtils.java
  15. 23 4
      x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/plugin/ComputeService.java
  16. 10 0
      x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/plugin/QueryPragmas.java
  17. 1 1
      x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/session/EsqlSession.java
  18. 1 1
      x-pack/plugin/esql/src/test/java/org/elasticsearch/xpack/esql/planner/FilterTests.java

+ 5 - 0
docs/changelog/106516.yaml

@@ -0,0 +1,5 @@
+pr: 106516
+summary: "ESQL: perform a reduction on the data node"
+area: ES|QL
+type: enhancement
+issues: []

+ 1 - 0
server/src/main/java/org/elasticsearch/TransportVersions.java

@@ -158,6 +158,7 @@ public class TransportVersions {
     public static final TransportVersion SEARCH_NODE_LOAD_AUTOSCALING = def(8_617_00_0);
     public static final TransportVersion ESQL_ES_SOURCE_OPTIONS = def(8_618_00_0);
     public static final TransportVersion ADD_PERSISTENT_TASK_EXCEPTIONS = def(8_619_00_0);
+    public static final TransportVersion ESQL_REDUCER_NODE_FRAGMENT = def(8_620_00_0);
 
     /*
      * STOP! READ THIS FIRST! No, really,

+ 3 - 0
x-pack/plugin/esql/qa/security/src/javaRestTest/java/org/elasticsearch/xpack/esql/EsqlSecurityIT.java

@@ -389,6 +389,9 @@ public class EsqlSecurityIT extends ESRestTestCase {
         if (randomBoolean()) {
             settings.put("enrich_max_workers", between(1, 5));
         }
+        if (randomBoolean()) {
+            settings.put("node_level_reduction", randomBoolean());
+        }
         return settings.build();
     }
 }

+ 3 - 3
x-pack/plugin/esql/qa/testFixtures/src/main/resources/ip.csv-spec

@@ -62,9 +62,9 @@ eth2           |epsilon        |[fe81::cae2:65ff:fece:feb9, fe82::cae2:65ff:fece
 lessThan
 required_feature: esql.mv_warn
 
-from hosts | sort host, card | where ip0 < ip1 | keep card, host, ip0, ip1;
-warning:Line 1:38: evaluation of [ip0 < ip1] failed, treating result as null. Only first 20 failures recorded.
-warning:Line 1:38: java.lang.IllegalArgumentException: single-value function encountered multi-value
+from hosts | sort host, card, ip1 | where ip0 < ip1 | keep card, host, ip0, ip1;
+warning:Line 1:43: evaluation of [ip0 < ip1] failed, treating result as null. Only first 20 failures recorded.
+warning:Line 1:43: java.lang.IllegalArgumentException: single-value function encountered multi-value
 
 card:keyword   |host:keyword   |ip0:ip                   |ip1:ip
 eth1           |beta           |127.0.0.1                |127.0.0.2

+ 3 - 0
x-pack/plugin/esql/src/internalClusterTest/java/org/elasticsearch/xpack/esql/action/AbstractEsqlIntegTestCase.java

@@ -188,6 +188,9 @@ public abstract class AbstractEsqlIntegTestCase extends ESIntegTestCase {
             if (randomBoolean()) {
                 settings.put("max_concurrent_shards_per_node", randomIntBetween(1, 10));
             }
+            if (randomBoolean()) {
+                settings.put("node_level_reduction", randomBoolean());
+            }
         }
         return new QueryPragmas(settings.build());
     }

+ 86 - 14
x-pack/plugin/esql/src/internalClusterTest/java/org/elasticsearch/xpack/esql/action/EsqlActionTaskIT.java

@@ -76,6 +76,7 @@ public class EsqlActionTaskIT extends AbstractPausableIntegTestCase {
     private String READ_DESCRIPTION;
     private String MERGE_DESCRIPTION;
     private String REDUCE_DESCRIPTION;
+    private boolean nodeLevelReduction;
 
     @Before
     public void setup() {
@@ -94,6 +95,7 @@ public class EsqlActionTaskIT extends AbstractPausableIntegTestCase {
         REDUCE_DESCRIPTION = """
             \\_ExchangeSourceOperator[]
             \\_ExchangeSinkOperator""";
+        nodeLevelReduction = randomBoolean();
     }
 
     public void testTaskContents() throws Exception {
@@ -209,22 +211,31 @@ public class EsqlActionTaskIT extends AbstractPausableIntegTestCase {
     }
 
     private ActionFuture<EsqlQueryResponse> startEsql() {
+        return startEsql("from test | stats sum(pause_me)");
+    }
+
+    private ActionFuture<EsqlQueryResponse> startEsql(String query) {
         scriptPermits.drainPermits();
         scriptPermits.release(between(1, 5));
-        var pragmas = new QueryPragmas(
-            Settings.builder()
-                // Force shard partitioning because that's all the tests know how to match. It is easier to reason about too.
-                .put("data_partitioning", "shard")
-                // Limit the page size to something small so we do more than one page worth of work, so we get more status updates.
-                .put("page_size", pageSize())
-                // Report the status after every action
-                .put("status_interval", "0ms")
-                .build()
-        );
-        return EsqlQueryRequestBuilder.newSyncEsqlQueryRequestBuilder(client())
-            .query("from test | stats sum(pause_me)")
-            .pragmas(pragmas)
-            .execute();
+        var settingsBuilder = Settings.builder()
+            // Force shard partitioning because that's all the tests know how to match. It is easier to reason about too.
+            .put("data_partitioning", "shard")
+            // Limit the page size to something small so we do more than one page worth of work, so we get more status updates.
+            .put("page_size", pageSize())
+            // Report the status after every action
+            .put("status_interval", "0ms");
+
+        if (nodeLevelReduction == false) {
+            // explicitly set the default (false) or don't
+            if (randomBoolean()) {
+                settingsBuilder.put("node_level_reduction", nodeLevelReduction);
+            }
+        } else {
+            settingsBuilder.put("node_level_reduction", nodeLevelReduction);
+        }
+
+        var pragmas = new QueryPragmas(settingsBuilder.build());
+        return EsqlQueryRequestBuilder.newSyncEsqlQueryRequestBuilder(client()).query(query).pragmas(pragmas).execute();
     }
 
     private void cancelTask(TaskId taskId) {
@@ -407,6 +418,67 @@ public class EsqlActionTaskIT extends AbstractPausableIntegTestCase {
         }
     }
 
+    public void testTaskContentsForTopNQuery() throws Exception {
+        READ_DESCRIPTION = ("\\_LuceneTopNSourceOperator[dataPartitioning = SHARD, maxPageSize = pageSize(), limit = 1000, "
+            + "sorts = [{\"pause_me\":{\"order\":\"asc\",\"missing\":\"_last\",\"unmapped_type\":\"long\"}}]]\n"
+            + "\\_ValuesSourceReaderOperator[fields = [pause_me]]\n"
+            + "\\_ProjectOperator[projection = [1]]\n"
+            + "\\_ExchangeSinkOperator").replace("pageSize()", Integer.toString(pageSize()));
+        MERGE_DESCRIPTION = "\\_ExchangeSourceOperator[]\n"
+            + "\\_TopNOperator[count=1000, elementTypes=[LONG], encoders=[DefaultSortable], "
+            + "sortOrders=[SortOrder[channel=0, asc=true, nullsFirst=false]]]\n"
+            + "\\_ProjectOperator[projection = [0]]\n"
+            + "\\_OutputOperator[columns = [pause_me]]";
+        REDUCE_DESCRIPTION = "\\_ExchangeSourceOperator[]\n"
+            + (nodeLevelReduction
+                ? "\\_TopNOperator[count=1000, elementTypes=[LONG], encoders=[DefaultSortable], "
+                    + "sortOrders=[SortOrder[channel=0, asc=true, nullsFirst=false]]]\n"
+                : "")
+            + "\\_ExchangeSinkOperator";
+
+        ActionFuture<EsqlQueryResponse> response = startEsql("from test | sort pause_me | keep pause_me");
+        try {
+            getTasksStarting();
+            scriptPermits.release(pageSize());
+            getTasksRunning();
+        } finally {
+            // each scripted field "emit" is called by LuceneTopNSourceOperator and by ValuesSourceReaderOperator
+            scriptPermits.release(2 * numberOfDocs());
+            try (EsqlQueryResponse esqlResponse = response.get()) {
+                assertThat(Iterators.flatMap(esqlResponse.values(), i -> i).next(), equalTo(1L));
+            }
+        }
+    }
+
+    public void testTaskContentsForLimitQuery() throws Exception {
+        String limit = Integer.toString(randomIntBetween(pageSize() + 1, 2 * numberOfDocs()));
+        READ_DESCRIPTION = """
+            \\_LuceneSourceOperator[dataPartitioning = SHARD, maxPageSize = pageSize(), limit = limit()]
+            \\_ValuesSourceReaderOperator[fields = [pause_me]]
+            \\_ProjectOperator[projection = [1]]
+            \\_ExchangeSinkOperator""".replace("pageSize()", Integer.toString(pageSize())).replace("limit()", limit);
+        MERGE_DESCRIPTION = """
+            \\_ExchangeSourceOperator[]
+            \\_LimitOperator[limit = limit()]
+            \\_ProjectOperator[projection = [0]]
+            \\_OutputOperator[columns = [pause_me]]""".replace("limit()", limit);
+        REDUCE_DESCRIPTION = ("\\_ExchangeSourceOperator[]\n"
+            + (nodeLevelReduction ? "\\_LimitOperator[limit = limit()]\n" : "")
+            + "\\_ExchangeSinkOperator").replace("limit()", limit);
+
+        ActionFuture<EsqlQueryResponse> response = startEsql("from test | keep pause_me | limit " + limit);
+        try {
+            getTasksStarting();
+            scriptPermits.release(pageSize());
+            getTasksRunning();
+        } finally {
+            scriptPermits.release(numberOfDocs());
+            try (EsqlQueryResponse esqlResponse = response.get()) {
+                assertThat(Iterators.flatMap(esqlResponse.values(), i -> i).next(), equalTo(1L));
+            }
+        }
+    }
+
     @Override
     protected Collection<Class<? extends Plugin>> nodePlugins() {
         return CollectionUtils.appendToCopy(super.nodePlugins(), MockTransportService.TestPlugin.class);

+ 5 - 1
x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/io/stream/PlanNamedTypes.java

@@ -629,7 +629,8 @@ public final class PlanNamedTypes {
             in.readSource(),
             in.readLogicalPlanNode(),
             in.readOptionalNamedWriteable(QueryBuilder.class),
-            in.readOptionalVInt()
+            in.readOptionalVInt(),
+            in.getTransportVersion().onOrAfter(TransportVersions.ESQL_REDUCER_NODE_FRAGMENT) ? in.readOptionalPhysicalPlanNode() : null
         );
     }
 
@@ -638,6 +639,9 @@ public final class PlanNamedTypes {
         out.writeLogicalPlanNode(fragmentExec.fragment());
         out.writeOptionalNamedWriteable(fragmentExec.esFilter());
         out.writeOptionalVInt(fragmentExec.estimatedRowSize());
+        if (out.getTransportVersion().onOrAfter(TransportVersions.ESQL_REDUCER_NODE_FRAGMENT)) {
+            out.writeOptionalPhysicalPlanNode(fragmentExec.reducer());
+        }
     }
 
     static GrokExec readGrokExec(PlanStreamInput in) throws IOException {

+ 4 - 0
x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/io/stream/PlanStreamInput.java

@@ -104,6 +104,10 @@ public final class PlanStreamInput extends NamedWriteableAwareStreamInput {
         return readNamed(PhysicalPlan.class);
     }
 
+    public PhysicalPlan readOptionalPhysicalPlanNode() throws IOException {
+        return readOptionalNamed(PhysicalPlan.class);
+    }
+
     public Source readSource() throws IOException {
         boolean hasSource = readBoolean();
         return hasSource ? readSourceWithText(this, configuration.query()) : Source.EMPTY;

+ 9 - 0
x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/io/stream/PlanStreamOutput.java

@@ -53,6 +53,15 @@ public final class PlanStreamOutput extends StreamOutput {
         writeNamed(PhysicalPlan.class, physicalPlan);
     }
 
+    public void writeOptionalPhysicalPlanNode(PhysicalPlan physicalPlan) throws IOException {
+        if (physicalPlan == null) {
+            writeBoolean(false);
+        } else {
+            writeBoolean(true);
+            writePhysicalPlanNode(physicalPlan);
+        }
+    }
+
     public void writeSource(Source source) throws IOException {
         writeBoolean(true);
         writeSourceNoText(this, source);

+ 2 - 1
x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/optimizer/PhysicalPlanOptimizer.java

@@ -151,7 +151,8 @@ public class PhysicalPlanOptimizer extends ParameterizedRuleExecutor<PhysicalPla
                                     Source.EMPTY,
                                     new Project(logicalFragment.source(), logicalFragment, output),
                                     fragmentExec.esFilter(),
-                                    fragmentExec.estimatedRowSize()
+                                    fragmentExec.estimatedRowSize(),
+                                    fragmentExec.reducer()
                                 )
                             );
                         }

+ 24 - 7
x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/plan/physical/FragmentExec.java

@@ -20,6 +20,7 @@ public class FragmentExec extends LeafExec implements EstimatesRowSize {
 
     private final LogicalPlan fragment;
     private final QueryBuilder esFilter;
+    private final PhysicalPlan reducer; // datanode-level physical plan node that performs an intermediate (not partial) reduce
 
     /**
      * Estimate of the number of bytes that'll be loaded per position before
@@ -28,14 +29,15 @@ public class FragmentExec extends LeafExec implements EstimatesRowSize {
     private final int estimatedRowSize;
 
     public FragmentExec(LogicalPlan fragment) {
-        this(fragment.source(), fragment, null, 0);
+        this(fragment.source(), fragment, null, 0, null);
     }
 
-    public FragmentExec(Source source, LogicalPlan fragment, QueryBuilder esFilter, int estimatedRowSize) {
+    public FragmentExec(Source source, LogicalPlan fragment, QueryBuilder esFilter, int estimatedRowSize, PhysicalPlan reducer) {
         super(source);
         this.fragment = fragment;
         this.esFilter = esFilter;
         this.estimatedRowSize = estimatedRowSize;
+        this.reducer = reducer;
     }
 
     public LogicalPlan fragment() {
@@ -50,9 +52,13 @@ public class FragmentExec extends LeafExec implements EstimatesRowSize {
         return estimatedRowSize;
     }
 
+    public PhysicalPlan reducer() {
+        return reducer;
+    }
+
     @Override
     protected NodeInfo<FragmentExec> info() {
-        return NodeInfo.create(this, FragmentExec::new, fragment, esFilter, estimatedRowSize);
+        return NodeInfo.create(this, FragmentExec::new, fragment, esFilter, estimatedRowSize, reducer);
     }
 
     @Override
@@ -65,12 +71,20 @@ public class FragmentExec extends LeafExec implements EstimatesRowSize {
         int estimatedRowSize = state.consumeAllFields(false);
         return Objects.equals(estimatedRowSize, this.estimatedRowSize)
             ? this
-            : new FragmentExec(source(), fragment, esFilter, estimatedRowSize);
+            : new FragmentExec(source(), fragment, esFilter, estimatedRowSize, reducer);
+    }
+
+    public FragmentExec withFilter(QueryBuilder filter) {
+        return Objects.equals(filter, this.esFilter) ? this : new FragmentExec(source(), fragment, filter, estimatedRowSize, reducer);
+    }
+
+    public FragmentExec withReducer(PhysicalPlan reducer) {
+        return Objects.equals(reducer, this.reducer) ? this : new FragmentExec(source(), fragment, esFilter, estimatedRowSize, reducer);
     }
 
     @Override
     public int hashCode() {
-        return Objects.hash(fragment, esFilter, estimatedRowSize);
+        return Objects.hash(fragment, esFilter, estimatedRowSize, reducer);
     }
 
     @Override
@@ -86,7 +100,8 @@ public class FragmentExec extends LeafExec implements EstimatesRowSize {
         FragmentExec other = (FragmentExec) obj;
         return Objects.equals(fragment, other.fragment)
             && Objects.equals(esFilter, other.esFilter)
-            && Objects.equals(estimatedRowSize, other.estimatedRowSize);
+            && Objects.equals(estimatedRowSize, other.estimatedRowSize)
+            && Objects.equals(reducer, other.reducer);
     }
 
     @Override
@@ -97,7 +112,9 @@ public class FragmentExec extends LeafExec implements EstimatesRowSize {
         sb.append(esFilter);
         sb.append(", estimatedRowSize=");
         sb.append(estimatedRowSize);
-        sb.append(", fragment=[<>\n");
+        sb.append(", reducer=[");
+        sb.append(reducer == null ? "" : reducer.toString());
+        sb.append("], fragment=[<>\n");
         sb.append(fragment.toString());
         sb.append("<>]]");
         return sb.toString();

+ 5 - 5
x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/planner/LocalExecutionPlanner.java

@@ -149,7 +149,7 @@ public class LocalExecutionPlanner {
     /**
      * turn the given plan into a list of drivers to execute
      */
-    public LocalExecutionPlan plan(PhysicalPlan node) {
+    public LocalExecutionPlan plan(PhysicalPlan localPhysicalPlan) {
         var context = new LocalExecutionPlannerContext(
             new ArrayList<>(),
             new Holder<>(DriverParallelism.SINGLE),
@@ -160,11 +160,11 @@ public class LocalExecutionPlanner {
         );
 
         // workaround for https://github.com/elastic/elasticsearch/issues/99782
-        node = node.transformUp(
+        localPhysicalPlan = localPhysicalPlan.transformUp(
             AggregateExec.class,
             a -> a.getMode() == AggregateExec.Mode.FINAL ? new ProjectExec(a.source(), a, Expressions.asAttributes(a.aggregates())) : a
         );
-        PhysicalOperation physicalOperation = plan(node, context);
+        PhysicalOperation physicalOperation = plan(localPhysicalPlan, context);
 
         final TimeValue statusInterval = configuration.pragmas().statusInterval();
         context.addDriverFactory(
@@ -181,7 +181,7 @@ public class LocalExecutionPlanner {
         if (node instanceof AggregateExec aggregate) {
             return planAggregation(aggregate, context);
         } else if (node instanceof FieldExtractExec fieldExtractExec) {
-            return planFieldExtractNode(context, fieldExtractExec);
+            return planFieldExtractNode(fieldExtractExec, context);
         } else if (node instanceof ExchangeExec exchangeExec) {
             return planExchange(exchangeExec, context);
         } else if (node instanceof TopNExec topNExec) {
@@ -259,7 +259,7 @@ public class LocalExecutionPlanner {
         return PhysicalOperation.fromSource(luceneFactory, layout.build());
     }
 
-    private PhysicalOperation planFieldExtractNode(LocalExecutionPlannerContext context, FieldExtractExec fieldExtractExec) {
+    private PhysicalOperation planFieldExtractNode(FieldExtractExec fieldExtractExec, LocalExecutionPlannerContext context) {
         return physicalOperationProviders.fieldExtractPhysicalOperation(fieldExtractExec, plan(fieldExtractExec.child(), context));
     }
 

+ 1 - 1
x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/planner/Mapper.java

@@ -116,7 +116,7 @@ public class Mapper {
         throw new EsqlIllegalArgumentException("unsupported logical plan node [" + p.nodeName() + "]");
     }
 
-    private static boolean isPipelineBreaker(LogicalPlan p) {
+    static boolean isPipelineBreaker(LogicalPlan p) {
         return p instanceof Aggregate || p instanceof TopN || p instanceof Limit || p instanceof OrderBy;
     }
 

+ 38 - 0
x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/planner/PlannerUtils.java

@@ -21,6 +21,7 @@ import org.elasticsearch.xpack.esql.optimizer.LocalLogicalOptimizerContext;
 import org.elasticsearch.xpack.esql.optimizer.LocalLogicalPlanOptimizer;
 import org.elasticsearch.xpack.esql.optimizer.LocalPhysicalOptimizerContext;
 import org.elasticsearch.xpack.esql.optimizer.LocalPhysicalPlanOptimizer;
+import org.elasticsearch.xpack.esql.plan.logical.TopN;
 import org.elasticsearch.xpack.esql.plan.physical.EsQueryExec;
 import org.elasticsearch.xpack.esql.plan.physical.EsSourceExec;
 import org.elasticsearch.xpack.esql.plan.physical.EstimatesRowSize;
@@ -28,7 +29,10 @@ import org.elasticsearch.xpack.esql.plan.physical.ExchangeExec;
 import org.elasticsearch.xpack.esql.plan.physical.ExchangeSinkExec;
 import org.elasticsearch.xpack.esql.plan.physical.ExchangeSourceExec;
 import org.elasticsearch.xpack.esql.plan.physical.FragmentExec;
+import org.elasticsearch.xpack.esql.plan.physical.LimitExec;
+import org.elasticsearch.xpack.esql.plan.physical.OrderExec;
 import org.elasticsearch.xpack.esql.plan.physical.PhysicalPlan;
+import org.elasticsearch.xpack.esql.plan.physical.TopNExec;
 import org.elasticsearch.xpack.esql.session.EsqlConfiguration;
 import org.elasticsearch.xpack.esql.stats.SearchStats;
 import org.elasticsearch.xpack.esql.type.EsqlDataTypes;
@@ -37,8 +41,13 @@ import org.elasticsearch.xpack.ql.expression.Expression;
 import org.elasticsearch.xpack.ql.expression.FieldAttribute;
 import org.elasticsearch.xpack.ql.expression.predicate.Predicates;
 import org.elasticsearch.xpack.ql.options.EsSourceOptions;
+import org.elasticsearch.xpack.ql.plan.logical.Aggregate;
 import org.elasticsearch.xpack.ql.plan.logical.EsRelation;
 import org.elasticsearch.xpack.ql.plan.logical.Filter;
+import org.elasticsearch.xpack.ql.plan.logical.Limit;
+import org.elasticsearch.xpack.ql.plan.logical.LogicalPlan;
+import org.elasticsearch.xpack.ql.plan.logical.OrderBy;
+import org.elasticsearch.xpack.ql.plan.logical.UnaryPlan;
 import org.elasticsearch.xpack.ql.tree.Source;
 import org.elasticsearch.xpack.ql.type.DataType;
 import org.elasticsearch.xpack.ql.type.DataTypes;
@@ -73,6 +82,35 @@ public class PlannerUtils {
         return new Tuple<>(coordinatorPlan, dataNodePlan.get());
     }
 
+    public static PhysicalPlan dataNodeReductionPlan(LogicalPlan plan, PhysicalPlan unused) {
+        var pipelineBreakers = plan.collectFirstChildren(Mapper::isPipelineBreaker);
+
+        if (pipelineBreakers.isEmpty() == false) {
+            UnaryPlan pipelineBreaker = (UnaryPlan) pipelineBreakers.get(0);
+            if (pipelineBreaker instanceof TopN topN) {
+                return new TopNExec(topN.source(), unused, topN.order(), topN.limit(), 2000);
+            } else if (pipelineBreaker instanceof Limit limit) {
+                return new LimitExec(limit.source(), unused, limit.limit());
+            } else if (pipelineBreaker instanceof OrderBy order) {
+                return new OrderExec(order.source(), unused, order.order());
+            } else if (pipelineBreaker instanceof Aggregate aggregate) {
+                // TODO handle this as a special PARTIAL step (intermediate)
+                /*return new AggregateExec(
+                    aggregate.source(),
+                    unused,
+                    aggregate.groupings(),
+                    aggregate.aggregates(),
+                    AggregateExec.Mode.PARTIAL,
+                    0
+                );*/
+                return null;
+            } else {
+                throw new EsqlIllegalArgumentException("unsupported unary physical plan node [" + pipelineBreaker.nodeName() + "]");
+            }
+        }
+        return null;
+    }
+
     /**
      * Returns a set of concrete indices after resolving the original indices specified in the FROM command.
      */

+ 23 - 4
x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/plugin/ComputeService.java

@@ -64,6 +64,7 @@ import org.elasticsearch.xpack.esql.action.EsqlQueryAction;
 import org.elasticsearch.xpack.esql.enrich.EnrichLookupService;
 import org.elasticsearch.xpack.esql.plan.physical.ExchangeSinkExec;
 import org.elasticsearch.xpack.esql.plan.physical.ExchangeSourceExec;
+import org.elasticsearch.xpack.esql.plan.physical.FragmentExec;
 import org.elasticsearch.xpack.esql.plan.physical.OutputExec;
 import org.elasticsearch.xpack.esql.plan.physical.PhysicalPlan;
 import org.elasticsearch.xpack.esql.planner.EsPhysicalOperationProviders;
@@ -289,12 +290,19 @@ public class ComputeService {
         ActionListener<Void> parentListener,
         Supplier<ActionListener<ComputeResponse>> dataNodeListenerSupplier
     ) {
+        var planWithReducer = configuration.pragmas().nodeLevelReduction() == false
+            ? dataNodePlan
+            : dataNodePlan.transformUp(FragmentExec.class, f -> {
+                PhysicalPlan reductionNode = PlannerUtils.dataNodeReductionPlan(f.fragment(), dataNodePlan);
+                return reductionNode == null ? f : f.withReducer(reductionNode);
+            });
+
         // The lambda is to say if a TEXT field has an identical exact subfield
         // We cannot use SearchContext because we don't have it yet.
         // Since it's used only for @timestamp, it is relatively safe to assume it's not needed
         // but it would be better to have a proper impl.
-        QueryBuilder requestFilter = PlannerUtils.requestFilter(dataNodePlan, x -> true);
-        EsSourceOptions esSourceOptions = PlannerUtils.esSourceOptions(dataNodePlan);
+        QueryBuilder requestFilter = PlannerUtils.requestFilter(planWithReducer, x -> true);
+        EsSourceOptions esSourceOptions = PlannerUtils.esSourceOptions(planWithReducer);
         lookupDataNodes(
             parentTask,
             clusterAlias,
@@ -327,7 +335,7 @@ public class ComputeService {
                                         clusterAlias,
                                         node.shardIds,
                                         node.aliasFilters,
-                                        dataNodePlan
+                                        planWithReducer
                                     ),
                                     parentTask,
                                     TransportRequestOptions.EMPTY,
@@ -426,6 +434,9 @@ public class ComputeService {
 
             LOGGER.debug("Received physical plan:\n{}", plan);
             plan = PlannerUtils.localPlan(context.searchContexts, context.configuration, plan);
+            // the planner will also set the driver parallelism in LocalExecutionPlanner.LocalExecutionPlan (used down below)
+            // it's doing this in the planning of EsQueryExec (the source of the data)
+            // see also EsPhysicalOperationProviders.sourcePhysicalOperation
             LocalExecutionPlanner.LocalExecutionPlan localExecutionPlan = planner.plan(plan);
 
             if (LOGGER.isDebugEnabled()) {
@@ -750,11 +761,19 @@ public class ComputeService {
             final ActionListener<ComputeResponse> listener = new ChannelActionListener<>(channel);
             final ExchangeSinkExec reducePlan;
             if (request.plan() instanceof ExchangeSinkExec plan) {
+                var fragments = plan.collectFirstChildren(FragmentExec.class::isInstance);
+                if (fragments.isEmpty()) {
+                    listener.onFailure(new IllegalStateException("expected a fragment plan for a remote compute; got " + request.plan()));
+                    return;
+                }
+
+                var localExchangeSource = new ExchangeSourceExec(plan.source(), plan.output(), plan.isIntermediateAgg());
+                FragmentExec fragment = (FragmentExec) fragments.get(0);
                 reducePlan = new ExchangeSinkExec(
                     plan.source(),
                     plan.output(),
                     plan.isIntermediateAgg(),
-                    new ExchangeSourceExec(plan.source(), plan.output(), plan.isIntermediateAgg())
+                    fragment.reducer() != null ? fragment.reducer().replaceChildren(List.of(localExchangeSource)) : localExchangeSource
                 );
             } else {
                 listener.onFailure(new IllegalStateException("expected exchange sink for a remote compute; got " + request.plan()));

+ 10 - 0
x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/plugin/QueryPragmas.java

@@ -57,6 +57,8 @@ public final class QueryPragmas implements Writeable {
 
     public static final Setting<Integer> MAX_CONCURRENT_SHARDS_PER_NODE = Setting.intSetting("max_concurrent_shards_per_node", 10, 1, 100);
 
+    public static final Setting<Boolean> NODE_LEVEL_REDUCTION = Setting.boolSetting("node_level_reduction", false);
+
     public static final QueryPragmas EMPTY = new QueryPragmas(Settings.EMPTY);
 
     private final Settings settings;
@@ -126,6 +128,14 @@ public final class QueryPragmas implements Writeable {
         return MAX_CONCURRENT_SHARDS_PER_NODE.get(settings);
     }
 
+    /**
+     * Returns true if each data node should perform a local reduction for sort, limit, topN, stats or false if the coordinator node
+     * will perform the reduction.
+     */
+    public boolean nodeLevelReduction() {
+        return NODE_LEVEL_REDUCTION.get(settings);
+    }
+
     public boolean isEmpty() {
         return settings.isEmpty();
     }

+ 1 - 1
x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/session/EsqlSession.java

@@ -133,7 +133,7 @@ public class EsqlSession {
                     // TODO: filter integration testing
                     filter = fragmentFilter != null ? boolQuery().filter(fragmentFilter).must(filter) : filter;
                     LOGGER.debug("Fold filter {} to EsQueryExec", filter);
-                    f = new FragmentExec(f.source(), f.fragment(), filter, f.estimatedRowSize());
+                    f = f.withFilter(filter);
                 }
                 return f;
             })))

+ 1 - 1
x-pack/plugin/esql/src/test/java/org/elasticsearch/xpack/esql/planner/FilterTests.java

@@ -293,7 +293,7 @@ public class FilterTests extends ESTestCase {
         // System.out.println("physical\n" + physical);
         physical = physical.transformUp(
             FragmentExec.class,
-            f -> new FragmentExec(f.source(), f.fragment(), restFilter, f.estimatedRowSize())
+            f -> new FragmentExec(f.source(), f.fragment(), restFilter, f.estimatedRowSize(), f.reducer())
         );
         physical = physicalPlanOptimizer.optimize(physical);
         // System.out.println("optimized\n" + physical);