View Source

Move node-level reduction plan to data node (#117422) (#117534)

This change moves the logic for extracting the node-level plan to the 
data node instead of the coordinator. There are several benefits to
doing this on the data node:

1. Minimize serialization, especially inter-cluster communications.

2. Pave the way for fixing the row size estimation issue when generating 
this plan on data nodes; the fix itself will land in a follow-up.

3. Allow each cluster to decide whether to run node-level reduction 
based on its own topology.
Nhat Nguyen 10 months ago
parent
commit
35d28739f0

+ 1 - 1
server/src/main/java/org/elasticsearch/TransportVersions.java

@@ -206,7 +206,7 @@ public class TransportVersions {
     public static final TransportVersion INGEST_PIPELINE_CONFIGURATION_AS_MAP = def(8_797_00_0);
     public static final TransportVersion INDEXING_PRESSURE_THROTTLING_STATS = def(8_798_00_0);
     public static final TransportVersion REINDEX_DATA_STREAMS = def(8_799_00_0);
-
+    public static final TransportVersion ESQL_REMOVE_NODE_LEVEL_PLAN = def(8_800_00_0);
     /*
      * STOP! READ THIS FIRST! No, really,
      *        ____ _____ ___  ____  _        ____  _____    _    ____    _____ _   _ ___ ____    _____ ___ ____  ____ _____ _

+ 1 - 2
x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/optimizer/rules/physical/ProjectAwayColumns.java

@@ -73,8 +73,7 @@ public class ProjectAwayColumns extends Rule<PhysicalPlan, PhysicalPlan> {
                             Source.EMPTY,
                             new Project(logicalFragment.source(), logicalFragment, output),
                             fragmentExec.esFilter(),
-                            fragmentExec.estimatedRowSize(),
-                            fragmentExec.reducer()
+                            fragmentExec.estimatedRowSize()
                         );
                         return new ExchangeExec(exec.source(), output, exec.inBetweenAggs(), newChild);
                     }

+ 26 - 30
x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/plan/physical/FragmentExec.java

@@ -31,7 +31,6 @@ public class FragmentExec extends LeafExec implements EstimatesRowSize {
 
     private final LogicalPlan fragment;
     private final QueryBuilder esFilter;
-    private final PhysicalPlan reducer; // datanode-level physical plan node that performs an intermediate (not partial) reduce
 
     /**
      * Estimate of the number of bytes that'll be loaded per position before
@@ -40,25 +39,28 @@ public class FragmentExec extends LeafExec implements EstimatesRowSize {
     private final int estimatedRowSize;
 
     public FragmentExec(LogicalPlan fragment) {
-        this(fragment.source(), fragment, null, 0, null);
+        this(fragment.source(), fragment, null, 0);
     }
 
-    public FragmentExec(Source source, LogicalPlan fragment, QueryBuilder esFilter, int estimatedRowSize, PhysicalPlan reducer) {
+    public FragmentExec(Source source, LogicalPlan fragment, QueryBuilder esFilter, int estimatedRowSize) {
         super(source);
         this.fragment = fragment;
         this.esFilter = esFilter;
         this.estimatedRowSize = estimatedRowSize;
-        this.reducer = reducer;
     }
 
     private FragmentExec(StreamInput in) throws IOException {
-        this(
-            Source.readFrom((PlanStreamInput) in),
-            in.readNamedWriteable(LogicalPlan.class),
-            in.readOptionalNamedWriteable(QueryBuilder.class),
-            in.readOptionalVInt(),
-            in.getTransportVersion().onOrAfter(TransportVersions.V_8_14_0) ? in.readOptionalNamedWriteable(PhysicalPlan.class) : null
-        );
+        super(Source.readFrom((PlanStreamInput) in));
+        this.fragment = in.readNamedWriteable(LogicalPlan.class);
+        this.esFilter = in.readOptionalNamedWriteable(QueryBuilder.class);
+        if (in.getTransportVersion().onOrAfter(TransportVersions.ESQL_REMOVE_NODE_LEVEL_PLAN)) {
+            this.estimatedRowSize = in.readVInt();
+        } else {
+            this.estimatedRowSize = Objects.requireNonNull(in.readOptionalVInt());
+            if (in.getTransportVersion().onOrAfter(TransportVersions.V_8_14_0)) {
+                in.readOptionalNamedWriteable(PhysicalPlan.class); // for old reducer
+            }
+        }
     }
 
     @Override
@@ -66,9 +68,13 @@ public class FragmentExec extends LeafExec implements EstimatesRowSize {
         Source.EMPTY.writeTo(out);
         out.writeNamedWriteable(fragment());
         out.writeOptionalNamedWriteable(esFilter());
-        out.writeOptionalVInt(estimatedRowSize());
-        if (out.getTransportVersion().onOrAfter(TransportVersions.V_8_14_0)) {
-            out.writeOptionalNamedWriteable(reducer);
+        if (out.getTransportVersion().onOrAfter(TransportVersions.ESQL_REMOVE_NODE_LEVEL_PLAN)) {
+            out.writeVInt(estimatedRowSize);
+        } else {
+            out.writeOptionalVInt(estimatedRowSize());
+            if (out.getTransportVersion().onOrAfter(TransportVersions.V_8_14_0)) {
+                out.writeOptionalNamedWriteable(null);// for old reducer
+            }
         }
     }
 
@@ -89,13 +95,9 @@ public class FragmentExec extends LeafExec implements EstimatesRowSize {
         return estimatedRowSize;
     }
 
-    public PhysicalPlan reducer() {
-        return reducer;
-    }
-
     @Override
     protected NodeInfo<FragmentExec> info() {
-        return NodeInfo.create(this, FragmentExec::new, fragment, esFilter, estimatedRowSize, reducer);
+        return NodeInfo.create(this, FragmentExec::new, fragment, esFilter, estimatedRowSize);
     }
 
     @Override
@@ -108,24 +110,20 @@ public class FragmentExec extends LeafExec implements EstimatesRowSize {
         int estimatedRowSize = state.consumeAllFields(false);
         return Objects.equals(estimatedRowSize, this.estimatedRowSize)
             ? this
-            : new FragmentExec(source(), fragment, esFilter, estimatedRowSize, reducer);
+            : new FragmentExec(source(), fragment, esFilter, estimatedRowSize);
     }
 
     public FragmentExec withFragment(LogicalPlan fragment) {
-        return Objects.equals(fragment, this.fragment) ? this : new FragmentExec(source(), fragment, esFilter, estimatedRowSize, reducer);
+        return Objects.equals(fragment, this.fragment) ? this : new FragmentExec(source(), fragment, esFilter, estimatedRowSize);
     }
 
     public FragmentExec withFilter(QueryBuilder filter) {
-        return Objects.equals(filter, this.esFilter) ? this : new FragmentExec(source(), fragment, filter, estimatedRowSize, reducer);
-    }
-
-    public FragmentExec withReducer(PhysicalPlan reducer) {
-        return Objects.equals(reducer, this.reducer) ? this : new FragmentExec(source(), fragment, esFilter, estimatedRowSize, reducer);
+        return Objects.equals(filter, this.esFilter) ? this : new FragmentExec(source(), fragment, filter, estimatedRowSize);
     }
 
     @Override
     public int hashCode() {
-        return Objects.hash(fragment, esFilter, estimatedRowSize, reducer);
+        return Objects.hash(fragment, esFilter, estimatedRowSize);
     }
 
     @Override
@@ -141,8 +139,7 @@ public class FragmentExec extends LeafExec implements EstimatesRowSize {
         FragmentExec other = (FragmentExec) obj;
         return Objects.equals(fragment, other.fragment)
             && Objects.equals(esFilter, other.esFilter)
-            && Objects.equals(estimatedRowSize, other.estimatedRowSize)
-            && Objects.equals(reducer, other.reducer);
+            && Objects.equals(estimatedRowSize, other.estimatedRowSize);
     }
 
     @Override
@@ -154,7 +151,6 @@ public class FragmentExec extends LeafExec implements EstimatesRowSize {
         sb.append(", estimatedRowSize=");
         sb.append(estimatedRowSize);
         sb.append(", reducer=[");
-        sb.append(reducer == null ? "" : reducer.toString());
         sb.append("], fragment=[<>\n");
         sb.append(fragment.toString());
         sb.append("<>]]");

+ 16 - 13
x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/plugin/ComputeService.java

@@ -60,6 +60,7 @@ import org.elasticsearch.xpack.esql.action.EsqlExecutionInfo;
 import org.elasticsearch.xpack.esql.action.EsqlQueryAction;
 import org.elasticsearch.xpack.esql.action.EsqlSearchShardsAction;
 import org.elasticsearch.xpack.esql.core.expression.Attribute;
+import org.elasticsearch.xpack.esql.core.util.Holder;
 import org.elasticsearch.xpack.esql.enrich.EnrichLookupService;
 import org.elasticsearch.xpack.esql.enrich.LookupFromIndexService;
 import org.elasticsearch.xpack.esql.plan.physical.ExchangeSinkExec;
@@ -314,14 +315,7 @@ public class ComputeService {
         EsqlExecutionInfo executionInfo,
         ComputeListener computeListener
     ) {
-        var planWithReducer = configuration.pragmas().nodeLevelReduction() == false
-            ? dataNodePlan
-            : dataNodePlan.transformUp(FragmentExec.class, f -> {
-                PhysicalPlan reductionNode = PlannerUtils.dataNodeReductionPlan(f.fragment(), dataNodePlan);
-                return reductionNode == null ? f : f.withReducer(reductionNode);
-            });
-
-        QueryBuilder requestFilter = PlannerUtils.requestTimestampFilter(planWithReducer);
+        QueryBuilder requestFilter = PlannerUtils.requestTimestampFilter(dataNodePlan);
         var lookupListener = ActionListener.releaseAfter(computeListener.acquireAvoid(), exchangeSource.addEmptySink());
         // SearchShards API can_match is done in lookupDataNodes
         lookupDataNodes(parentTask, clusterAlias, requestFilter, concreteIndices, originalIndices, ActionListener.wrap(dataNodeResult -> {
@@ -361,7 +355,7 @@ public class ComputeService {
                                     clusterAlias,
                                     node.shardIds,
                                     node.aliasFilters,
-                                    planWithReducer,
+                                    dataNodePlan,
                                     originalIndices.indices(),
                                     originalIndices.indicesOptions()
                                 ),
@@ -450,12 +444,12 @@ public class ComputeService {
             );
 
             LOGGER.debug("Received physical plan:\n{}", plan);
+
             plan = PlannerUtils.localPlan(context.searchExecutionContexts(), context.configuration, plan);
             // the planner will also set the driver parallelism in LocalExecutionPlanner.LocalExecutionPlan (used down below)
             // it's doing this in the planning of EsQueryExec (the source of the data)
             // see also EsPhysicalOperationProviders.sourcePhysicalOperation
             LocalExecutionPlanner.LocalExecutionPlan localExecutionPlan = planner.plan(plan);
-
             if (LOGGER.isDebugEnabled()) {
                 LOGGER.debug("Local execution plan:\n{}", localExecutionPlan.describe());
             }
@@ -785,14 +779,23 @@ public class ComputeService {
                     listener.onFailure(new IllegalStateException("expected a fragment plan for a remote compute; got " + request.plan()));
                     return;
                 }
-
                 var localExchangeSource = new ExchangeSourceExec(plan.source(), plan.output(), plan.isIntermediateAgg());
-                FragmentExec fragment = (FragmentExec) fragments.get(0);
+                Holder<PhysicalPlan> reducePlanHolder = new Holder<>();
+                if (request.pragmas().nodeLevelReduction()) {
+                    PhysicalPlan dataNodePlan = request.plan();
+                    request.plan()
+                        .forEachUp(
+                            FragmentExec.class,
+                            f -> { reducePlanHolder.set(PlannerUtils.dataNodeReductionPlan(f.fragment(), dataNodePlan)); }
+                        );
+                }
                 reducePlan = new ExchangeSinkExec(
                     plan.source(),
                     plan.output(),
                     plan.isIntermediateAgg(),
-                    fragment.reducer() != null ? fragment.reducer().replaceChildren(List.of(localExchangeSource)) : localExchangeSource
+                    reducePlanHolder.get() != null
+                        ? reducePlanHolder.get().replaceChildren(List.of(localExchangeSource))
+                        : localExchangeSource
                 );
             } else {
                 listener.onFailure(new IllegalStateException("expected exchange sink for a remote compute; got " + request.plan()));

+ 8 - 4
x-pack/plugin/esql/src/test/java/org/elasticsearch/xpack/esql/plan/physical/ExchangeSinkExecSerializationTests.java

@@ -66,12 +66,13 @@ public class ExchangeSinkExecSerializationTests extends AbstractPhysicalPlanSeri
      * See {@link #testManyTypeConflicts(boolean, ByteSizeValue)} for more.
      */
     public void testManyTypeConflicts() throws IOException {
-        testManyTypeConflicts(false, ByteSizeValue.ofBytes(1424048));
+        testManyTypeConflicts(false, ByteSizeValue.ofBytes(1424046L));
         /*
          * History:
          *  2.3mb - shorten error messages for UnsupportedAttributes #111973
          *  1.8mb - cache EsFields #112008
          *  1.4mb - string serialization #112929
+         *  1424046b - remove node-level plan #117422
          */
     }
 
@@ -80,7 +81,7 @@ public class ExchangeSinkExecSerializationTests extends AbstractPhysicalPlanSeri
      * See {@link #testManyTypeConflicts(boolean, ByteSizeValue)} for more.
      */
     public void testManyTypeConflictsWithParent() throws IOException {
-        testManyTypeConflicts(true, ByteSizeValue.ofBytes(2774192));
+        testManyTypeConflicts(true, ByteSizeValue.ofBytes(2774190));
         /*
          * History:
          *  2 gb+ - start
@@ -89,6 +90,7 @@ public class ExchangeSinkExecSerializationTests extends AbstractPhysicalPlanSeri
          *  3.1mb - cache EsFields #112008
          *  2774214b - string serialization #112929
          *  2774192b - remove field attribute #112881
+         *  2774190b - remove node-level plan #117422
          */
     }
 
@@ -103,11 +105,12 @@ public class ExchangeSinkExecSerializationTests extends AbstractPhysicalPlanSeri
      * with a single root field that has many children, grandchildren etc.
      */
     public void testDeeplyNestedFields() throws IOException {
-        ByteSizeValue expected = ByteSizeValue.ofBytes(47252411);
+        ByteSizeValue expected = ByteSizeValue.ofBytes(47252409);
         /*
          * History:
          *  48223371b - string serialization #112929
          *  47252411b - remove field attribute #112881
         *  47252409b - remove node-level plan #117422
          */
 
         int depth = 6;
@@ -123,11 +126,12 @@ public class ExchangeSinkExecSerializationTests extends AbstractPhysicalPlanSeri
      * with a single root field that has many children, grandchildren etc.
      */
     public void testDeeplyNestedFieldsKeepOnlyOne() throws IOException {
-        ByteSizeValue expected = ByteSizeValue.ofBytes(9425806);
+        ByteSizeValue expected = ByteSizeValue.ofBytes(9425804);
         /*
          * History:
          *  9426058b - string serialization #112929
          *  9425806b - remove field attribute #112881
+         *  9425804b - remove node-level plan #117422
          */
 
         int depth = 6;

+ 3 - 6
x-pack/plugin/esql/src/test/java/org/elasticsearch/xpack/esql/plan/physical/FragmentExecSerializationTests.java

@@ -22,8 +22,7 @@ public class FragmentExecSerializationTests extends AbstractPhysicalPlanSerializ
         LogicalPlan fragment = AbstractLogicalPlanSerializationTests.randomChild(depth);
         QueryBuilder esFilter = EsqlQueryRequestTests.randomQueryBuilder();
         int estimatedRowSize = between(0, Integer.MAX_VALUE);
-        PhysicalPlan reducer = randomChild(depth);
-        return new FragmentExec(source, fragment, esFilter, estimatedRowSize, reducer);
+        return new FragmentExec(source, fragment, esFilter, estimatedRowSize);
     }
 
     @Override
@@ -36,15 +35,13 @@ public class FragmentExecSerializationTests extends AbstractPhysicalPlanSerializ
         LogicalPlan fragment = instance.fragment();
         QueryBuilder esFilter = instance.esFilter();
         int estimatedRowSize = instance.estimatedRowSize();
-        PhysicalPlan reducer = instance.reducer();
-        switch (between(0, 3)) {
+        switch (between(0, 2)) {
             case 0 -> fragment = randomValueOtherThan(fragment, () -> AbstractLogicalPlanSerializationTests.randomChild(0));
             case 1 -> esFilter = randomValueOtherThan(esFilter, EsqlQueryRequestTests::randomQueryBuilder);
             case 2 -> estimatedRowSize = randomValueOtherThan(estimatedRowSize, () -> between(0, Integer.MAX_VALUE));
-            case 3 -> reducer = randomValueOtherThan(reducer, () -> randomChild(0));
             default -> throw new UnsupportedEncodingException();
         }
-        return new FragmentExec(instance.source(), fragment, esFilter, estimatedRowSize, reducer);
+        return new FragmentExec(instance.source(), fragment, esFilter, estimatedRowSize);
     }
 
     @Override

+ 1 - 1
x-pack/plugin/esql/src/test/java/org/elasticsearch/xpack/esql/planner/FilterTests.java

@@ -305,7 +305,7 @@ public class FilterTests extends ESTestCase {
         // System.out.println("physical\n" + physical);
         physical = physical.transformUp(
             FragmentExec.class,
-            f -> new FragmentExec(f.source(), f.fragment(), restFilter, f.estimatedRowSize(), f.reducer())
+            f -> new FragmentExec(f.source(), f.fragment(), restFilter, f.estimatedRowSize())
         );
         physical = physicalPlanOptimizer.optimize(physical);
         // System.out.println("optimized\n" + physical);