소스 검색

ESQL: Disallow remote ENRICH after LOOKUP JOIN (#131426)

Fix https://github.com/elastic/elasticsearch/issues/129372

Due to how remote ENRICH is
[planned](https://github.com/elastic/elasticsearch/blob/32e50d0d94e27ee559d24bf9d5463ba6e64d1788/x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/planner/mapper/Mapper.java#L93),
it interacts in special ways with pipeline breakers, in particular LIMIT
and TopN; when these are encountered upstream from a remote ENRICH,
these nodes are copied and executed a second time after the remote
ENRICH.

We'd like to allow remote ENRICH after LOOKUP JOIN, but that forces the
lookup to be remote as well; this has its own interactions with pipeline
breakers: in particular, LIMITs and TopNs cannot just be duplicated
after LOOKUP JOIN, as LOOKUP JOIN may add new rows.

For now, let's just forbid any usage of remote ENRICH after LOOKUP
JOINs; remote ENRICH is mostly relevant for CCS, and LOOKUP JOIN doesn't
support that in 9.1/8.19, anyway.

There is separate work that enables remote LOOKUP JOINs on remote
clusters and adds the correct validations; we can later build support
for remote ENRICH + LOOKUP JOIN on top of that. (Cf. my comment
[here](https://github.com/elastic/elasticsearch/issues/129372#issuecomment-3083024230)
and my draft https://github.com/elastic/elasticsearch/pull/131286 for
enabling this.)
Alexander Spies 3 달 전
부모
커밋
06e39c0377

+ 6 - 0
docs/changelog/131426.yaml

@@ -0,0 +1,6 @@
+pr: 131426
+summary: Disallow remote ENRICH after LOOKUP JOIN
+area: ES|QL
+type: bug
+issues:
+ - 129372

+ 4 - 1
x-pack/plugin/esql/qa/server/multi-clusters/src/javaRestTest/java/org/elasticsearch/xpack/esql/ccq/MultiClusterSpecIT.java

@@ -126,7 +126,10 @@ public class MultiClusterSpecIT extends EsqlSpecTestCase {
         "NullifiedJoinKeyToPurgeTheJoin",
         "SortBeforeAndAfterJoin",
         "SortEvalBeforeLookup",
-        "SortBeforeAndAfterMultipleJoinAndMvExpand"
+        "SortBeforeAndAfterMultipleJoinAndMvExpand",
+        "LookupJoinAfterTopNAndRemoteEnrich",
+        // Lookup join after LIMIT is not supported in CCS yet
+        "LookupJoinAfterLimitAndRemoteEnrich"
     );
 
     @Override

+ 101 - 0
x-pack/plugin/esql/qa/testFixtures/src/main/resources/enrich.csv-spec

@@ -661,3 +661,104 @@ from *
 author.keyword:keyword|book_no:keyword|scalerank:integer|street:keyword|bytes_in:ul|@timestamp:unsupported|abbrev:keyword|city_location:geo_point|distance:double|description:unsupported|birth_date:date|language_code:integer|intersects:boolean|client_ip:unsupported|event_duration:long|version:version|language_name:keyword
 Fyodor Dostoevsky     |1211           |null             |null          |null       |null                  |null          |null                   |null           |null                   |null           |null                 |null              |null                 |null               |null           |null
 ;
+
+
+statsAfterRemoteEnrich
+required_capability: enrich_load
+
+FROM sample_data
+| KEEP message
+| WHERE message IN ("Connected to 10.1.0.1", "Connected to 10.1.0.2")
+| EVAL language_code = "1"
+| ENRICH _remote:languages_policy ON language_code
+| STATS messages = count_distinct(message) BY language_name
+;
+
+messages:long | language_name:keyword
+2             | English
+;
+
+
+enrichAfterRemoteEnrich
+required_capability: enrich_load
+
+FROM sample_data
+| KEEP message
+| WHERE message IN ("Connected to 10.1.0.1")
+| EVAL language_code = "1"
+| ENRICH _remote:languages_policy ON language_code
+| RENAME language_name AS first_language_name
+| ENRICH languages_policy ON language_code
+;
+
+message:keyword       | language_code:keyword | first_language_name:keyword | language_name:keyword
+Connected to 10.1.0.1 | 1                     | English                     | English
+;
+
+
+coordinatorEnrichAfterRemoteEnrich
+required_capability: enrich_load
+
+FROM sample_data
+| KEEP message
+| WHERE message IN ("Connected to 10.1.0.1")
+| EVAL language_code = "1"
+| ENRICH _remote:languages_policy ON language_code
+| RENAME language_name AS first_language_name
+| ENRICH _coordinator:languages_policy ON language_code
+;
+
+message:keyword       | language_code:keyword | first_language_name:keyword | language_name:keyword
+Connected to 10.1.0.1 | 1                     | English                     | English
+;
+
+
+doubleRemoteEnrich
+required_capability: enrich_load
+
+FROM sample_data
+| KEEP message
+| WHERE message IN ("Connected to 10.1.0.1")
+| EVAL language_code = "1"
+| ENRICH _remote:languages_policy ON language_code
+| RENAME language_name AS first_language_name
+| ENRICH _remote:languages_policy ON language_code
+;
+
+message:keyword       | language_code:keyword | first_language_name:keyword | language_name:keyword
+Connected to 10.1.0.1 | 1                     | English                     | English
+;
+
+
+enrichAfterCoordinatorEnrich
+required_capability: enrich_load
+
+FROM sample_data
+| KEEP message
+| WHERE message IN ("Connected to 10.1.0.1")
+| EVAL language_code = "1"
+| ENRICH _coordinator:languages_policy ON language_code
+| RENAME language_name AS first_language_name
+| ENRICH languages_policy ON language_code
+;
+
+message:keyword       | language_code:keyword | first_language_name:keyword | language_name:keyword
+Connected to 10.1.0.1 | 1                     | English                     | English
+;
+
+
+doubleCoordinatorEnrich
+required_capability: enrich_load
+
+FROM sample_data
+| KEEP message
+| WHERE message IN ("Connected to 10.1.0.1")
+| EVAL language_code = "1"
+| ENRICH _coordinator:languages_policy ON language_code
+| RENAME language_name AS first_language_name
+| ENRICH _coordinator:languages_policy ON language_code
+;
+
+message:keyword       | language_code:keyword | first_language_name:keyword | language_name:keyword
+Connected to 10.1.0.1 | 1                     | English                     | English
+;

+ 98 - 0
x-pack/plugin/esql/qa/testFixtures/src/main/resources/lookup-join.csv-spec

@@ -4773,3 +4773,101 @@ FROM sample_data_ts_nanos
 2023-10-23T12:27:28.948123456Z | 172.21.2.113 | 2764889 | Connected to 10.1.0.2
 2023-10-23T12:15:03.360123456Z | 172.21.2.162 | 3450233 | Connected to 10.1.0.3
 ;
+
+###############################################
+# LOOKUP JOIN and ENRICH
+###############################################
+
+enrichAfterLookupJoin
+required_capability: join_lookup_v12
+
+FROM sample_data
+| KEEP message
+| WHERE message == "Connected to 10.1.0.1"
+| EVAL language_code = "1"
+| LOOKUP JOIN message_types_lookup ON message
+| ENRICH languages_policy ON language_code
+;
+
+message:keyword       | language_code:keyword | type:keyword | language_name:keyword
+Connected to 10.1.0.1 | 1                     | Success      | English
+;
+
+
+lookupJoinAfterEnrich
+required_capability: join_lookup_v12
+
+FROM sample_data
+| KEEP message
+| WHERE message == "Connected to 10.1.0.1"
+| EVAL language_code = "1"
+| ENRICH languages_policy ON language_code
+| LOOKUP JOIN message_types_lookup ON message
+;
+
+message:keyword       | language_code:keyword | language_name:keyword | type:keyword
+Connected to 10.1.0.1 | 1                     | English               | Success
+;
+
+
+lookupJoinAfterRemoteEnrich
+required_capability: join_lookup_v12
+
+FROM sample_data
+| KEEP message
+| WHERE message == "Connected to 10.1.0.1"
+| EVAL language_code = "1"
+| ENRICH _remote:languages_policy ON language_code
+| LOOKUP JOIN message_types_lookup ON message
+;
+
+message:keyword       | language_code:keyword | language_name:keyword | type:keyword
+Connected to 10.1.0.1 | 1                     | English               | Success
+;
+
+
+lookupJoinAfterLimitAndRemoteEnrich
+required_capability: join_lookup_v12
+
+FROM sample_data
+| KEEP message
+| WHERE message == "Connected to 10.1.0.1"
+| EVAL language_code = "1"
+| LIMIT 1
+| ENRICH _remote:languages_policy ON language_code
+| EVAL enrich_language_name = language_name, language_code = language_code::integer
+| LOOKUP JOIN languages_lookup_non_unique_key ON language_code
+| KEEP message, enrich_language_name, language_name, country.keyword
+| SORT language_name, country.keyword
+;
+
+message:keyword       | enrich_language_name:keyword | language_name:keyword | country.keyword:keyword
+Connected to 10.1.0.1 | English                      | English               | Canada
+Connected to 10.1.0.1 | English                      | English               | United States of America
+Connected to 10.1.0.1 | English                      | English               | null
+Connected to 10.1.0.1 | English                      | null                  | United Kingdom
+;
+
+
+lookupJoinAfterTopNAndRemoteEnrich
+required_capability: join_lookup_v12
+
+FROM sample_data
+| KEEP message
+| WHERE message == "Connected to 10.1.0.1"
+| EVAL language_code = "1"
+| SORT message
+| LIMIT 1
+| ENRICH _remote:languages_policy ON language_code
+| EVAL enrich_language_name = language_name, language_code = language_code::integer
+| LOOKUP JOIN languages_lookup_non_unique_key ON language_code
+| KEEP message, enrich_language_name, language_name, country.keyword
+| SORT language_name, country.keyword
+;
+
+message:keyword       | enrich_language_name:keyword | language_name:keyword | country.keyword:keyword
+Connected to 10.1.0.1 | English                      | English               | Canada
+Connected to 10.1.0.1 | English                      | English               | United States of America
+Connected to 10.1.0.1 | English                      | English               | null
+Connected to 10.1.0.1 | English                      | null                  | United Kingdom
+;

+ 35 - 14
x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/plan/logical/Enrich.java

@@ -35,6 +35,7 @@ import org.elasticsearch.xpack.esql.core.tree.Source;
 import org.elasticsearch.xpack.esql.index.EsIndex;
 import org.elasticsearch.xpack.esql.io.stream.PlanStreamInput;
 import org.elasticsearch.xpack.esql.plan.GeneratingPlan;
+import org.elasticsearch.xpack.esql.plan.logical.join.LookupJoin;
 
 import java.io.IOException;
 import java.util.ArrayList;
@@ -295,23 +296,43 @@ public class Enrich extends UnaryPlan implements GeneratingPlan<Enrich>, PostAna
      * retaining the originating cluster and restructuring pages for routing, which might be complicated.
      */
     private static void checkRemoteEnrich(LogicalPlan plan, Failures failures) {
-        boolean[] agg = { false };
-        boolean[] enrichCoord = { false };
+        // First look for remote ENRICH, and then look at its children. Going over the whole plan once is trickier as remote ENRICHs can be
+        // in separate FORK branches which are valid by themselves.
+        plan.forEachUp(Enrich.class, enrich -> checkForPlansForbiddenBeforeRemoteEnrich(enrich, failures));
+    }
+
+    /**
+     * For a given remote {@link Enrich}, check if there are any forbidden plans upstream.
+     */
+    private static void checkForPlansForbiddenBeforeRemoteEnrich(Enrich enrich, Failures failures) {
+        if (enrich.mode != Mode.REMOTE) {
+            return;
+        }
+
+        // TODO: shouldn't we also include FORK? Everything downstream from FORK should be coordinator-only.
+        // https://github.com/elastic/elasticsearch/issues/131445
+        boolean[] aggregate = { false };
+        boolean[] coordinatorOnlyEnrich = { false };
+        boolean[] lookupJoin = { false };
 
-        plan.forEachUp(UnaryPlan.class, u -> {
+        enrich.forEachUp(LogicalPlan.class, u -> {
             if (u instanceof Aggregate) {
-                agg[0] = true;
-            } else if (u instanceof Enrich enrich && enrich.mode() == Enrich.Mode.COORDINATOR) {
-                enrichCoord[0] = true;
-            }
-            if (u instanceof Enrich enrich && enrich.mode() == Enrich.Mode.REMOTE) {
-                if (agg[0]) {
-                    failures.add(fail(enrich, "ENRICH with remote policy can't be executed after STATS"));
-                }
-                if (enrichCoord[0]) {
-                    failures.add(fail(enrich, "ENRICH with remote policy can't be executed after another ENRICH with coordinator policy"));
-                }
+                aggregate[0] = true;
+            } else if (u instanceof Enrich upstreamEnrich && upstreamEnrich.mode() == Enrich.Mode.COORDINATOR) {
+                coordinatorOnlyEnrich[0] = true;
+            } else if (u instanceof LookupJoin) {
+                lookupJoin[0] = true;
             }
         });
+
+        if (aggregate[0]) {
+            failures.add(fail(enrich, "ENRICH with remote policy can't be executed after STATS"));
+        }
+        if (coordinatorOnlyEnrich[0]) {
+            failures.add(fail(enrich, "ENRICH with remote policy can't be executed after another ENRICH with coordinator policy"));
+        }
+        if (lookupJoin[0]) {
+            failures.add(fail(enrich, "ENRICH with remote policy can't be executed after LOOKUP JOIN"));
+        }
     }
 }

+ 1 - 1
x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/planner/mapper/Mapper.java

@@ -87,7 +87,7 @@ public class Mapper {
         PhysicalPlan mappedChild = map(unary.child());
 
         //
-        // TODO - this is hard to follow and needs reworking
+        // TODO - this is hard to follow, causes bugs and needs reworking
         // https://github.com/elastic/elasticsearch/issues/115897
         //
         if (unary instanceof Enrich enrich && enrich.mode() == Enrich.Mode.REMOTE) {

+ 24 - 25
x-pack/plugin/esql/src/test/java/org/elasticsearch/xpack/esql/analysis/AnalyzerTestUtils.java

@@ -36,9 +36,9 @@ import java.util.function.Supplier;
 import static org.elasticsearch.xpack.core.enrich.EnrichPolicy.GEO_MATCH_TYPE;
 import static org.elasticsearch.xpack.core.enrich.EnrichPolicy.MATCH_TYPE;
 import static org.elasticsearch.xpack.core.enrich.EnrichPolicy.RANGE_TYPE;
+import static org.elasticsearch.xpack.esql.EsqlTestUtils.TEST_CFG;
 import static org.elasticsearch.xpack.esql.EsqlTestUtils.TEST_VERIFIER;
 import static org.elasticsearch.xpack.esql.EsqlTestUtils.configuration;
-import static org.elasticsearch.xpack.esql.EsqlTestUtils.emptyInferenceResolution;
 
 public final class AnalyzerTestUtils {
 
@@ -61,27 +61,36 @@ public final class AnalyzerTestUtils {
     }
 
     public static Analyzer analyzer(IndexResolution indexResolution, Verifier verifier) {
-        return new Analyzer(
-            new AnalyzerContext(
-                EsqlTestUtils.TEST_CFG,
-                new EsqlFunctionRegistry(),
-                indexResolution,
-                defaultLookupResolution(),
-                defaultEnrichResolution(),
-                emptyInferenceResolution()
-            ),
-            verifier
-        );
+        return analyzer(indexResolution, defaultLookupResolution(), verifier);
     }
 
     public static Analyzer analyzer(IndexResolution indexResolution, Map<String, IndexResolution> lookupResolution, Verifier verifier) {
+        return analyzer(indexResolution, lookupResolution, defaultEnrichResolution(), verifier);
+    }
+
+    public static Analyzer analyzer(
+        IndexResolution indexResolution,
+        Map<String, IndexResolution> lookupResolution,
+        EnrichResolution enrichResolution,
+        Verifier verifier
+    ) {
+        return analyzer(indexResolution, lookupResolution, enrichResolution, verifier, TEST_CFG);
+    }
+
+    public static Analyzer analyzer(
+        IndexResolution indexResolution,
+        Map<String, IndexResolution> lookupResolution,
+        EnrichResolution enrichResolution,
+        Verifier verifier,
+        Configuration config
+    ) {
         return new Analyzer(
             new AnalyzerContext(
-                EsqlTestUtils.TEST_CFG,
+                config,
                 new EsqlFunctionRegistry(),
                 indexResolution,
                 lookupResolution,
-                defaultEnrichResolution(),
+                enrichResolution,
                 defaultInferenceResolution()
             ),
             verifier
@@ -89,17 +98,7 @@ public final class AnalyzerTestUtils {
     }
 
     public static Analyzer analyzer(IndexResolution indexResolution, Verifier verifier, Configuration config) {
-        return new Analyzer(
-            new AnalyzerContext(
-                config,
-                new EsqlFunctionRegistry(),
-                indexResolution,
-                defaultLookupResolution(),
-                defaultEnrichResolution(),
-                defaultInferenceResolution()
-            ),
-            verifier
-        );
+        return analyzer(indexResolution, defaultLookupResolution(), defaultEnrichResolution(), verifier, config);
     }
 
     public static Analyzer analyzer(Verifier verifier) {

+ 138 - 0
x-pack/plugin/esql/src/test/java/org/elasticsearch/xpack/esql/analysis/VerifierTests.java

@@ -29,6 +29,7 @@ import org.elasticsearch.xpack.esql.parser.EsqlParser;
 import org.elasticsearch.xpack.esql.parser.ParsingException;
 import org.elasticsearch.xpack.esql.parser.QueryParam;
 import org.elasticsearch.xpack.esql.parser.QueryParams;
+import org.elasticsearch.xpack.esql.plan.logical.Enrich;
 
 import java.util.ArrayList;
 import java.util.LinkedHashMap;
@@ -38,9 +39,13 @@ import java.util.Locale;
 import java.util.Map;
 import java.util.Set;
 
+import static org.elasticsearch.xpack.core.enrich.EnrichPolicy.MATCH_TYPE;
 import static org.elasticsearch.xpack.esql.EsqlTestUtils.TEST_CFG;
+import static org.elasticsearch.xpack.esql.EsqlTestUtils.TEST_VERIFIER;
 import static org.elasticsearch.xpack.esql.EsqlTestUtils.paramAsConstant;
 import static org.elasticsearch.xpack.esql.EsqlTestUtils.withDefaultLimitWarning;
+import static org.elasticsearch.xpack.esql.analysis.AnalyzerTestUtils.defaultLookupResolution;
+import static org.elasticsearch.xpack.esql.analysis.AnalyzerTestUtils.loadEnrichPolicyResolution;
 import static org.elasticsearch.xpack.esql.analysis.AnalyzerTestUtils.loadMapping;
 import static org.elasticsearch.xpack.esql.core.type.DataType.BOOLEAN;
 import static org.elasticsearch.xpack.esql.core.type.DataType.CARTESIAN_POINT;
@@ -2343,7 +2348,140 @@ public class VerifierTests extends ESTestCase {
             () -> query("FROM test,remote:test | EVAL language_code = languages | LOOKUP JOIN languages_lookup ON language_code")
         );
         assertThat(e.getMessage(), containsString("remote clusters are not supported with LOOKUP JOIN"));
+    }
 
+    public void testRemoteEnrichAfterLookupJoin() {
+        EnrichResolution enrichResolution = new EnrichResolution();
+        loadEnrichPolicyResolution(
+            enrichResolution,
+            Enrich.Mode.REMOTE,
+            MATCH_TYPE,
+            "languages",
+            "language_code",
+            "languages_idx",
+            "mapping-languages.json"
+        );
+        var analyzer = AnalyzerTestUtils.analyzer(
+            loadMapping("mapping-default.json", "test"),
+            defaultLookupResolution(),
+            enrichResolution,
+            TEST_VERIFIER
+        );
+
+        String lookupCommand = randomBoolean() ? "LOOKUP JOIN test_lookup ON languages" : "LOOKUP JOIN languages_lookup ON language_code";
+
+        query(Strings.format("""
+            FROM test
+            | EVAL language_code = languages
+            | ENRICH _remote:languages ON language_code
+            | %s
+            """, lookupCommand), analyzer);
+
+        String err = error(Strings.format("""
+            FROM test
+            | EVAL language_code = languages
+            | %s
+            | ENRICH _remote:languages ON language_code
+            """, lookupCommand), analyzer);
+        assertThat(err, containsString("4:3: ENRICH with remote policy can't be executed after LOOKUP JOIN"));
+
+        err = error(Strings.format("""
+            FROM test
+            | EVAL language_code = languages
+            | %s
+            | ENRICH _remote:languages ON language_code
+            | %s
+            """, lookupCommand, lookupCommand), analyzer);
+        assertThat(err, containsString("4:3: ENRICH with remote policy can't be executed after LOOKUP JOIN"));
+
+        err = error(Strings.format("""
+            FROM test
+            | EVAL language_code = languages
+            | %s
+            | EVAL x = 1
+            | MV_EXPAND language_code
+            | ENRICH _remote:languages ON language_code
+            """, lookupCommand), analyzer);
+        assertThat(err, containsString("6:3: ENRICH with remote policy can't be executed after LOOKUP JOIN"));
+    }
+
+    public void testRemoteEnrichAfterCoordinatorOnlyPlans() {
+        EnrichResolution enrichResolution = new EnrichResolution();
+        loadEnrichPolicyResolution(
+            enrichResolution,
+            Enrich.Mode.REMOTE,
+            MATCH_TYPE,
+            "languages",
+            "language_code",
+            "languages_idx",
+            "mapping-languages.json"
+        );
+        loadEnrichPolicyResolution(
+            enrichResolution,
+            Enrich.Mode.COORDINATOR,
+            MATCH_TYPE,
+            "languages",
+            "language_code",
+            "languages_idx",
+            "mapping-languages.json"
+        );
+        var analyzer = AnalyzerTestUtils.analyzer(
+            loadMapping("mapping-default.json", "test"),
+            defaultLookupResolution(),
+            enrichResolution,
+            TEST_VERIFIER
+        );
+
+        query("""
+            FROM test
+            | EVAL language_code = languages
+            | ENRICH _remote:languages ON language_code
+            | STATS count(*) BY language_name
+            """, analyzer);
+
+        String err = error("""
+            FROM test
+            | EVAL language_code = languages
+            | STATS count(*) BY language_code
+            | ENRICH _remote:languages ON language_code
+            """, analyzer);
+        assertThat(err, containsString("4:3: ENRICH with remote policy can't be executed after STATS"));
+
+        err = error("""
+            FROM test
+            | EVAL language_code = languages
+            | STATS count(*) BY language_code
+            | EVAL x = 1
+            | MV_EXPAND language_code
+            | ENRICH _remote:languages ON language_code
+            """, analyzer);
+        assertThat(err, containsString("6:3: ENRICH with remote policy can't be executed after STATS"));
+
+        query("""
+            FROM test
+            | EVAL language_code = languages
+            | ENRICH _remote:languages ON language_code
+            | ENRICH _coordinator:languages ON language_code
+            """, analyzer);
+
+        err = error("""
+            FROM test
+            | EVAL language_code = languages
+            | ENRICH _coordinator:languages ON language_code
+            | ENRICH _remote:languages ON language_code
+            """, analyzer);
+        assertThat(err, containsString("4:3: ENRICH with remote policy can't be executed after another ENRICH with coordinator policy"));
+
+        err = error("""
+            FROM test
+            | EVAL language_code = languages
+            | ENRICH _coordinator:languages ON language_code
+            | EVAL x = 1
+            | MV_EXPAND language_name
+            | DISSECT language_name "%{foo}"
+            | ENRICH _remote:languages ON language_code
+            """, analyzer);
+        assertThat(err, containsString("7:3: ENRICH with remote policy can't be executed after another ENRICH with coordinator policy"));
     }
 
     private void checkFullTextFunctionsInStats(String functionInvocation) {