Преглед изворни кода

Remove page alignment in exchange sink (#124610)

I see that planning the ExchangeSinkExec takes a few milliseconds when 
benchmarking simple queries with 10K fields. It spends time checking if
we need to realign the incoming pages. However, the exchange has the
exact same attributes as its child, so the incoming layout should match
its attributes perfectly. This change removes the realignment.
Nhat Nguyen пре 7 месеци
родитељ
комит
49254b0bdc

+ 5 - 0
docs/changelog/124610.yaml

@@ -0,0 +1,5 @@
+pr: 124610
+summary: Remove page alignment in exchange sink
+area: ES|QL
+type: enhancement
+issues: []

+ 4 - 9
x-pack/plugin/esql/compute/src/main/java/org/elasticsearch/compute/operator/exchange/ExchangeSinkOperator.java

@@ -23,7 +23,6 @@ import org.elasticsearch.xcontent.XContentBuilder;
 
 import java.io.IOException;
 import java.util.Objects;
-import java.util.function.Function;
 import java.util.function.Supplier;
 
 /**
@@ -32,17 +31,14 @@ import java.util.function.Supplier;
 public class ExchangeSinkOperator extends SinkOperator {
 
     private final ExchangeSink sink;
-    private final Function<Page, Page> transformer;
     private int pagesReceived;
     private long rowsReceived;
 
-    public record ExchangeSinkOperatorFactory(Supplier<ExchangeSink> exchangeSinks, Function<Page, Page> transformer)
-        implements
-            SinkOperatorFactory {
+    public record ExchangeSinkOperatorFactory(Supplier<ExchangeSink> exchangeSinks) implements SinkOperatorFactory {
 
         @Override
         public SinkOperator get(DriverContext driverContext) {
-            return new ExchangeSinkOperator(exchangeSinks.get(), transformer);
+            return new ExchangeSinkOperator(exchangeSinks.get());
         }
 
         @Override
@@ -51,9 +47,8 @@ public class ExchangeSinkOperator extends SinkOperator {
         }
     }
 
-    public ExchangeSinkOperator(ExchangeSink sink, Function<Page, Page> transformer) {
+    public ExchangeSinkOperator(ExchangeSink sink) {
         this.sink = sink;
-        this.transformer = transformer;
     }
 
     @Override
@@ -84,7 +79,7 @@ public class ExchangeSinkOperator extends SinkOperator {
     protected void doAddInput(Page page) {
         pagesReceived++;
         rowsReceived += page.getPositionCount();
-        sink.addPage(transformer.apply(page));
+        sink.addPage(page);
     }
 
     @Override

+ 2 - 3
x-pack/plugin/esql/compute/src/test/java/org/elasticsearch/compute/operator/DriverTests.java

@@ -45,7 +45,6 @@ import java.util.concurrent.CountDownLatch;
 import java.util.concurrent.CyclicBarrier;
 import java.util.concurrent.TimeUnit;
 import java.util.concurrent.atomic.AtomicInteger;
-import java.util.function.Function;
 import java.util.function.LongSupplier;
 
 import static org.hamcrest.Matchers.either;
@@ -328,7 +327,7 @@ public class DriverTests extends ESTestCase {
             final int maxAllowedRows = between(1, 100);
             final AtomicInteger processedRows = new AtomicInteger(0);
             var sinkHandler = new ExchangeSinkHandler(driverContext.blockFactory(), positions, System::currentTimeMillis);
-            var sinkOperator = new ExchangeSinkOperator(sinkHandler.createExchangeSink(() -> {}), Function.identity());
+            var sinkOperator = new ExchangeSinkOperator(sinkHandler.createExchangeSink(() -> {}));
             final var delayOperator = new EvalOperator(driverContext.blockFactory(), new EvalOperator.ExpressionEvaluator() {
                 @Override
                 public Block eval(Page page) {
@@ -365,7 +364,7 @@ public class DriverTests extends ESTestCase {
             var sourceHandler = new ExchangeSourceHandler(between(1, 5), threadPool.executor("esql"));
             var sinkHandler = new ExchangeSinkHandler(driverContext.blockFactory(), between(1, 5), System::currentTimeMillis);
             var sourceOperator = new ExchangeSourceOperator(sourceHandler.createExchangeSource());
-            var sinkOperator = new ExchangeSinkOperator(sinkHandler.createExchangeSink(() -> {}), Function.identity());
+            var sinkOperator = new ExchangeSinkOperator(sinkHandler.createExchangeSink(() -> {}));
             Driver driver = TestDriverFactory.create(driverContext, sourceOperator, List.of(), sinkOperator);
             PlainActionFuture<Void> future = new PlainActionFuture<>();
             Driver.start(threadPool.getThreadContext(), threadPool.executor("esql"), driver, between(1, 1000), future);

+ 1 - 2
x-pack/plugin/esql/compute/src/test/java/org/elasticsearch/compute/operator/ForkingOperatorTestCase.java

@@ -39,7 +39,6 @@ import java.util.Comparator;
 import java.util.Iterator;
 import java.util.List;
 import java.util.Set;
-import java.util.function.Function;
 import java.util.stream.Collectors;
 import java.util.stream.IntStream;
 import java.util.stream.Stream;
@@ -242,7 +241,7 @@ public abstract class ForkingOperatorTestCase extends OperatorTestCase {
                         simpleWithMode(AggregatorMode.INTERMEDIATE).get(driver1Context),
                         intermediateOperatorItr.next()
                     ),
-                    new ExchangeSinkOperator(sinkExchanger.createExchangeSink(() -> {}), Function.identity())
+                    new ExchangeSinkOperator(sinkExchanger.createExchangeSink(() -> {}))
                 )
             );
         }

+ 1 - 2
x-pack/plugin/esql/compute/src/test/java/org/elasticsearch/compute/operator/exchange/ExchangeServiceTests.java

@@ -65,7 +65,6 @@ import java.util.concurrent.Semaphore;
 import java.util.concurrent.TimeUnit;
 import java.util.concurrent.atomic.AtomicBoolean;
 import java.util.concurrent.atomic.AtomicInteger;
-import java.util.function.Function;
 import java.util.function.Supplier;
 import java.util.stream.Collectors;
 import java.util.stream.IntStream;
@@ -305,7 +304,7 @@ public class ExchangeServiceTests extends ESTestCase {
                 "sink-" + i,
                 dc,
                 seqNoGenerator.get(dc),
-                new ExchangeSinkOperator(exchangeSink.get(), Function.identity())
+                new ExchangeSinkOperator(exchangeSink.get())
             );
             drivers.add(d);
         }

+ 1 - 0
x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/plan/physical/ExchangeSinkExec.java

@@ -27,6 +27,7 @@ public class ExchangeSinkExec extends UnaryExec {
     );
 
     private final List<Attribute> output;
+    // TODO: remove this flag
     private final boolean intermediateAgg;
 
     public ExchangeSinkExec(Source source, List<Attribute> output, boolean intermediateAgg, PhysicalPlan child) {

+ 1 - 7
x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/planner/LocalExecutionPlanner.java

@@ -385,14 +385,8 @@ public class LocalExecutionPlanner {
     private PhysicalOperation planExchangeSink(ExchangeSinkExec exchangeSink, LocalExecutionPlannerContext context) {
         Objects.requireNonNull(exchangeSinkSupplier, "ExchangeSinkHandler wasn't provided");
         var child = exchangeSink.child();
-
         PhysicalOperation source = plan(child, context);
-
-        Function<Page, Page> transformer = exchangeSink.isIntermediateAgg()
-            ? Function.identity()
-            : alignPageToAttributes(exchangeSink.output(), source.layout);
-
-        return source.withSink(new ExchangeSinkOperatorFactory(exchangeSinkSupplier, transformer), source.layout);
+        return source.withSink(new ExchangeSinkOperatorFactory(exchangeSinkSupplier), source.layout);
     }
 
     private PhysicalOperation planExchangeSource(ExchangeSourceExec exchangeSource, LocalExecutionPlannerContext context) {