Browse Source

Fixing remote ENRICH by pushing the Enrich inside FragmentExec (#114665) (#115591)

* Fixing remote ENRICH by pushing the Enrich inside FragmentExec
* Improve handling of more complex cases such as several enriches

(cherry picked from commit e789039dfa8fee60dc2615c3876295ff7c6f3b01)
Stanislav Malyshev 11 months ago
parent
commit
a614517120

+ 6 - 0
docs/changelog/114665.yaml

@@ -0,0 +1,6 @@
+pr: 114665
+summary: Fixing remote ENRICH by pushing the Enrich inside `FragmentExec`
+area: ES|QL
+type: bug
+issues:
+  - 105095

+ 94 - 8
x-pack/plugin/esql/src/internalClusterTest/java/org/elasticsearch/xpack/esql/action/CrossClustersEnrichIT.java

@@ -47,6 +47,7 @@ import java.util.ArrayList;
 import java.util.Arrays;
 import java.util.Collection;
 import java.util.Collections;
+import java.util.Comparator;
 import java.util.List;
 import java.util.Locale;
 import java.util.Map;
@@ -469,27 +470,112 @@ public class CrossClustersEnrichIT extends AbstractMultiClustersTestCase {
         }
     }
 
+    public void testEnrichRemoteWithVendorNoSort() { // REMOTE vendors enrich after LIMIT, with no SORT in the query
+        Tuple<Boolean, Boolean> includeCCSMetadata = randomIncludeCCSMetadata();
+        Boolean requestIncludeMeta = includeCCSMetadata.v1();
+        boolean responseExpectMeta = includeCCSMetadata.v2();
+
+        for (Enrich.Mode hostMode : List.of(Enrich.Mode.ANY, Enrich.Mode.REMOTE)) { // hosts enrich is tried in both modes
+            var query = String.format(Locale.ROOT, """
+                FROM *:events,events
+                | LIMIT 100
+                | eval ip= TO_STR(host)
+                | %s
+                | %s
+                | stats c = COUNT(*) by vendor
+                """, enrichHosts(hostMode), enrichVendors(Enrich.Mode.REMOTE)); // vendors enrich is always REMOTE
+            try (EsqlQueryResponse resp = runQuery(query, requestIncludeMeta)) {
+                var values = getValuesList(resp); // no SORT in the query: rows are ordered by vendor below, null last
+                values.sort(Comparator.comparing(o -> (String) o.get(1), Comparator.nullsLast(Comparator.naturalOrder())));
+                assertThat(
+                    values,
+                    equalTo(
+                        List.of( // each row is (count, vendor), listed in the sorted order produced above
+                            List.of(6L, "Apple"),
+                            List.of(7L, "Microsoft"),
+                            List.of(1L, "Redhat"),
+                            List.of(2L, "Samsung"),
+                            List.of(1L, "Sony"),
+                            List.of(2L, "Suse"),
+                            Arrays.asList(3L, (String) null) // List.of rejects null elements, hence Arrays.asList
+                        )
+                    )
+                );
+                EsqlExecutionInfo executionInfo = resp.getExecutionInfo();
+                assertThat(executionInfo.includeCCSMetadata(), equalTo(responseExpectMeta));
+                assertThat(executionInfo.clusterAliases(), equalTo(Set.of("", "c1", "c2"))); // "" is presumably the local cluster
+                assertCCSExecutionInfoDetails(executionInfo);
+            }
+        }
+    }
+
     public void testTopNThenEnrichRemote() {
+        Tuple<Boolean, Boolean> includeCCSMetadata = randomIncludeCCSMetadata();
+        Boolean requestIncludeMeta = includeCCSMetadata.v1();
+        boolean responseExpectMeta = includeCCSMetadata.v2();
+
         String query = String.format(Locale.ROOT, """
             FROM *:events,events
             | eval ip= TO_STR(host)
-            | SORT ip
+            | SORT timestamp, user, ip
             | LIMIT 5
-            | %s
+            | %s | KEEP host, timestamp, user, os
             """, enrichHosts(Enrich.Mode.REMOTE));
-        var error = expectThrows(VerificationException.class, () -> runQuery(query, randomBoolean()).close());
-        assertThat(error.getMessage(), containsString("ENRICH with remote policy can't be executed after LIMIT"));
+        try (EsqlQueryResponse resp = runQuery(query, requestIncludeMeta)) {
+            assertThat(
+                getValuesList(resp),
+                equalTo(
+                    List.of(
+                        List.of("192.168.1.2", 1L, "andres", "Windows"),
+                        List.of("192.168.1.3", 1L, "matthew", "MacOS"),
+                        Arrays.asList("192.168.1.25", 1L, "park", (String) null),
+                        List.of("192.168.1.5", 2L, "akio", "Android"),
+                        List.of("192.168.1.6", 2L, "sergio", "iOS")
+                    )
+                )
+            );
+            EsqlExecutionInfo executionInfo = resp.getExecutionInfo();
+            assertThat(executionInfo.includeCCSMetadata(), equalTo(responseExpectMeta));
+            assertThat(executionInfo.clusterAliases(), equalTo(Set.of("", "c1", "c2")));
+            assertCCSExecutionInfoDetails(executionInfo);
+        }
     }
 
     public void testLimitThenEnrichRemote() {
+        Tuple<Boolean, Boolean> includeCCSMetadata = randomIncludeCCSMetadata();
+        Boolean requestIncludeMeta = includeCCSMetadata.v1();
+        boolean responseExpectMeta = includeCCSMetadata.v2();
+
         String query = String.format(Locale.ROOT, """
             FROM *:events,events
-            | LIMIT 10
+            | LIMIT 25
             | eval ip= TO_STR(host)
-            | %s
+            | %s | KEEP host, timestamp, user, os
             """, enrichHosts(Enrich.Mode.REMOTE));
-        var error = expectThrows(VerificationException.class, () -> runQuery(query, randomBoolean()).close());
-        assertThat(error.getMessage(), containsString("ENRICH with remote policy can't be executed after LIMIT"));
+        try (EsqlQueryResponse resp = runQuery(query, requestIncludeMeta)) {
+            var values = getValuesList(resp);
+            values.sort(
+                Comparator.comparingLong((List<Object> o) -> (Long) o.get(1))
+                    .thenComparing(o -> (String) o.get(0))
+                    .thenComparing(o -> (String) o.get(2))
+            );
+            assertThat(
+                values.subList(0, 5),
+                equalTo(
+                    List.of(
+                        List.of("192.168.1.2", 1L, "andres", "Windows"),
+                        Arrays.asList("192.168.1.25", 1L, "park", (String) null),
+                        List.of("192.168.1.3", 1L, "matthew", "MacOS"),
+                        List.of("192.168.1.5", 2L, "akio", "Android"),
+                        List.of("192.168.1.5", 2L, "simon", "Android")
+                    )
+                )
+            );
+            EsqlExecutionInfo executionInfo = resp.getExecutionInfo();
+            assertThat(executionInfo.includeCCSMetadata(), equalTo(responseExpectMeta));
+            assertThat(executionInfo.clusterAliases(), equalTo(Set.of("", "c1", "c2")));
+            assertCCSExecutionInfoDetails(executionInfo);
+        }
     }
 
     public void testAggThenEnrichRemote() {

+ 0 - 7
x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/analysis/Verifier.java

@@ -609,22 +609,15 @@ public class Verifier {
      */
     private static void checkRemoteEnrich(LogicalPlan plan, Set<Failure> failures) {
         boolean[] agg = { false };
-        boolean[] limit = { false };
         boolean[] enrichCoord = { false };
 
         plan.forEachUp(UnaryPlan.class, u -> {
-            if (u instanceof Limit) {
-                limit[0] = true; // TODO: Make Limit then enrich_remote work
-            }
             if (u instanceof Aggregate) {
                 agg[0] = true;
             } else if (u instanceof Enrich enrich && enrich.mode() == Enrich.Mode.COORDINATOR) {
                 enrichCoord[0] = true;
             }
             if (u instanceof Enrich enrich && enrich.mode() == Enrich.Mode.REMOTE) {
-                if (limit[0]) {
-                    failures.add(fail(enrich, "ENRICH with remote policy can't be executed after LIMIT"));
-                }
                 if (agg[0]) {
                     failures.add(fail(enrich, "ENRICH with remote policy can't be executed after STATS"));
                 }

+ 42 - 0
x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/planner/Mapper.java

@@ -52,8 +52,10 @@ import org.elasticsearch.xpack.esql.plan.physical.ProjectExec;
 import org.elasticsearch.xpack.esql.plan.physical.RowExec;
 import org.elasticsearch.xpack.esql.plan.physical.ShowExec;
 import org.elasticsearch.xpack.esql.plan.physical.TopNExec;
+import org.elasticsearch.xpack.esql.plan.physical.UnaryExec;
 
 import java.util.List;
+import java.util.concurrent.atomic.AtomicBoolean;
 
 /**
  * <p>This class is part of the planner</p>
@@ -104,6 +106,46 @@ public class Mapper {
         //
         // Unary Plan
         //
+        if (localMode == false && p instanceof Enrich enrich && enrich.mode() == Enrich.Mode.REMOTE) {
+            // When we have a remote enrich, we want to put it under FragmentExec so that it is executed remotely.
+            // We only do this on the coordinator node.
+            // The way we do it is as follows:
+            // 1. Locate FragmentExec in the tree. If there is no FragmentExec, we don't do anything.
+            // 2. Put this Enrich under it, replacing everything that was below it previously.
+            // 3. Above FragmentExec, we should deal with pipeline breakers, since pipeline ops are already supposed to go under
+            // FragmentExec.
+            // 4. Aggregates can't appear here, since the plan should have errored out if an aggregate precedes a remote Enrich.
+            // 5. So we should be keeping: LimitExec, ExchangeExec, OrderExec, TopNExec (actually OrderExec probably can't happen anyway).
+
+            var child = map(enrich.child());
+            AtomicBoolean hasFragment = new AtomicBoolean(false);
+
+            var childTransformed = child.transformUp((f) -> {
+                // Once we reached FragmentExec, we stuff our Enrich under it
+                if (f instanceof FragmentExec) {
+                    hasFragment.set(true);
+                    return new FragmentExec(p);
+                }
+                if (f instanceof EnrichExec enrichExec) {
+                    // It can only be ANY because COORDINATOR would have errored out earlier, and REMOTE should be under FragmentExec
+                    assert enrichExec.mode() == Enrich.Mode.ANY : "enrich must be in ANY mode here";
+                    return enrichExec.child();
+                }
+                if (f instanceof UnaryExec unaryExec) {
+                    if (f instanceof LimitExec || f instanceof ExchangeExec || f instanceof OrderExec || f instanceof TopNExec) {
+                        return f;
+                    } else {
+                        return unaryExec.child();
+                    }
+                }
+                // Currently, it's either UnaryExec or LeafExec. Leaf will either resolve to FragmentExec or we'll ignore it.
+                return f;
+            });
+
+            if (hasFragment.get()) {
+                return childTransformed;
+            }
+        }
 
         if (p instanceof UnaryPlan ua) {
             var child = map(ua.child());

+ 53 - 10
x-pack/plugin/esql/src/test/java/org/elasticsearch/xpack/esql/optimizer/PhysicalPlanOptimizerTests.java

@@ -172,7 +172,7 @@ import static org.hamcrest.Matchers.not;
 import static org.hamcrest.Matchers.nullValue;
 import static org.hamcrest.Matchers.startsWith;
 
-// @TestLogging(value = "org.elasticsearch.xpack.esql:TRACE", reason = "debug")
+// @TestLogging(value = "org.elasticsearch.xpack.esql:DEBUG", reason = "debug")
 public class PhysicalPlanOptimizerTests extends ESTestCase {
 
     private static final String PARAM_FORMATTING = "%1$s";
@@ -5851,14 +5851,14 @@ public class PhysicalPlanOptimizerTests extends ESTestCase {
                 | EVAL employee_id = to_str(emp_no)
                 | ENRICH _remote:departments
                 | LIMIT 10""");
-            var enrich = as(plan, EnrichExec.class);
-            assertThat(enrich.mode(), equalTo(Enrich.Mode.REMOTE));
-            assertThat(enrich.concreteIndices(), equalTo(Map.of("cluster_1", ".enrich-departments-2")));
-            var eval = as(enrich.child(), EvalExec.class);
-            var finalLimit = as(eval.child(), LimitExec.class);
+            var finalLimit = as(plan, LimitExec.class);
             var exchange = as(finalLimit.child(), ExchangeExec.class);
             var fragment = as(exchange.child(), FragmentExec.class);
-            var partialLimit = as(fragment.fragment(), Limit.class);
+            var enrich = as(fragment.fragment(), Enrich.class);
+            assertThat(enrich.mode(), equalTo(Enrich.Mode.REMOTE));
+            assertThat(enrich.concreteIndices(), equalTo(Map.of("cluster_1", ".enrich-departments-2")));
+            var evalFragment = as(enrich.child(), Eval.class);
+            var partialLimit = as(evalFragment.child(), Limit.class);
             as(partialLimit.child(), EsRelation.class);
         }
     }
@@ -5901,13 +5901,21 @@ public class PhysicalPlanOptimizerTests extends ESTestCase {
     }
 
     public void testLimitThenEnrichRemote() {
-        var error = expectThrows(VerificationException.class, () -> physicalPlan("""
+        var plan = physicalPlan("""
             FROM test
             | LIMIT 10
             | EVAL employee_id = to_str(emp_no)
             | ENRICH _remote:departments
-            """));
-        assertThat(error.getMessage(), containsString("line 4:3: ENRICH with remote policy can't be executed after LIMIT"));
+            """);
+        var finalLimit = as(plan, LimitExec.class);
+        var exchange = as(finalLimit.child(), ExchangeExec.class);
+        var fragment = as(exchange.child(), FragmentExec.class);
+        var enrich = as(fragment.fragment(), Enrich.class);
+        assertThat(enrich.mode(), equalTo(Enrich.Mode.REMOTE));
+        assertThat(enrich.concreteIndices(), equalTo(Map.of("cluster_1", ".enrich-departments-2")));
+        var evalFragment = as(enrich.child(), Eval.class);
+        var partialLimit = as(evalFragment.child(), Limit.class);
+        as(partialLimit.child(), EsRelation.class);
     }
 
     public void testEnrichBeforeTopN() {
@@ -5961,6 +5969,23 @@ public class PhysicalPlanOptimizerTests extends ESTestCase {
             var eval = as(enrich.child(), Eval.class);
             as(eval.child(), EsRelation.class);
         }
+        {
+            var plan = physicalPlan("""
+                FROM test
+                | EVAL employee_id = to_str(emp_no)
+                | ENRICH _remote:departments
+                | SORT department
+                | LIMIT 10""");
+            var topN = as(plan, TopNExec.class);
+            var exchange = as(topN.child(), ExchangeExec.class);
+            var fragment = as(exchange.child(), FragmentExec.class);
+            var partialTopN = as(fragment.fragment(), TopN.class);
+            var enrich = as(partialTopN.child(), Enrich.class);
+            assertThat(enrich.mode(), equalTo(Enrich.Mode.REMOTE));
+            assertThat(enrich.concreteIndices(), equalTo(Map.of("cluster_1", ".enrich-departments-2")));
+            var eval = as(enrich.child(), Eval.class);
+            as(eval.child(), EsRelation.class);
+        }
     }
 
     public void testEnrichAfterTopN() {
@@ -6000,6 +6025,24 @@ public class PhysicalPlanOptimizerTests extends ESTestCase {
             var partialTopN = as(fragment.fragment(), TopN.class);
             as(partialTopN.child(), EsRelation.class);
         }
+        {
+            var plan = physicalPlan("""
+                FROM test
+                | SORT emp_no
+                | LIMIT 10
+                | EVAL employee_id = to_str(emp_no)
+                | ENRICH _remote:departments
+                """);
+            var topN = as(plan, TopNExec.class);
+            var exchange = as(topN.child(), ExchangeExec.class);
+            var fragment = as(exchange.child(), FragmentExec.class);
+            var enrich = as(fragment.fragment(), Enrich.class);
+            assertThat(enrich.mode(), equalTo(Enrich.Mode.REMOTE));
+            assertThat(enrich.concreteIndices(), equalTo(Map.of("cluster_1", ".enrich-departments-2")));
+            var evalFragment = as(enrich.child(), Eval.class);
+            var partialTopN = as(evalFragment.child(), TopN.class);
+            as(partialTopN.child(), EsRelation.class);
+        }
     }
 
     public void testManyEnrich() {