소스 검색

ES|QL: Set FUSE to execute on coordinator (#135515)

Ioana Tagirta 2 주 전
부모
커밋
e95971d802

+ 2 - 0
x-pack/plugin/esql/qa/testFixtures/src/main/resources/fuse.csv-spec

@@ -762,6 +762,7 @@ required_capability: fuse_v6
 
 ROW _id = ["a", "b", "c"], _score = 0.0, _index = "my_index", my_fork = "foo"
 | MV_EXPAND _id
+| LIMIT 10
 | FUSE LINEAR GROUP BY my_fork WITH { "normalizer": "l2_norm" }
 | SORT _id
 | KEEP _id, _score
@@ -779,6 +780,7 @@ required_capability: fuse_v6
 
 ROW _id = ["a", "b", "c"], _score = 0.0, _index = "my_index", my_fork = "foo"
 | MV_EXPAND _id
+| LIMIT 10
 | FUSE LINEAR GROUP BY my_fork WITH { "normalizer": "minmax" }
 | SORT _id
 | KEEP _id, _score

+ 2 - 1
x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/plan/logical/fuse/Fuse.java

@@ -18,6 +18,7 @@ import org.elasticsearch.xpack.esql.core.expression.NamedExpression;
 import org.elasticsearch.xpack.esql.core.tree.NodeInfo;
 import org.elasticsearch.xpack.esql.core.tree.Source;
 import org.elasticsearch.xpack.esql.core.type.DataType;
+import org.elasticsearch.xpack.esql.plan.logical.ExecutesOn;
 import org.elasticsearch.xpack.esql.plan.logical.LogicalPlan;
 import org.elasticsearch.xpack.esql.plan.logical.UnaryPlan;
 
@@ -26,7 +27,7 @@ import java.util.List;
 
 import static org.elasticsearch.xpack.esql.common.Failure.fail;
 
-public class Fuse extends UnaryPlan implements TelemetryAware, PostAnalysisVerificationAware {
+public class Fuse extends UnaryPlan implements TelemetryAware, PostAnalysisVerificationAware, ExecutesOn.Coordinator {
     private final Attribute score;
     private final Attribute discriminator;
     private final List<NamedExpression> keys;

+ 28 - 1
x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/plan/logical/fuse/FuseScoreEval.java

@@ -26,7 +26,10 @@ import org.elasticsearch.xpack.esql.core.expression.MapExpression;
 import org.elasticsearch.xpack.esql.core.tree.NodeInfo;
 import org.elasticsearch.xpack.esql.core.tree.Source;
 import org.elasticsearch.xpack.esql.core.type.DataType;
+import org.elasticsearch.xpack.esql.core.util.Holder;
+import org.elasticsearch.xpack.esql.plan.logical.ExecutesOn;
 import org.elasticsearch.xpack.esql.plan.logical.LogicalPlan;
+import org.elasticsearch.xpack.esql.plan.logical.PipelineBreaker;
 import org.elasticsearch.xpack.esql.plan.logical.UnaryPlan;
 
 import java.io.IOException;
@@ -36,7 +39,12 @@ import java.util.Locale;
 import java.util.Map;
 import java.util.Objects;
 
-public class FuseScoreEval extends UnaryPlan implements LicenseAware, PostAnalysisVerificationAware {
+public class FuseScoreEval extends UnaryPlan
+    implements
+        LicenseAware,
+        PostAnalysisVerificationAware,
+        ExecutesOn.Coordinator,
+        PipelineBreaker {
     private final Attribute discriminatorAttr;
     private final Attribute scoreAttr;
     private final Fuse.FuseType fuseType;
@@ -122,6 +130,7 @@ public class FuseScoreEval extends UnaryPlan implements LicenseAware, PostAnalys
 
     @Override
     public void postAnalysisVerification(Failures failures) {
+        validateLimitedInput(failures);
         if (options == null) {
             return;
         }
@@ -132,6 +141,24 @@ public class FuseScoreEval extends UnaryPlan implements LicenseAware, PostAnalys
         }
     }
 
+    private void validateLimitedInput(Failures failures) {
+        var myself = this;
+        Holder<Boolean> hasLimitedInput = new Holder<>(false);
+        this.forEachUp(LogicalPlan.class, plan -> {
+            if (plan == myself) {
+                return;
+            }
+
+            if (plan instanceof PipelineBreaker) {
+                hasLimitedInput.set(true);
+            }
+        });
+
+        if (hasLimitedInput.get() == false) {
+            failures.add(new Failure(this, "FUSE can only be used on a limited number of rows. Consider adding a LIMIT before FUSE."));
+        }
+    }
+
     private void validateRrfOptions(Failures failures) {
         options.keyFoldedMap().forEach((key, value) -> {
             if (key.equals(RrfConfig.RANK_CONSTANT)) {

+ 5 - 0
x-pack/plugin/esql/src/test/java/org/elasticsearch/xpack/esql/analysis/VerifierTests.java

@@ -2681,6 +2681,11 @@ public class VerifierTests extends ESTestCase {
             error(queryPrefix + " | fuse linear KEY BY _score"),
             containsString("expected KEY BY field [_score] to be KEYWORD or TEXT, not DOUBLE")
         );
+
+        assertThat(
+            error("FROM test METADATA _index, _score, _id | EVAL _fork = \"fork1\" | FUSE"),
+            containsString("FUSE can only be used on a limited number of rows. Consider adding a LIMIT before FUSE.")
+        );
     }
 
     public void testNoMetricInStatsByClause() {