瀏覽代碼

[9.1] Ban Limit + MvExpand before remote Enrich (#135051) (#135310)

* Ban Limit + MvExpand before remote Enrich (#135051)

* Ban Limit + MvExpand before remote Enrich

(cherry picked from commit 7f1d2dc3335f541bb5e70050416cfc585f6e80db)
Stanislav Malyshev 2 周之前
父節點
當前提交
2549a2c8c1

+ 5 - 0
docs/changelog/135051.yaml

@@ -0,0 +1,5 @@
+pr: 135051
+summary: Ban Limit + `MvExpand` before remote Enrich
+area: ES|QL
+type: bug
+issues: []

+ 14 - 0
x-pack/plugin/esql/src/internalClusterTest/java/org/elasticsearch/xpack/esql/action/CrossClusterEnrichIT.java

@@ -423,6 +423,20 @@ public class CrossClusterEnrichIT extends AbstractEnrichBasedCrossClusterTestCas
         );
     }
 
+    public void testEnrichAfterMvExpandLimit() { // LIMIT then MV_EXPAND ahead of a remote-mode ENRICH must fail verification
+        String query = String.format(Locale.ROOT, """
+            FROM *:events,events
+            | SORT timestamp
+            | LIMIT 2
+            | eval ip= TO_STR(host)
+            | MV_EXPAND host
+            | WHERE ip != ""
+            | %s
+            """, enrichHosts(Enrich.Mode.REMOTE));
+        var error = expectThrows(VerificationException.class, () -> runQuery(query, randomBoolean()).close());
+        assertThat(error.getMessage(), containsString("MV_EXPAND after LIMIT is incompatible with remote ENRICH"));
+    }
+
     private static void assertCCSExecutionInfoDetails(EsqlExecutionInfo executionInfo) {
         assertThat(executionInfo.overallTook().millis(), greaterThanOrEqualTo(0L));
         assertTrue(executionInfo.isCrossClusterSearch());

+ 38 - 1
x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/plan/logical/Enrich.java

@@ -18,6 +18,7 @@ import org.elasticsearch.index.IndexMode;
 import org.elasticsearch.transport.RemoteClusterAware;
 import org.elasticsearch.xpack.core.enrich.EnrichPolicy;
 import org.elasticsearch.xpack.esql.capabilities.PostAnalysisPlanVerificationAware;
+import org.elasticsearch.xpack.esql.capabilities.PostAnalysisVerificationAware;
 import org.elasticsearch.xpack.esql.capabilities.TelemetryAware;
 import org.elasticsearch.xpack.esql.common.Failures;
 import org.elasticsearch.xpack.esql.core.capabilities.Resolvables;
@@ -51,7 +52,13 @@ import static org.elasticsearch.xpack.esql.common.Failure.fail;
 import static org.elasticsearch.xpack.esql.core.expression.Expressions.asAttributes;
 import static org.elasticsearch.xpack.esql.expression.NamedExpressions.mergeOutputAttributes;
 
-public class Enrich extends UnaryPlan implements GeneratingPlan<Enrich>, PostAnalysisPlanVerificationAware, TelemetryAware, SortAgnostic {
+public class Enrich extends UnaryPlan
+    implements
+        GeneratingPlan<Enrich>,
+        PostAnalysisPlanVerificationAware,
+        PostAnalysisVerificationAware,
+        TelemetryAware,
+        SortAgnostic {
     public static final NamedWriteableRegistry.Entry ENTRY = new NamedWriteableRegistry.Entry(
         LogicalPlan.class,
         "Enrich",
@@ -326,4 +333,34 @@ public class Enrich extends UnaryPlan implements GeneratingPlan<Enrich>, PostAna
 
         badCommands.forEach(c -> failures.add(fail(enrich, "ENRICH with remote policy can't be executed after " + c)));
     }
+
+    /**
+     * Remote ENRICH (and any remote operation in fact) is not compatible with MV_EXPAND + LIMIT. Consider:
+     * `FROM *:events | SORT @timestamp | LIMIT 2 | MV_EXPAND ip | ENRICH _remote:clientip_policy ON ip`
+     * Semantically, this must take the top two events and then expand them. However, this cannot be executed remotely,
+     * because that would mean taking the top 2 events on each node, expanding them, applying Enrich and bringing them to
+     * the coordinator - where we can no longer select the top 2, because that selection has to happen pre-expand!
+     * We do not know which expanded rows come from the true top rows and which come from "false" top rows which should
+     * have been thrown out. This is only possible to execute if MV_EXPAND executes on the coordinator - which contradicts
+     * remote Enrich. This could be fixed by the optimizer by moving MV_EXPAND past ENRICH, at least in some cases, but
+     * currently we do not do that. The check adds a failure for every Limit or TopN visited below an MvExpand of this plan.
+     */
+    private void checkMvExpandAfterLimit(Failures failures) {
+        this.forEachDown(MvExpand.class, u -> {
+            u.forEachDown(p -> {
+                if (p instanceof Limit || p instanceof TopN) {
+                    failures.add(fail(this, "MV_EXPAND after LIMIT is incompatible with remote ENRICH"));
+                }
+            });
+        });
+
+    }
+
+    @Override
+    public void postAnalysisVerification(Failures failures) {
+        if (this.mode == Mode.REMOTE) { // the MV_EXPAND-after-LIMIT restriction is only checked for remote-mode ENRICH
+            checkMvExpandAfterLimit(failures);
+        }
+
+    }
 }