ソースを参照

ESQL: Allow remote enrich after LOOKUP JOIN (#131940)

* Allow remote enrich after LOOKUP JOIN
Stanislav Malyshev 1 ヶ月 前
コミット
5a4c3abb0e

+ 5 - 0
docs/changelog/131940.yaml

@@ -0,0 +1,5 @@
+pr: 131940
+summary: Allow remote enrich after LOOKUP JOIN
+area: ES|QL
+type: enhancement
+issues: []

+ 137 - 1
x-pack/plugin/esql/qa/testFixtures/src/main/resources/lookup-join.csv-spec

@@ -1875,7 +1875,6 @@ type:keyword | language_code:integer | language_name:keyword
 Production   | 3                     | Spanish
 ;
 
-
 ###############################################
 # LOOKUP JOIN on mixed numerical fields
 ###############################################
@@ -4872,6 +4871,143 @@ Connected to 10.1.0.1 | English                      | English               | n
 Connected to 10.1.0.1 | English                      | null                  | United Kingdom
 ;
 
+enrichAfterLookupJoin
+required_capability: join_lookup_v12
+
+FROM sample_data
+| KEEP message
+| WHERE message == "Connected to 10.1.0.1"
+| EVAL language_code = "1"
+| LOOKUP JOIN message_types_lookup ON message
+| ENRICH languages_policy ON language_code
+;
+
+message:keyword       | language_code:keyword | type:keyword | language_name:keyword
+Connected to 10.1.0.1 | 1                     | Success      | English
+;
+
+###############################################
+# LOOKUP JOIN and remote ENRICH
+###############################################
+
+remoteEnrichAfterLookupJoin
+required_capability: join_lookup_v12
+required_capability: remote_enrich_after_lookup_join
+
+FROM sample_data
+| KEEP message
+| WHERE message == "Connected to 10.1.0.1"
+| EVAL language_code = "1"
+| LOOKUP JOIN message_types_lookup ON message
+| ENRICH _remote:languages_policy ON language_code
+;
+
+message:keyword       | language_code:keyword | type:keyword | language_name:keyword
+Connected to 10.1.0.1 | 1                     | Success      | English
+;
+
+remoteEnrichSortAfterLookupJoin
+required_capability: join_lookup_v12
+required_capability: remote_enrich_after_lookup_join
+
+FROM sample_data
+| KEEP message
+| WHERE message == "Connected to 10.1.0.1" OR message == "Connected to 10.1.0.2"
+| EVAL language_code = "1"
+| LOOKUP JOIN message_types_lookup ON message
+| ENRICH _remote:languages_policy ON language_code
+| SORT message ASC
+;
+
+message:keyword       | language_code:keyword | type:keyword | language_name:keyword
+Connected to 10.1.0.1 | 1                     | Success      | English
+Connected to 10.1.0.2 | 1                     | Success      | English
+;
+
+sortRemoteEnrichAfterLookupJoin
+required_capability: join_lookup_v12
+required_capability: remote_enrich_after_lookup_join
+
+FROM sample_data
+| KEEP message
+| WHERE message == "Connected to 10.1.0.1" OR message == "Connected to 10.1.0.2"
+| EVAL language_code = "1"
+| LOOKUP JOIN message_types_lookup ON message
+| SORT message ASC
+| ENRICH _remote:languages_policy ON language_code
+| LIMIT 2
+;
+
+message:keyword       | language_code:keyword | type:keyword | language_name:keyword
+Connected to 10.1.0.1 | 1                     | Success      | English
+Connected to 10.1.0.2 | 1                     | Success      | English
+;
+
+remoteEnrichSortAfterLookupJoinWithLimit
+required_capability: join_lookup_v12
+required_capability: remote_enrich_after_lookup_join
+
+FROM sample_data
+| KEEP message
+| WHERE message == "Connection error"
+| EVAL language_code = "1"
+| LOOKUP JOIN message_types_lookup ON message
+| LIMIT 2
+| ENRICH _remote:languages_policy ON language_code
+| SORT message ASC
+;
+
+message:keyword  | language_code:keyword | type:keyword | language_name:keyword
+Connection error | 1                     | Error        | English
+Connection error | 1                     | Error        | English
+;
+
+remoteEnrichBetweenLookupJoins
+required_capability: join_lookup_v12
+required_capability: remote_enrich_after_lookup_join
+
+FROM sample_data
+| KEEP message, client_ip
+| WHERE message == "Connected to 10.1.0.1" OR message == "Connected to 10.1.0.2"
+| EVAL language_code = "1", client_ip=to_string(client_ip)
+| LOOKUP JOIN message_types_lookup ON message
+| ENRICH _remote:languages_policy ON language_code
+| LOOKUP JOIN clientips_lookup ON client_ip
+| DROP language_code
+| SORT message ASC
+;
+
+message:keyword       | client_ip:keyword | type:keyword | language_name:keyword | env:keyword
+Connected to 10.1.0.1 | 172.21.3.15       | Success      | English               | Production
+Connected to 10.1.0.2 | 172.21.2.113      | Success      | English               | QA
+;
+
+remoteEnrichesAndLookupJoins
+required_capability: join_lookup_v12
+required_capability: remote_enrich_after_lookup_join
+
+FROM sample_data
+| EVAL language_code = "1", client_ip=to_string(client_ip)
+| ENRICH _remote:languages_policy ON language_code
+| LOOKUP JOIN clientips_lookup ON client_ip
+| EVAL env1 = env
+| ENRICH _remote:clientip_policy ON client_ip
+| WHERE message == "Connected to 10.1.0.1" OR message == "Connected to 10.1.0.2"
+| LOOKUP JOIN message_types_lookup ON message
+| KEEP message, client_ip, env, env1, type, language_name
+| SORT message ASC
+| LIMIT 10
+;
+
+message:keyword       | client_ip:keyword | env:keyword | env1: keyword | type:keyword | language_name:keyword
+Connected to 10.1.0.1 | 172.21.3.15       | Production  | Production    |  Success     | English
+Connected to 10.1.0.2 | 172.21.2.113      | QA          | QA            |  Success     | English
+;
+
+###############################################
+# Multi-field LOOKUP JOIN
+###############################################
+
 lookupJoinOnTwoFields
 required_capability: join_lookup_v12
 required_capability: lookup_join_on_multiple_fields

+ 1 - 1
x-pack/plugin/esql/src/internalClusterTest/java/org/elasticsearch/xpack/esql/action/CrossClusterAsyncEnrichStopIT.java

@@ -117,7 +117,7 @@ public class CrossClusterAsyncEnrichStopIT extends AbstractEnrichBasedCrossClust
         SimplePauseFieldPlugin.allowEmitting.countDown();
 
         try (EsqlQueryResponse resp = stopAction.actionGet(30, TimeUnit.SECONDS)) {
-            // Compare this to CrossClustersEnrichIT.testEnrichTwiceThenAggs - the results from c2 will be absent
+            // Compare this to CrossClusterEnrichIT.testEnrichTwiceThenAggs - the results from c2 will be absent
             // because we stopped it before processing the data
             assertThat(
                 getValuesList(resp),

+ 1 - 1
x-pack/plugin/esql/src/internalClusterTest/java/org/elasticsearch/xpack/esql/action/CrossClusterEnrichUnavailableClustersIT.java

@@ -30,7 +30,7 @@ import static org.hamcrest.Matchers.greaterThanOrEqualTo;
 import static org.hamcrest.Matchers.lessThanOrEqualTo;
 
 /**
- * This IT test is the dual of CrossClustersEnrichIT, which tests "happy path"
+ * This IT test is the dual of CrossClusterEnrichIT, which tests "happy path"
  * and this one tests unavailable cluster scenarios using (most of) the same tests.
  */
 public class CrossClusterEnrichUnavailableClustersIT extends AbstractEnrichBasedCrossClusterTestCase {

+ 6 - 0
x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/action/EsqlCapabilities.java

@@ -1277,6 +1277,12 @@ public class EsqlCapabilities {
          */
         ENABLE_LOOKUP_JOIN_ON_REMOTE(Build.current().isSnapshot()),
 
+        /**
+         * Fix the planning of {@code | ENRICH _remote:policy} when there's a preceding {@code | LOOKUP JOIN},
+         * see <a href="https://github.com/elastic/elasticsearch/issues/129372">java.lang.ClassCastException when combining LOOKUP JOIN and remote ENRICH</a>
+         */
+        REMOTE_ENRICH_AFTER_LOOKUP_JOIN,
+
         /**
          * MATCH PHRASE function
          */

+ 8 - 2
x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/parser/LogicalPlanBuilder.java

@@ -465,7 +465,7 @@ public class LogicalPlanBuilder extends ExpressionBuilder {
 
     @Override
     public PlanFactory visitEnrichCommand(EsqlBaseParser.EnrichCommandContext ctx) {
-        return p -> {
+        return child -> {
             var source = source(ctx);
             Tuple<Mode, String> tuple = parsePolicyName(ctx.policyName);
             Mode mode = tuple.v1();
@@ -484,9 +484,15 @@ public class LogicalPlanBuilder extends ExpressionBuilder {
             }
 
             List<NamedExpression> keepClauses = visitList(this, ctx.enrichWithClause(), NamedExpression.class);
+
+            // If this is a remote-only ENRICH, any upstream LOOKUP JOINs need to be treated as remote-only, too.
+            if (mode == Mode.REMOTE) {
+                child = child.transformDown(LookupJoin.class, lj -> new LookupJoin(lj.source(), lj.left(), lj.right(), lj.config(), true));
+            }
+
             return new Enrich(
                 source,
-                p,
+                child,
                 mode,
                 Literal.keyword(source(ctx.policyName), policyNameString),
                 matchField,

+ 1 - 2
x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/plan/logical/inference/Completion.java

@@ -23,7 +23,6 @@ import org.elasticsearch.xpack.esql.core.tree.NodeInfo;
 import org.elasticsearch.xpack.esql.core.tree.Source;
 import org.elasticsearch.xpack.esql.core.type.DataType;
 import org.elasticsearch.xpack.esql.io.stream.PlanStreamInput;
-import org.elasticsearch.xpack.esql.plan.logical.ExecutesOn;
 import org.elasticsearch.xpack.esql.plan.logical.LogicalPlan;
 
 import java.io.IOException;
@@ -34,7 +33,7 @@ import static org.elasticsearch.xpack.esql.common.Failure.fail;
 import static org.elasticsearch.xpack.esql.core.type.DataType.TEXT;
 import static org.elasticsearch.xpack.esql.expression.NamedExpressions.mergeOutputAttributes;
 
-public class Completion extends InferencePlan<Completion> implements TelemetryAware, PostAnalysisVerificationAware, ExecutesOn.Coordinator {
+public class Completion extends InferencePlan<Completion> implements TelemetryAware, PostAnalysisVerificationAware {
 
     public static final String DEFAULT_OUTPUT_FIELD_NAME = "completion";
 

+ 3 - 1
x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/plan/logical/inference/InferencePlan.java

@@ -13,6 +13,7 @@ import org.elasticsearch.xpack.esql.core.expression.Expression;
 import org.elasticsearch.xpack.esql.core.expression.UnresolvedAttribute;
 import org.elasticsearch.xpack.esql.core.tree.Source;
 import org.elasticsearch.xpack.esql.plan.GeneratingPlan;
+import org.elasticsearch.xpack.esql.plan.logical.ExecutesOn;
 import org.elasticsearch.xpack.esql.plan.logical.LogicalPlan;
 import org.elasticsearch.xpack.esql.plan.logical.SortAgnostic;
 import org.elasticsearch.xpack.esql.plan.logical.UnaryPlan;
@@ -24,7 +25,8 @@ import java.util.Objects;
 public abstract class InferencePlan<PlanType extends InferencePlan<PlanType>> extends UnaryPlan
     implements
         SortAgnostic,
-        GeneratingPlan<InferencePlan<PlanType>> {
+        GeneratingPlan<InferencePlan<PlanType>>,
+        ExecutesOn.Coordinator {
 
     public static final String INFERENCE_ID_OPTION_NAME = "inference_id";
     public static final List<String> VALID_INFERENCE_OPTION_NAMES = List.of(INFERENCE_ID_OPTION_NAME);

+ 1 - 2
x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/plan/logical/inference/Rerank.java

@@ -26,7 +26,6 @@ import org.elasticsearch.xpack.esql.core.tree.Source;
 import org.elasticsearch.xpack.esql.core.type.DataType;
 import org.elasticsearch.xpack.esql.io.stream.PlanStreamInput;
 import org.elasticsearch.xpack.esql.plan.logical.Eval;
-import org.elasticsearch.xpack.esql.plan.logical.ExecutesOn;
 import org.elasticsearch.xpack.esql.plan.logical.LogicalPlan;
 import org.elasticsearch.xpack.esql.plan.logical.UnaryPlan;
 
@@ -38,7 +37,7 @@ import java.util.function.Predicate;
 import static org.elasticsearch.xpack.esql.common.Failure.fail;
 import static org.elasticsearch.xpack.esql.expression.NamedExpressions.mergeOutputAttributes;
 
-public class Rerank extends InferencePlan<Rerank> implements PostAnalysisVerificationAware, TelemetryAware, ExecutesOn.Coordinator {
+public class Rerank extends InferencePlan<Rerank> implements PostAnalysisVerificationAware, TelemetryAware {
 
     public static final NamedWriteableRegistry.Entry ENTRY = new NamedWriteableRegistry.Entry(LogicalPlan.class, "Rerank", Rerank::new);
     public static final String DEFAULT_INFERENCE_ID = ".rerank-v1-elasticsearch";

+ 6 - 1
x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/planner/mapper/Mapper.java

@@ -100,6 +100,8 @@ public class Mapper {
             // 5. So we should be keeping: LimitExec, ExchangeExec, OrderExec, TopNExec (actually OrderExec probably can't happen anyway).
             Holder<Boolean> hasFragment = new Holder<>(false);
 
+            // Remove most plan nodes between this remote ENRICH and the data node's fragment so they're not executed twice;
+            // include the plan up until this ENRICH in the fragment.
             var childTransformed = mappedChild.transformUp(f -> {
                 // Once we reached FragmentExec, we stuff our Enrich under it
                 if (f instanceof FragmentExec) {
@@ -118,7 +120,10 @@ public class Mapper {
                         return unaryExec.child();
                     }
                 }
-                // Currently, it's either UnaryExec or LeafExec. Leaf will either resolve to FragmentExec or we'll ignore it.
+                // Here we have the following possibilities:
+                // 1. LeafExec - should resolve to FragmentExec or we can ignore it
+                // 2. Join - must be remote, and thus will go inside FragmentExec
+                // 3. Fork/MergeExec - not currently allowed with remote enrich
                 return f;
             });
 

+ 70 - 0
x-pack/plugin/esql/src/test/java/org/elasticsearch/xpack/esql/optimizer/OptimizerVerificationTests.java

@@ -358,4 +358,74 @@ public class OptimizerVerificationTests extends AbstractLogicalPlanOptimizerTest
         // Since FORK, RERANK, COMPLETION and CHANGE_POINT are not supported on remote indices, we can't check them here against the remote
         // LOOKUP JOIN
     }
+
+    public void testRemoteEnrichAfterLookupJoinWithPipelineBreaker() {
+        EnrichResolution enrichResolution = new EnrichResolution();
+        loadEnrichPolicyResolution(
+            enrichResolution,
+            Enrich.Mode.REMOTE,
+            MATCH_TYPE,
+            "languages",
+            "language_code",
+            "languages_idx",
+            "mapping-languages.json"
+        );
+        loadEnrichPolicyResolution(
+            enrichResolution,
+            Enrich.Mode.COORDINATOR,
+            MATCH_TYPE,
+            "languages_coord",
+            "language_code",
+            "languages_idx",
+            "mapping-languages.json"
+        );
+        var analyzer = AnalyzerTestUtils.analyzer(
+            loadMapping("mapping-default.json", "test"),
+            defaultLookupResolution(),
+            enrichResolution,
+            TEST_VERIFIER
+        );
+
+        String err = error("""
+            FROM test
+            | STATS c = COUNT(*) by languages
+            | EVAL language_code = languages
+            | LOOKUP JOIN languages_lookup ON language_code
+            | ENRICH _remote:languages ON language_code
+            """, analyzer);
+        assertThat(
+            err,
+            containsString("4:3: LOOKUP JOIN with remote indices can't be executed after [STATS c = COUNT(*) by languages]@2:3")
+        );
+
+        err = error("""
+            FROM test
+            | SORT emp_no
+            | EVAL language_code = languages
+            | LOOKUP JOIN languages_lookup ON language_code
+            | ENRICH _remote:languages ON language_code
+            """, analyzer);
+        assertThat(err, containsString("4:3: LOOKUP JOIN with remote indices can't be executed after [SORT emp_no]@2:3"));
+
+        err = error("""
+            FROM test
+            | LIMIT 2
+            | EVAL language_code = languages
+            | LOOKUP JOIN languages_lookup ON language_code
+            | ENRICH _remote:languages ON language_code
+            """, analyzer);
+        assertThat(err, containsString("4:3: LOOKUP JOIN with remote indices can't be executed after [LIMIT 2]@2:3"));
+
+        err = error("""
+            FROM test
+            | EVAL language_code = languages
+            | ENRICH _coordinator:languages_coord
+            | LOOKUP JOIN languages_lookup ON language_code
+            | ENRICH _remote:languages ON language_code
+            """, analyzer);
+        assertThat(
+            err,
+            containsString("4:3: LOOKUP JOIN with remote indices can't be executed after [ENRICH _coordinator:languages_coord]@3:3")
+        );
+    }
 }