
[8.x] Add cluster level reduction (#117731) (#117868)

* Add cluster level reduction (#117731)

This change introduces cluster-level reduction. Unlike data-node-level
reduction, it is enabled by default rather than gated behind a pragma:
network latency and throughput across clusters are significantly worse
than within a cluster, so the savings from reducing pages before they
cross the cluster boundary should outweigh the extra CPU spent on the
remote coordinator (see the two call sites condensed below).

* compile
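
The two call sites, condensed from the ComputeService diff below, show how the two reductions are gated: data-node-level reduction stays opt-in behind the existing pragma, while cluster-level reduction on the remote coordinator is always on.

    // DataNodeRequestHandler#messageReceived: node-level reduction stays behind the pragma
    reductionPlan = reductionPlan(plan, request.pragmas().nodeLevelReduction());

    // runComputeOnRemoteCluster: cluster-level reduction is unconditionally enabled
    final PhysicalPlan coordinatorPlan = reductionPlan(plan, true);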
Nhat Nguyen committed 10 months ago (commit ad3c2b546d)

+ 5 - 0
docs/changelog/117731.yaml

@@ -0,0 +1,5 @@
+pr: 117731
+summary: Add cluster level reduction
+area: ES|QL
+type: enhancement
+issues: []

+ 37 - 0
x-pack/plugin/esql/src/internalClusterTest/java/org/elasticsearch/xpack/esql/action/CrossClustersCancellationIT.java

@@ -238,4 +238,41 @@ public class CrossClustersCancellationIT extends AbstractMultiClustersTestCase {
             }
         }
     }
+
+    public void testTasks() throws Exception {
+        createRemoteIndex(between(10, 100));
+        EsqlQueryRequest request = EsqlQueryRequest.syncEsqlQueryRequest();
+        request.query("FROM *:test | STATS total=sum(const) | LIMIT 1");
+        request.pragmas(randomPragmas());
+        ActionFuture<EsqlQueryResponse> requestFuture = client().execute(EsqlQueryAction.INSTANCE, request);
+        assertTrue(PauseFieldPlugin.startEmitting.await(30, TimeUnit.SECONDS));
+        try {
+            assertBusy(() -> {
+                List<TaskInfo> clusterTasks = client(REMOTE_CLUSTER).admin()
+                    .cluster()
+                    .prepareListTasks()
+                    .setActions(ComputeService.CLUSTER_ACTION_NAME)
+                    .get()
+                    .getTasks();
+                assertThat(clusterTasks.size(), equalTo(1));
+                List<TaskInfo> drivers = client(REMOTE_CLUSTER).admin()
+                    .cluster()
+                    .prepareListTasks()
+                    .setTargetParentTaskId(clusterTasks.get(0).taskId())
+                    .setActions(DriverTaskRunner.ACTION_NAME)
+                    .setDetailed(true)
+                    .get()
+                    .getTasks();
+                assertThat(drivers.size(), equalTo(1));
+                TaskInfo driver = drivers.get(0);
+                assertThat(driver.description(), equalTo("""
+                    \\_ExchangeSourceOperator[]
+                    \\_AggregationOperator[mode = INTERMEDIATE, aggs = sum of longs]
+                    \\_ExchangeSinkOperator"""));
+            });
+        } finally {
+            PauseFieldPlugin.allowEmitting.countDown();
+        }
+        requestFuture.actionGet(30, TimeUnit.SECONDS).close();
+    }
 }
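
Stripped of the pause/cancel scaffolding, the happy path of this test is an ordinary sync ES|QL round trip; the sketch below reuses only the request and client calls already present in the test above.

    EsqlQueryRequest request = EsqlQueryRequest.syncEsqlQueryRequest();
    request.query("FROM *:test | STATS total=sum(const) | LIMIT 1");
    ActionFuture<EsqlQueryResponse> future = client().execute(EsqlQueryAction.INSTANCE, request);
    // the remote coordinator now reduces the intermediate sums before pages
    // cross the cluster boundary; the querying cluster does the final reduce
    future.actionGet(30, TimeUnit.SECONDS).close();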

+ 17 - 30
x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/planner/PlannerUtils.java

@@ -29,14 +29,8 @@ import org.elasticsearch.xpack.esql.optimizer.LocalLogicalOptimizerContext;
 import org.elasticsearch.xpack.esql.optimizer.LocalLogicalPlanOptimizer;
 import org.elasticsearch.xpack.esql.optimizer.LocalPhysicalOptimizerContext;
 import org.elasticsearch.xpack.esql.optimizer.LocalPhysicalPlanOptimizer;
-import org.elasticsearch.xpack.esql.plan.logical.Aggregate;
 import org.elasticsearch.xpack.esql.plan.logical.EsRelation;
 import org.elasticsearch.xpack.esql.plan.logical.Filter;
-import org.elasticsearch.xpack.esql.plan.logical.Limit;
-import org.elasticsearch.xpack.esql.plan.logical.LogicalPlan;
-import org.elasticsearch.xpack.esql.plan.logical.OrderBy;
-import org.elasticsearch.xpack.esql.plan.logical.TopN;
-import org.elasticsearch.xpack.esql.plan.logical.UnaryPlan;
 import org.elasticsearch.xpack.esql.plan.physical.AggregateExec;
 import org.elasticsearch.xpack.esql.plan.physical.EsSourceExec;
 import org.elasticsearch.xpack.esql.plan.physical.EstimatesRowSize;
@@ -44,10 +38,7 @@ import org.elasticsearch.xpack.esql.plan.physical.ExchangeExec;
 import org.elasticsearch.xpack.esql.plan.physical.ExchangeSinkExec;
 import org.elasticsearch.xpack.esql.plan.physical.ExchangeSourceExec;
 import org.elasticsearch.xpack.esql.plan.physical.FragmentExec;
-import org.elasticsearch.xpack.esql.plan.physical.LimitExec;
-import org.elasticsearch.xpack.esql.plan.physical.OrderExec;
 import org.elasticsearch.xpack.esql.plan.physical.PhysicalPlan;
-import org.elasticsearch.xpack.esql.plan.physical.TopNExec;
 import org.elasticsearch.xpack.esql.planner.mapper.LocalMapper;
 import org.elasticsearch.xpack.esql.planner.mapper.Mapper;
 import org.elasticsearch.xpack.esql.session.Configuration;
@@ -83,29 +74,25 @@ public class PlannerUtils {
         return new Tuple<>(coordinatorPlan, dataNodePlan.get());
     }
 
-    public static PhysicalPlan dataNodeReductionPlan(LogicalPlan plan, PhysicalPlan unused) {
-        var pipelineBreakers = plan.collectFirstChildren(Mapper::isPipelineBreaker);
+    public static PhysicalPlan reductionPlan(PhysicalPlan plan) {
+        // find the logical fragment
+        var fragments = plan.collectFirstChildren(p -> p instanceof FragmentExec);
+        if (fragments.isEmpty()) {
+            return null;
+        }
+        final FragmentExec fragment = (FragmentExec) fragments.get(0);
 
-        if (pipelineBreakers.isEmpty() == false) {
-            UnaryPlan pipelineBreaker = (UnaryPlan) pipelineBreakers.get(0);
-            if (pipelineBreaker instanceof TopN) {
-                LocalMapper mapper = new LocalMapper();
-                var physicalPlan = EstimatesRowSize.estimateRowSize(0, mapper.map(plan));
-                return physicalPlan.collectFirstChildren(TopNExec.class::isInstance).get(0);
-            } else if (pipelineBreaker instanceof Limit limit) {
-                return new LimitExec(limit.source(), unused, limit.limit());
-            } else if (pipelineBreaker instanceof OrderBy order) {
-                return new OrderExec(order.source(), unused, order.order());
-            } else if (pipelineBreaker instanceof Aggregate) {
-                LocalMapper mapper = new LocalMapper();
-                var physicalPlan = EstimatesRowSize.estimateRowSize(0, mapper.map(plan));
-                var aggregate = (AggregateExec) physicalPlan.collectFirstChildren(AggregateExec.class::isInstance).get(0);
-                return aggregate.withMode(AggregatorMode.INITIAL);
-            } else {
-                throw new EsqlIllegalArgumentException("unsupported unary physical plan node [" + pipelineBreaker.nodeName() + "]");
-            }
+        final var pipelineBreakers = fragment.fragment().collectFirstChildren(Mapper::isPipelineBreaker);
+        if (pipelineBreakers.isEmpty()) {
+            return null;
+        }
+        final var pipelineBreaker = pipelineBreakers.get(0);
+        final LocalMapper mapper = new LocalMapper();
+        PhysicalPlan reducePlan = mapper.map(pipelineBreaker);
+        if (reducePlan instanceof AggregateExec agg) {
+            reducePlan = agg.withMode(AggregatorMode.INITIAL); // force to emit intermediate outputs
         }
-        return null;
+        return EstimatesRowSize.estimateRowSize(fragment.estimatedRowSize(), reducePlan);
     }
 
     /**

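The rewrite replaces the old per-node special cases with a single pass through LocalMapper: the first pipeline breaker of the logical fragment is mapped to its physical counterpart, and the EsqlIllegalArgumentException branch for unsupported nodes disappears because the mapper handles all of them uniformly. Roughly (the left-hand side lists the logical nodes the old code matched on; a summary, not verbatim source):

    // Limit     -> LimitExec
    // OrderBy   -> OrderExec
    // TopN      -> TopNExec
    // Aggregate -> AggregateExec, forced to AggregatorMode.INITIAL so it
    //              emits intermediate output rather than final results

For the test query above, the first pipeline breaker is the STATS aggregation, which is why the reduction driver reports an intermediate sum.
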
+ 19 - 37
x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/plugin/ComputeService.java

@@ -60,12 +60,10 @@ import org.elasticsearch.xpack.esql.action.EsqlExecutionInfo;
 import org.elasticsearch.xpack.esql.action.EsqlQueryAction;
 import org.elasticsearch.xpack.esql.action.EsqlSearchShardsAction;
 import org.elasticsearch.xpack.esql.core.expression.Attribute;
-import org.elasticsearch.xpack.esql.core.util.Holder;
 import org.elasticsearch.xpack.esql.enrich.EnrichLookupService;
 import org.elasticsearch.xpack.esql.enrich.LookupFromIndexService;
 import org.elasticsearch.xpack.esql.plan.physical.ExchangeSinkExec;
 import org.elasticsearch.xpack.esql.plan.physical.ExchangeSourceExec;
-import org.elasticsearch.xpack.esql.plan.physical.FragmentExec;
 import org.elasticsearch.xpack.esql.plan.physical.OutputExec;
 import org.elasticsearch.xpack.esql.plan.physical.PhysicalPlan;
 import org.elasticsearch.xpack.esql.planner.EsPhysicalOperationProviders;
@@ -780,35 +778,24 @@ public class ComputeService {
         }
     }
 
+    private static PhysicalPlan reductionPlan(ExchangeSinkExec plan, boolean enable) {
+        PhysicalPlan reducePlan = new ExchangeSourceExec(plan.source(), plan.output(), plan.isIntermediateAgg());
+        if (enable) {
+            PhysicalPlan p = PlannerUtils.reductionPlan(plan);
+            if (p != null) {
+                reducePlan = p.replaceChildren(List.of(reducePlan));
+            }
+        }
+        return new ExchangeSinkExec(plan.source(), plan.output(), plan.isIntermediateAgg(), reducePlan);
+    }
+
     private class DataNodeRequestHandler implements TransportRequestHandler<DataNodeRequest> {
         @Override
         public void messageReceived(DataNodeRequest request, TransportChannel channel, Task task) {
             final ActionListener<ComputeResponse> listener = new ChannelActionListener<>(channel);
-            final ExchangeSinkExec reducePlan;
+            final PhysicalPlan reductionPlan;
             if (request.plan() instanceof ExchangeSinkExec plan) {
-                var fragments = plan.collectFirstChildren(FragmentExec.class::isInstance);
-                if (fragments.isEmpty()) {
-                    listener.onFailure(new IllegalStateException("expected a fragment plan for a remote compute; got " + request.plan()));
-                    return;
-                }
-                var localExchangeSource = new ExchangeSourceExec(plan.source(), plan.output(), plan.isIntermediateAgg());
-                Holder<PhysicalPlan> reducePlanHolder = new Holder<>();
-                if (request.pragmas().nodeLevelReduction()) {
-                    PhysicalPlan dataNodePlan = request.plan();
-                    request.plan()
-                        .forEachUp(
-                            FragmentExec.class,
-                            f -> { reducePlanHolder.set(PlannerUtils.dataNodeReductionPlan(f.fragment(), dataNodePlan)); }
-                        );
-                }
-                reducePlan = new ExchangeSinkExec(
-                    plan.source(),
-                    plan.output(),
-                    plan.isIntermediateAgg(),
-                    reducePlanHolder.get() != null
-                        ? reducePlanHolder.get().replaceChildren(List.of(localExchangeSource))
-                        : localExchangeSource
-                );
+                reductionPlan = reductionPlan(plan, request.pragmas().nodeLevelReduction());
             } else {
                 listener.onFailure(new IllegalStateException("expected exchange sink for a remote compute; got " + request.plan()));
                 return;
@@ -825,7 +812,7 @@ public class ComputeService {
                 request.indicesOptions()
             );
             try (var computeListener = ComputeListener.create(transportService, (CancellableTask) task, listener)) {
-                runComputeOnDataNode((CancellableTask) task, sessionId, reducePlan, request, computeListener);
+                runComputeOnDataNode((CancellableTask) task, sessionId, reductionPlan, request, computeListener);
             }
         }
     }
@@ -871,10 +858,10 @@ public class ComputeService {
      * Performs a compute on a remote cluster. The output pages are placed in an exchange sink specified by
      * {@code globalSessionId}. The coordinator on the main cluster will poll pages from there.
      * <p>
-     * Currently, the coordinator on the remote cluster simply collects pages from data nodes in the remote cluster
-     * and places them in the exchange sink. We can achieve this by using a single exchange buffer to minimize overhead.
-     * However, here we use two exchange buffers so that we can run an actual plan on this coordinator to perform partial
-     * reduce operations, such as limit, topN, and partial-to-partial aggregation in the future.
+     * Currently, the coordinator on the remote cluster polls pages from data nodes within the remote cluster
+     * and performs cluster-level reduction before sending pages to the querying cluster. This reduction aims
+     * to minimize data transfers across clusters but may require additional CPU resources for operations like
+     * aggregations.
      */
     void runComputeOnRemoteCluster(
         String clusterAlias,
@@ -892,6 +879,7 @@ public class ComputeService {
             () -> exchangeService.finishSinkHandler(globalSessionId, new TaskCancelledException(parentTask.getReasonCancelled()))
         );
         final String localSessionId = clusterAlias + ":" + globalSessionId;
+        final PhysicalPlan coordinatorPlan = reductionPlan(plan, true);
         var exchangeSource = new ExchangeSourceHandler(
             configuration.pragmas().exchangeBufferSize(),
             transportService.getThreadPool().executor(ThreadPool.Names.SEARCH),
@@ -899,12 +887,6 @@ public class ComputeService {
         );
         try (Releasable ignored = exchangeSource.addEmptySink()) {
             exchangeSink.addCompletionListener(computeListener.acquireAvoid());
-            PhysicalPlan coordinatorPlan = new ExchangeSinkExec(
-                plan.source(),
-                plan.output(),
-                plan.isIntermediateAgg(),
-                new ExchangeSourceExec(plan.source(), plan.output(), plan.isIntermediateAgg())
-            );
             runCompute(
                 parentTask,
                 new ComputeContext(localSessionId, clusterAlias, List.of(), configuration, exchangeSource, exchangeSink),
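
Net effect on the remote coordinator plan: where runComputeOnRemoteCluster previously built a bare sink-over-source passthrough inline, reductionPlan(plan, true) now interposes the mapped pipeline breaker whenever the fragment contains one. A sketch of the two shapes for the aggregation exercised in the test (node names as in the physical plans above):

    // before: ExchangeSinkExec -> ExchangeSourceExec                           (passthrough)
    // after:  ExchangeSinkExec -> AggregateExec[INITIAL] -> ExchangeSourceExec (partial reduce)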