浏览代码

Infer layout between plans instead of storing it (ESQL-1519)

The big change in this PR is exposing the aggregation intermediate state
 in the plan. As a result, the output of the data node plan and the input
 of the coordinator plan end up the same, so the layout can be inferred
 without having to be memorized (or determined from the subplan).
To make that happen, the pages traveling between plans must have their
columns in the same order, and thus be aligned, as there's no map to hold
any out-of-order information.

Furthermore, in the case of aggregations, since the NameIds are not the
 same across plans, a 'decorating' layout is used that looks at the call
 order instead of the actual arguments passed to it. This clearly needs
 improving.
Costin Leau 2 年之前
父节点
当前提交
cc0d8b0679
共有 17 个文件被更改,包括 260 次插入83 次删除
  1. 15 4
      x-pack/plugin/esql/compute/src/main/java/org/elasticsearch/compute/operator/exchange/ExchangeSinkOperator.java
  2. 2 1
      x-pack/plugin/esql/compute/src/test/java/org/elasticsearch/compute/operator/ForkingOperatorTestCase.java
  3. 2 1
      x-pack/plugin/esql/compute/src/test/java/org/elasticsearch/compute/operator/exchange/ExchangeServiceTests.java
  4. 4 2
      x-pack/plugin/esql/qa/testFixtures/src/main/resources/version.csv-spec
  5. 7 4
      x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/io/stream/PlanNamedTypes.java
  6. 25 2
      x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/plan/physical/ExchangeExec.java
  7. 14 3
      x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/plan/physical/ExchangeSinkExec.java
  8. 8 8
      x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/plan/physical/ExchangeSourceExec.java
  9. 56 4
      x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/planner/AbstractPhysicalOperationProviders.java
  10. 1 1
      x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/planner/AggregateMapper.java
  11. 65 0
      x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/planner/ExchangeLayout.java
  12. 6 2
      x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/planner/Layout.java
  13. 35 28
      x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/planner/LocalExecutionPlanner.java
  14. 8 1
      x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/planner/Mapper.java
  15. 2 5
      x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/planner/PlannerUtils.java
  16. 8 16
      x-pack/plugin/esql/src/test/java/org/elasticsearch/xpack/esql/CsvTests.java
  17. 2 1
      x-pack/plugin/esql/src/test/java/org/elasticsearch/xpack/esql/action/EsqlQueryRequestTests.java

+ 15 - 4
x-pack/plugin/esql/compute/src/main/java/org/elasticsearch/compute/operator/exchange/ExchangeSinkOperator.java

@@ -20,6 +20,7 @@ import org.elasticsearch.xcontent.XContentBuilder;
 
 import java.io.IOException;
 import java.util.Objects;
+import java.util.function.Function;
 import java.util.function.Supplier;
 
 /**
@@ -28,12 +29,20 @@ import java.util.function.Supplier;
 public class ExchangeSinkOperator extends SinkOperator {
 
     private final ExchangeSink sink;
+    private final Function<Page, Page> transformer;
     private int pagesAccepted;
 
-    public record ExchangeSinkOperatorFactory(Supplier<ExchangeSink> exchangeSinks) implements SinkOperatorFactory {
+    public record ExchangeSinkOperatorFactory(Supplier<ExchangeSink> exchangeSinks, Function<Page, Page> transformer)
+        implements
+            SinkOperatorFactory {
+
+        public ExchangeSinkOperatorFactory(Supplier<ExchangeSink> exchangeSinks) {
+            this(exchangeSinks, Function.identity());
+        }
+
         @Override
         public SinkOperator get(DriverContext driverContext) {
-            return new ExchangeSinkOperator(exchangeSinks.get());
+            return new ExchangeSinkOperator(exchangeSinks.get(), transformer);
         }
 
         @Override
@@ -42,8 +51,9 @@ public class ExchangeSinkOperator extends SinkOperator {
         }
     }
 
-    public ExchangeSinkOperator(ExchangeSink sink) {
+    public ExchangeSinkOperator(ExchangeSink sink, Function<Page, Page> transformer) {
         this.sink = sink;
+        this.transformer = transformer;
     }
 
     @Override
@@ -69,7 +79,8 @@ public class ExchangeSinkOperator extends SinkOperator {
     @Override
     public void addInput(Page page) {
         pagesAccepted++;
-        sink.addPage(page);
+        var newPage = transformer.apply(page);
+        sink.addPage(newPage);
     }
 
     @Override

+ 2 - 1
x-pack/plugin/esql/compute/src/test/java/org/elasticsearch/compute/operator/ForkingOperatorTestCase.java

@@ -30,6 +30,7 @@ import java.util.Collections;
 import java.util.Iterator;
 import java.util.List;
 import java.util.Set;
+import java.util.function.Function;
 import java.util.stream.Collectors;
 import java.util.stream.IntStream;
 import java.util.stream.Stream;
@@ -227,7 +228,7 @@ public abstract class ForkingOperatorTestCase extends OperatorTestCase {
                         simpleWithMode(bigArrays, AggregatorMode.INTERMEDIATE).get(driver1Context),
                         intermediateOperatorItr.next()
                     ),
-                    new ExchangeSinkOperator(sinkExchanger.createExchangeSink()),
+                    new ExchangeSinkOperator(sinkExchanger.createExchangeSink(), Function.identity()),
                     () -> {}
                 )
             );

+ 2 - 1
x-pack/plugin/esql/compute/src/test/java/org/elasticsearch/compute/operator/exchange/ExchangeServiceTests.java

@@ -50,6 +50,7 @@ import java.util.Set;
 import java.util.concurrent.CountDownLatch;
 import java.util.concurrent.TimeUnit;
 import java.util.concurrent.atomic.AtomicInteger;
+import java.util.function.Function;
 import java.util.function.Supplier;
 import java.util.stream.Collectors;
 import java.util.stream.IntStream;
@@ -262,7 +263,7 @@ public class ExchangeServiceTests extends ESTestCase {
         List<Driver> drivers = new ArrayList<>(numSinks + numSources);
         for (int i = 0; i < numSinks; i++) {
             String description = "sink-" + i;
-            ExchangeSinkOperator sinkOperator = new ExchangeSinkOperator(exchangeSink.get());
+            ExchangeSinkOperator sinkOperator = new ExchangeSinkOperator(exchangeSink.get(), Function.identity());
             DriverContext dc = new DriverContext();
             Driver d = new Driver("test-session:1", dc, () -> description, seqNoGenerator.get(dc), List.of(), sinkOperator, () -> {});
             drivers.add(d);

+ 4 - 2
x-pack/plugin/esql/qa/testFixtures/src/main/resources/version.csv-spec

@@ -230,7 +230,8 @@ v:version
 // end::to_version-result[]
 ;
 
-castConstantToVersion2
+// AwaitFix: #1521 better plan queries that return only constants
+castConstantToVersion2-Ignore
 FROM apps | EVAL v = TO_VERSION("1.2.3") | KEEP v;
 
 v:v
@@ -250,7 +251,8 @@ v:v
 1.2.3
 ;
 
-multipleCast
+// AwaitFix: #1521 better plan queries that return only constants
+multipleCast-Ignore
 FROM apps | EVAL v = TO_STR(TO_VER("1.2.3")) | KEEP v;
 
 v:s

+ 7 - 4
x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/io/stream/PlanNamedTypes.java

@@ -452,28 +452,31 @@ public final class PlanNamedTypes {
     }
 
     static ExchangeExec readExchangeExec(PlanStreamInput in) throws IOException {
-        return new ExchangeExec(Source.EMPTY, in.readPhysicalPlanNode());
+        return new ExchangeExec(Source.EMPTY, readAttributes(in), in.readBoolean(), in.readPhysicalPlanNode());
     }
 
     static void writeExchangeExec(PlanStreamOutput out, ExchangeExec exchangeExec) throws IOException {
+        writeAttributes(out, exchangeExec.output());
+        out.writeBoolean(exchangeExec.isInBetweenAggs());
         out.writePhysicalPlanNode(exchangeExec.child());
     }
 
     static ExchangeSinkExec readExchangeSinkExec(PlanStreamInput in) throws IOException {
-        return new ExchangeSinkExec(Source.EMPTY, in.readPhysicalPlanNode());
+        return new ExchangeSinkExec(Source.EMPTY, readAttributes(in), in.readPhysicalPlanNode());
     }
 
     static void writeExchangeSinkExec(PlanStreamOutput out, ExchangeSinkExec exchangeSinkExec) throws IOException {
+        writeAttributes(out, exchangeSinkExec.output());
         out.writePhysicalPlanNode(exchangeSinkExec.child());
     }
 
     static ExchangeSourceExec readExchangeSourceExec(PlanStreamInput in) throws IOException {
-        return new ExchangeSourceExec(Source.EMPTY, readAttributes(in), in.readPhysicalPlanNode());
+        return new ExchangeSourceExec(Source.EMPTY, readAttributes(in), in.readBoolean());
     }
 
     static void writeExchangeSourceExec(PlanStreamOutput out, ExchangeSourceExec exchangeSourceExec) throws IOException {
         writeAttributes(out, exchangeSourceExec.output());
-        out.writePhysicalPlanNode(exchangeSourceExec.nodeLayout());
+        out.writeBoolean(exchangeSourceExec.isIntermediateAgg());
     }
 
     static FieldExtractExec readFieldExtractExec(PlanStreamInput in) throws IOException {

+ 25 - 2
x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/plan/physical/ExchangeExec.java

@@ -7,22 +7,45 @@
 
 package org.elasticsearch.xpack.esql.plan.physical;
 
+import org.elasticsearch.xpack.ql.expression.Attribute;
 import org.elasticsearch.xpack.ql.tree.NodeInfo;
 import org.elasticsearch.xpack.ql.tree.Source;
 
+import java.util.List;
+
+import static java.util.Collections.emptyList;
+
 public class ExchangeExec extends UnaryExec {
 
+    private final List<Attribute> output;
+    private final boolean inBetweenAggs;
+
     public ExchangeExec(Source source, PhysicalPlan child) {
+        this(source, emptyList(), false, child);
+    }
+
+    public ExchangeExec(Source source, List<Attribute> output, boolean inBetweenAggs, PhysicalPlan child) {
         super(source, child);
+        this.output = output;
+        this.inBetweenAggs = inBetweenAggs;
+    }
+
+    @Override
+    public List<Attribute> output() {
+        return output.isEmpty() ? super.output() : output;
+    }
+
+    public boolean isInBetweenAggs() {
+        return inBetweenAggs;
     }
 
     @Override
     public UnaryExec replaceChild(PhysicalPlan newChild) {
-        return new ExchangeExec(source(), newChild);
+        return new ExchangeExec(source(), output, inBetweenAggs, newChild);
     }
 
     @Override
     protected NodeInfo<? extends PhysicalPlan> info() {
-        return NodeInfo.create(this, ExchangeExec::new, child());
+        return NodeInfo.create(this, ExchangeExec::new, output, inBetweenAggs, child());
     }
 }

+ 14 - 3
x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/plan/physical/ExchangeSinkExec.java

@@ -7,22 +7,33 @@
 
 package org.elasticsearch.xpack.esql.plan.physical;
 
+import org.elasticsearch.xpack.ql.expression.Attribute;
 import org.elasticsearch.xpack.ql.tree.NodeInfo;
 import org.elasticsearch.xpack.ql.tree.Source;
 
+import java.util.List;
+
 public class ExchangeSinkExec extends UnaryExec {
 
-    public ExchangeSinkExec(Source source, PhysicalPlan child) {
+    private final List<Attribute> output;
+
+    public ExchangeSinkExec(Source source, List<Attribute> output, PhysicalPlan child) {
         super(source, child);
+        this.output = output;
+    }
+
+    @Override
+    public List<Attribute> output() {
+        return output;
     }
 
     @Override
     protected NodeInfo<? extends ExchangeSinkExec> info() {
-        return NodeInfo.create(this, ExchangeSinkExec::new, child());
+        return NodeInfo.create(this, ExchangeSinkExec::new, output, child());
     }
 
     @Override
     public ExchangeSinkExec replaceChild(PhysicalPlan newChild) {
-        return new ExchangeSinkExec(source(), newChild);
+        return new ExchangeSinkExec(source(), output, newChild);
     }
 }

+ 8 - 8
x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/plan/physical/ExchangeSourceExec.java

@@ -17,12 +17,12 @@ import java.util.Objects;
 public class ExchangeSourceExec extends LeafExec {
 
     private final List<Attribute> output;
-    private final PhysicalPlan planUsedForLayout;
+    private final boolean intermediateAgg;
 
-    public ExchangeSourceExec(Source source, List<Attribute> output, PhysicalPlan fragmentPlanUsedForLayout) {
+    public ExchangeSourceExec(Source source, List<Attribute> output, boolean intermediateAgg) {
         super(source);
         this.output = output;
-        this.planUsedForLayout = fragmentPlanUsedForLayout;
+        this.intermediateAgg = intermediateAgg;
     }
 
     @Override
@@ -30,13 +30,13 @@ public class ExchangeSourceExec extends LeafExec {
         return output;
     }
 
-    public PhysicalPlan nodeLayout() {
-        return planUsedForLayout;
+    public boolean isIntermediateAgg() {
+        return intermediateAgg;
     }
 
     @Override
     protected NodeInfo<ExchangeSourceExec> info() {
-        return NodeInfo.create(this, ExchangeSourceExec::new, output, planUsedForLayout);
+        return NodeInfo.create(this, ExchangeSourceExec::new, output, intermediateAgg);
     }
 
     @Override
@@ -44,11 +44,11 @@ public class ExchangeSourceExec extends LeafExec {
         if (this == o) return true;
         if (o == null || getClass() != o.getClass()) return false;
         ExchangeSourceExec that = (ExchangeSourceExec) o;
-        return Objects.equals(output, that.output) && Objects.equals(planUsedForLayout, that.planUsedForLayout);
+        return Objects.equals(output, that.output) && intermediateAgg == that.intermediateAgg;
     }
 
     @Override
     public int hashCode() {
-        return Objects.hash(output, planUsedForLayout);
+        return Objects.hash(output, intermediateAgg);
     }
 }

+ 56 - 4
x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/planner/AbstractPhysicalOperationProviders.java

@@ -50,6 +50,8 @@ abstract class AbstractPhysicalOperationProviders implements PhysicalOperationPr
         AggregateExec.Mode mode = aggregateExec.getMode();
         var aggregates = aggregateExec.aggregates();
 
+        var sourceLayout = source.layout;
+
         if (aggregateExec.groupings().isEmpty()) {
             // not grouping
             List<Aggregator.Factory> aggregatorFactories = new ArrayList<>();
@@ -64,7 +66,7 @@ abstract class AbstractPhysicalOperationProviders implements PhysicalOperationPr
             aggregatesToFactory(
                 aggregates,
                 mode,
-                source,
+                sourceLayout,
                 context.bigArrays(),
                 false, // non-grouping
                 s -> aggregatorFactories.add(s.supplier.aggregatorFactory(s.mode))
@@ -133,7 +135,7 @@ abstract class AbstractPhysicalOperationProviders implements PhysicalOperationPr
             aggregatesToFactory(
                 aggregates,
                 mode,
-                source,
+                sourceLayout,
                 context.bigArrays(),
                 true, // grouping
                 s -> aggregatorFactories.add(s.supplier.groupingAggregatorFactory(s.mode))
@@ -163,12 +165,62 @@ abstract class AbstractPhysicalOperationProviders implements PhysicalOperationPr
         throw new UnsupportedOperationException();
     }
 
+    /***
+     * Creates a standard layout for intermediate aggregations, typically used across exchanges.
+     * Puts the group first, followed by each aggregation.
+     *
+     * It's similar to the code above (groupingPhysicalOperation) but ignores the factory creation.
+     */
+    public static List<Attribute> intermediateAttributes(List<? extends NamedExpression> aggregates, List<? extends Expression> groupings) {
+        var aggregateMapper = new AggregateMapper();
+
+        List<Attribute> attrs = new ArrayList<>();
+
+        // no groups
+        if (groupings.isEmpty()) {
+            attrs = Expressions.asAttributes(aggregateMapper.mapNonGrouping(aggregates));
+        }
+        // groups
+        else {
+            for (Expression group : groupings) {
+                var groupAttribute = Expressions.attribute(group);
+                if (groupAttribute == null) {
+                    throw new EsqlIllegalArgumentException("Unexpected non-named expression[{}] as grouping", group);
+                }
+                Set<NameId> grpAttribIds = new HashSet<>();
+                grpAttribIds.add(groupAttribute.id());
+
+                /*
+                 * Check for aliasing in aggregates which occurs in two cases (due to combining project + stats):
+                 *  - before stats (keep x = a | stats by x) which requires the partial input to use a's channel
+                 *  - after  stats (stats by a | keep x = a) which causes the output layout to refer to the follow-up alias
+                 */
+                for (NamedExpression agg : aggregates) {
+                    if (agg instanceof Alias a) {
+                        if (a.child() instanceof Attribute attr) {
+                            if (groupAttribute.id().equals(attr.id())) {
+                                grpAttribIds.add(a.id());
+                                // TODO: investigate whether a break could be used since it shouldn't be possible to have multiple
+                                // attributes
+                                // pointing to the same attribute
+                            }
+                        }
+                    }
+                }
+                attrs.add(groupAttribute);
+            }
+
+            attrs.addAll(Expressions.asAttributes(aggregateMapper.mapGrouping(aggregates)));
+        }
+        return attrs;
+    }
+
     private record AggFunctionSupplierContext(AggregatorFunctionSupplier supplier, AggregatorMode mode) {}
 
     private void aggregatesToFactory(
         List<? extends NamedExpression> aggregates,
         AggregateExec.Mode mode,
-        PhysicalOperation source,
+        Layout layout,
         BigArrays bigArrays,
         boolean grouping,
         Consumer<AggFunctionSupplierContext> consumer
@@ -200,7 +252,7 @@ abstract class AbstractPhysicalOperationProviders implements PhysicalOperationPr
                         params[i] = aggParams.get(i).fold();
                     }
 
-                    List<Integer> inputChannels = sourceAttr.stream().map(NamedExpression::id).map(source.layout::getChannel).toList();
+                    List<Integer> inputChannels = sourceAttr.stream().map(NamedExpression::id).map(layout::getChannel).toList();
                     assert inputChannels != null && inputChannels.size() > 0 && inputChannels.stream().allMatch(i -> i >= 0);
                     if (aggregateFunction instanceof ToAggregator agg) {
                         consumer.accept(new AggFunctionSupplierContext(agg.supplier(bigArrays, inputChannels), aggMode));

+ 1 - 1
x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/planner/AggregateMapper.java

@@ -195,7 +195,7 @@ public class AggregateMapper {
     static DataType toDataType(ElementType elementType) {
         return switch (elementType) {
             case BOOLEAN -> DataTypes.BOOLEAN;
-            case BYTES_REF -> DataTypes.BINARY;
+            case BYTES_REF -> DataTypes.KEYWORD;
             case INT -> DataTypes.INTEGER;
             case LONG -> DataTypes.LONG;
             case DOUBLE -> DataTypes.DOUBLE;

+ 65 - 0
x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/planner/ExchangeLayout.java

@@ -0,0 +1,65 @@
+/*
+ * Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
+ * or more contributor license agreements. Licensed under the Elastic License
+ * 2.0; you may not use this file except in compliance with the Elastic License
+ * 2.0.
+ */
+
+package org.elasticsearch.xpack.esql.planner;
+
+import org.elasticsearch.common.util.Maps;
+import org.elasticsearch.xpack.ql.expression.NameId;
+
+import java.util.HashSet;
+import java.util.Map;
+import java.util.Set;
+
+import static java.util.Collections.emptyMap;
+
+/**
+ * Decorating layout that creates the NameId -> Value lazily based on the calls made to its content.
+ * Essentially it maps the existing (old) NameIds to the new ones.
+ */
+class ExchangeLayout extends Layout {
+
+    private final Map<NameId, Integer> delegate;
+    private final Map<Integer, Set<NameId>> inverse;
+    private final Map<NameId, NameId> mappingToOldLayout;
+    private int counter;
+
+    ExchangeLayout(Layout layout) {
+        super(emptyMap(), 0);
+        this.delegate = layout.internalLayout();
+        this.mappingToOldLayout = Maps.newMapWithExpectedSize(delegate.size());
+        this.inverse = Maps.newMapWithExpectedSize(delegate.size());
+
+        for (Map.Entry<NameId, Integer> entry : delegate.entrySet()) {
+            NameId key = entry.getKey();
+            Integer value = entry.getValue();
+            inverse.computeIfAbsent(value, k -> new HashSet<>()).add(key);
+        }
+    }
+
+    @Override
+    public Integer getChannel(NameId id) {
+        var oldId = mappingToOldLayout.get(id);
+        if (oldId == null && counter < delegate.size()) {
+            var names = inverse.get(counter++);
+            for (var name : names) {
+                oldId = name;
+                mappingToOldLayout.put(id, oldId);
+            }
+        }
+        return delegate.get(oldId);
+    }
+
+    @Override
+    public int numberOfIds() {
+        return delegate.size();
+    }
+
+    @Override
+    public int numberOfChannels() {
+        return inverse.size();
+    }
+}

+ 6 - 2
x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/planner/Layout.java

@@ -31,7 +31,7 @@ public class Layout {
     private final Map<NameId, Integer> layout;
     private final int numberOfChannels;
 
-    private Layout(Map<NameId, Integer> layout, int numberOfChannels) {
+    Layout(Map<NameId, Integer> layout, int numberOfChannels) {
         this.layout = layout;
         this.numberOfChannels = numberOfChannels;
     }
@@ -58,6 +58,10 @@ public class Layout {
         return numberOfChannels;
     }
 
+    Map<NameId, Integer> internalLayout() {
+        return layout;
+    }
+
     /**
      * @return creates a builder to append to this layout.
      */
@@ -123,7 +127,7 @@ public class Layout {
             for (Set<NameId> ids : this.channels) {
                 int channel = numberOfChannels++;
                 for (NameId id : ids) {
-                    layout.put(id, channel);
+                    layout.putIfAbsent(id, channel);
                 }
             }
             return new Layout(Collections.unmodifiableMap(layout), numberOfChannels);

+ 35 - 28
x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/planner/LocalExecutionPlanner.java

@@ -79,7 +79,6 @@ import org.elasticsearch.xpack.ql.type.DataTypes;
 import org.elasticsearch.xpack.ql.util.Holder;
 
 import java.util.ArrayList;
-import java.util.Arrays;
 import java.util.BitSet;
 import java.util.HashMap;
 import java.util.HashSet;
@@ -204,7 +203,8 @@ public class LocalExecutionPlanner {
     }
 
     private PhysicalOperation planAggregation(AggregateExec aggregate, LocalExecutionPlannerContext context) {
-        return physicalOperationProviders.groupingPhysicalOperation(aggregate, plan(aggregate.child(), context), context);
+        var source = plan(aggregate.child(), context);
+        return physicalOperationProviders.groupingPhysicalOperation(aggregate, source, context);
     }
 
     private PhysicalOperation planEsQueryNode(EsQueryExec esQuery, LocalExecutionPlannerContext context) {
@@ -260,19 +260,28 @@ public class LocalExecutionPlanner {
         PhysicalOperation source = plan(outputExec.child(), context);
         var output = outputExec.output();
 
+        return source.withSink(
+            new OutputOperatorFactory(
+                Expressions.names(output),
+                alignPageToAttributes(output, source.layout),
+                outputExec.getPageConsumer()
+            ),
+            source.layout
+        );
+    }
+
+    private static Function<Page, Page> alignPageToAttributes(List<Attribute> attrs, Layout layout) {
         // align the page layout with the operator output
         // extraction order - the list ordinal is the same as the column one
         // while the value represents the position in the original page
-        final int[] mappedPosition = new int[output.size()];
+        final int[] mappedPosition = new int[attrs.size()];
         int index = -1;
         boolean transformRequired = false;
-        for (var attribute : output) {
-            mappedPosition[++index] = source.layout.getChannel(attribute.id());
-            if (transformRequired == false) {
-                transformRequired = mappedPosition[index] != index;
-            }
+        for (var attribute : attrs) {
+            mappedPosition[++index] = layout.getChannel(attribute.id());
+            transformRequired |= mappedPosition[index] != index;
         }
-        Function<Page, Page> mapper = transformRequired ? p -> {
+        Function<Page, Page> transformer = transformRequired ? p -> {
             var blocks = new Block[mappedPosition.length];
             for (int i = 0; i < blocks.length; i++) {
                 blocks[i] = p.getBlock(mappedPosition[i]);
@@ -280,7 +289,7 @@ public class LocalExecutionPlanner {
             return new Page(blocks);
         } : Function.identity();
 
-        return source.withSink(new OutputOperatorFactory(Expressions.names(output), mapper, outputExec.getPageConsumer()), source.layout);
+        return transformer;
     }
 
     private PhysicalOperation planExchange(ExchangeExec exchangeExec, LocalExecutionPlannerContext context) {
@@ -290,26 +299,26 @@ public class LocalExecutionPlanner {
     private PhysicalOperation planExchangeSink(ExchangeSinkExec exchangeSink, LocalExecutionPlannerContext context) {
         Objects.requireNonNull(exchangeSinkHandler, "ExchangeSinkHandler wasn't provided");
         PhysicalOperation source = plan(exchangeSink.child(), context);
-        return source.withSink(new ExchangeSinkOperatorFactory(exchangeSinkHandler::createExchangeSink), source.layout);
+
+        Function<Page, Page> transformer = exchangeSink.child() instanceof AggregateExec
+            ? Function.identity()
+            : alignPageToAttributes(exchangeSink.output(), source.layout);
+
+        return source.withSink(new ExchangeSinkOperatorFactory(exchangeSinkHandler::createExchangeSink, transformer), source.layout);
     }
 
     private PhysicalOperation planExchangeSource(ExchangeSourceExec exchangeSource, LocalExecutionPlannerContext context) {
-        // TODO: ugly hack for now to get the same layout - need to properly support it and have it exposed in the plan and over the wire
-        LocalExecutionPlannerContext dummyContext = new LocalExecutionPlannerContext(
-            new ArrayList<>(),
-            new Holder<>(DriverParallelism.SINGLE),
-            1,
-            DataPartitioning.SHARD,
-            1,
-            BigArrays.NON_RECYCLING_INSTANCE
-        );
-
-        var planToGetLayout = plan(exchangeSource.nodeLayout(), dummyContext);
         Objects.requireNonNull(exchangeSourceHandler, "ExchangeSourceHandler wasn't provided");
-        return PhysicalOperation.fromSource(
-            new ExchangeSourceOperatorFactory(exchangeSourceHandler::createExchangeSource),
-            planToGetLayout.layout
-        );
+
+        var builder = new Layout.Builder();
+        for (var attr : exchangeSource.output()) {
+            builder.appendChannel(attr.id());
+        }
+        // decorate the layout
+        var l = builder.build();
+        var layout = exchangeSource.isIntermediateAgg() ? new ExchangeLayout(l) : l;
+
+        return PhysicalOperation.fromSource(new ExchangeSourceOperatorFactory(exchangeSourceHandler::createExchangeSource), layout);
     }
 
     private PhysicalOperation planTopN(TopNExec topNExec, LocalExecutionPlannerContext context) {
@@ -374,8 +383,6 @@ public class LocalExecutionPlanner {
         }
         final Expression expr = dissect.inputExpression();
         String[] attributeNames = Expressions.names(dissect.extractedFields()).toArray(new String[0]);
-        ElementType[] types = new ElementType[dissect.extractedFields().size()];
-        Arrays.fill(types, ElementType.BYTES_REF);
 
         Layout layout = layoutBuilder.build();
         source = source.with(

+ 8 - 1
x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/planner/Mapper.java

@@ -184,10 +184,17 @@ public class Mapper {
         // TODO: might be easier long term to end up with just one node and split if necessary instead of doing that always at this stage
         else {
             child = addExchangeForFragment(aggregate, child);
+            // exchange was added - use the intermediates for the output
+            if (child instanceof ExchangeExec exchange) {
+                var output = AbstractPhysicalOperationProviders.intermediateAttributes(aggregate.aggregates(), aggregate.groupings());
+                child = new ExchangeExec(child.source(), output, true, exchange.child());
+            }
             // if no exchange was added, create the partial aggregate
-            if (child instanceof ExchangeExec == false) {
+            else {
                 child = aggExec(aggregate, child, PARTIAL);
             }
+
+            // regardless, always add the final agg
             child = aggExec(aggregate, child, FINAL);
         }
 

+ 2 - 5
x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/planner/PlannerUtils.java

@@ -42,12 +42,9 @@ public class PlannerUtils {
         PhysicalPlan coordinatorPlan = plan.transformUp(ExchangeExec.class, e -> {
             // remember the datanode subplan and wire it to a sink
             var subplan = e.child();
-            dataNodePlan.set(new ExchangeSinkExec(e.source(), subplan));
+            dataNodePlan.set(new ExchangeSinkExec(e.source(), e.output(), subplan));
 
-            // ugly hack to get the layout
-            var planContainingTheLayout = EstimatesRowSize.estimateRowSize(0, localPlan(List.of(), config, subplan));
-            // replace the subnode with an exchange source
-            return new ExchangeSourceExec(e.source(), e.output(), planContainingTheLayout);
+            return new ExchangeSourceExec(e.source(), e.output(), e.isInBetweenAggs());
         });
         return new Tuple<>(coordinatorPlan, dataNodePlan.get());
     }

+ 8 - 16
x-pack/plugin/esql/src/test/java/org/elasticsearch/xpack/esql/CsvTests.java

@@ -75,9 +75,7 @@ import org.elasticsearch.xpack.ql.expression.function.FunctionRegistry;
 import org.elasticsearch.xpack.ql.index.EsIndex;
 import org.elasticsearch.xpack.ql.index.IndexResolution;
 import org.elasticsearch.xpack.ql.plan.logical.LogicalPlan;
-import org.elasticsearch.xpack.ql.util.DateUtils;
 import org.elasticsearch.xpack.ql.util.Holder;
-import org.elasticsearch.xpack.ql.util.StringUtils;
 import org.junit.After;
 import org.junit.Before;
 import org.mockito.Mockito;
@@ -350,6 +348,11 @@ public class CsvTests extends ESTestCase {
         PhysicalPlan coordinatorPlan = coordinatorAndDataNodePlan.v1();
         PhysicalPlan dataNodePlan = coordinatorAndDataNodePlan.v2();
 
+        if (LOGGER.isTraceEnabled()) {
+            LOGGER.trace("Coordinator plan\n" + coordinatorPlan);
+            LOGGER.trace("DataNode plan\n" + dataNodePlan);
+        }
+
         List<String> columnNames = Expressions.names(coordinatorPlan.output());
         List<String> dataTypes = new ArrayList<>(columnNames.size());
         List<Type> columnTypes = coordinatorPlan.output()
@@ -384,6 +387,7 @@ public class CsvTests extends ESTestCase {
     // Clone of PlannerUtils
     //
 
+    // PlannerUtils#breakPlanBetweenCoordinatorAndDataNode
     private static Tuple<PhysicalPlan, PhysicalPlan> CSVbreakPlanBetweenCoordinatorAndDataNode(
         PhysicalPlan plan,
         LocalPhysicalPlanOptimizer optimizer
@@ -394,20 +398,8 @@ public class CsvTests extends ESTestCase {
         PhysicalPlan coordinatorPlan = plan.transformUp(ExchangeExec.class, e -> {
             // remember the datanode subplan and wire it to a sink
             var subplan = e.child();
-            dataNodePlan.set(new ExchangeSinkExec(e.source(), subplan));
-
-            // ugly hack to get the layout
-            var dummyConfig = new EsqlConfiguration(
-                DateUtils.UTC,
-                Locale.US,
-                StringUtils.EMPTY,
-                StringUtils.EMPTY,
-                QueryPragmas.EMPTY,
-                1000
-            );
-            var planContainingTheLayout = EstimatesRowSize.estimateRowSize(0, CSVlocalPlan(List.of(), dummyConfig, subplan, optimizer));
-            // replace the subnode with an exchange source
-            return new ExchangeSourceExec(e.source(), e.output(), planContainingTheLayout);
+            dataNodePlan.set(new ExchangeSinkExec(e.source(), e.output(), subplan));
+            return new ExchangeSourceExec(e.source(), e.output(), e.isInBetweenAggs());
         });
         return new Tuple<>(coordinatorPlan, dataNodePlan.get());
     }

+ 2 - 1
x-pack/plugin/esql/src/test/java/org/elasticsearch/xpack/esql/action/EsqlQueryRequestTests.java

@@ -122,7 +122,8 @@ public class EsqlQueryRequestTests extends ESTestCase {
         TaskInfo taskInfo = task.taskInfo(localNode, true);
         String json = taskInfo.toString();
         String expected = Streams.readFully(getClass().getClassLoader().getResourceAsStream("query_task.json")).utf8ToString();
-        expected = expected.replaceAll("\s*<\\d+>", "")
+        expected = expected.replaceAll("\r\n", "\n")
+            .replaceAll("\s*<\\d+>", "")
             .replaceAll("FROM test \\| STATS MAX\\(d\\) by a, b", query)
             .replaceAll("5326", Integer.toString(id))
             .replaceAll("2j8UKw1bRO283PMwDugNNg", localNode)