Browse Source

ES|QL: Fix ProjectAwayColumns for FORK (#128839)

Ioana Tagirta 4 months ago
parent
commit
ca904ce05d

+ 2 - 2
x-pack/plugin/esql/qa/server/multi-clusters/src/javaRestTest/java/org/elasticsearch/xpack/esql/ccq/MultiClusterSpecIT.java

@@ -46,7 +46,7 @@ import static org.elasticsearch.xpack.esql.CsvSpecReader.specParser;
 import static org.elasticsearch.xpack.esql.CsvTestUtils.isEnabled;
 import static org.elasticsearch.xpack.esql.CsvTestsDataLoader.ENRICH_SOURCE_INDICES;
 import static org.elasticsearch.xpack.esql.EsqlTestUtils.classpathResources;
-import static org.elasticsearch.xpack.esql.action.EsqlCapabilities.Cap.FORK_V5;
+import static org.elasticsearch.xpack.esql.action.EsqlCapabilities.Cap.FORK_V6;
 import static org.elasticsearch.xpack.esql.action.EsqlCapabilities.Cap.INLINESTATS;
 import static org.elasticsearch.xpack.esql.action.EsqlCapabilities.Cap.INLINESTATS_V2;
 import static org.elasticsearch.xpack.esql.action.EsqlCapabilities.Cap.INLINESTATS_V7;
@@ -132,7 +132,7 @@ public class MultiClusterSpecIT extends EsqlSpecTestCase {
         assumeFalse("LOOKUP JOIN not yet supported in CCS", testCase.requiredCapabilities.contains(JOIN_LOOKUP_V12.capabilityName()));
         // Unmapped fields require a coorect capability response from every cluster, which isn't currently implemented.
         assumeFalse("UNMAPPED FIELDS not yet supported in CCS", testCase.requiredCapabilities.contains(UNMAPPED_FIELDS.capabilityName()));
-        assumeFalse("FORK not yet supported in CCS", testCase.requiredCapabilities.contains(FORK_V5.capabilityName()));
+        assumeFalse("FORK not yet supported in CCS", testCase.requiredCapabilities.contains(FORK_V6.capabilityName()));
     }
 
     @Override

+ 54 - 12
x-pack/plugin/esql/qa/testFixtures/src/main/resources/fork.csv-spec

@@ -3,7 +3,7 @@
 //
 
 simpleFork
-required_capability: fork_v5
+required_capability: fork_v6
 
 FROM employees
 | FORK ( WHERE emp_no == 10001 )
@@ -18,7 +18,7 @@ emp_no:integer | _fork:keyword
 ;
 
 forkWithWhereSortAndLimit
-required_capability: fork_v5
+required_capability: fork_v6
 
 FROM employees
 | FORK ( WHERE hire_date < "1985-03-01T00:00:00Z" | SORT first_name | LIMIT 5 )
@@ -38,7 +38,7 @@ emp_no:integer | first_name:keyword | _fork:keyword
 ;
 
 fiveFork
-required_capability: fork_v5
+required_capability: fork_v6
 
 FROM employees
 | FORK ( WHERE emp_no == 10005 )
@@ -59,7 +59,7 @@ fork5         | 10001
 ;
 
 forkWithWhereSortDescAndLimit
-required_capability: fork_v5
+required_capability: fork_v6
 
 FROM employees
 | FORK ( WHERE hire_date < "1985-03-01T00:00:00Z" | SORT first_name DESC | LIMIT 2 )
@@ -76,7 +76,7 @@ fork2         | 10087          | Xinglin
 ;
 
 forkWithCommonPrefilter
-required_capability: fork_v5
+required_capability: fork_v6
 
 FROM employees
 | WHERE emp_no > 10050
@@ -94,7 +94,7 @@ fork2         | 10100
 ;
 
 forkWithSemanticSearchAndScore
-required_capability: fork_v5
+required_capability: fork_v6
 required_capability: semantic_text_field_caps
 required_capability: metadata_score
 
@@ -114,7 +114,7 @@ fork2         | 6.093784261960139E18  | 2           | all we have to decide is w
 ;
 
 forkWithEvals
-required_capability: fork_v5
+required_capability: fork_v6
 
 FROM employees
 | FORK (WHERE emp_no == 10048 OR emp_no == 10081 | EVAL x = "abc" | EVAL y = 1)
@@ -131,7 +131,7 @@ fork2         | 10087          | def       | null      | 2
 ;
 
 forkWithStats
-required_capability: fork_v5
+required_capability: fork_v6
 
 FROM employees
 | FORK (WHERE emp_no == 10048 OR emp_no == 10081)
@@ -152,7 +152,7 @@ fork4         | null           | 100    | 10001     | null
 ;
 
 forkWithDissect
-required_capability: fork_v5
+required_capability: fork_v6
 
 FROM employees
 | WHERE emp_no == 10048 OR emp_no == 10081
@@ -172,7 +172,7 @@ fork2         | 10081          | Rosen     | 10081     | null      | Zhongwei
 ;
 
 forkWithMixOfCommands
-required_capability: fork_v5
+required_capability: fork_v6
 
 FROM employees
 | WHERE emp_no == 10048 OR emp_no == 10081
@@ -197,7 +197,7 @@ fork4         | 10081          | abc       | aaa       | null      | null
 ;
 
 forkWithFiltersOnConstantValues
-required_capability: fork_v5
+required_capability: fork_v6
 
 FROM employees
 | EVAL z = 1
@@ -218,7 +218,7 @@ fork3         | null           | 100    | 10100     | 10001
 ;
 
 forkWithUnsupportedAttributes
-required_capability: fork_v5
+required_capability: fork_v6
 
 FROM heights
 | FORK (SORT description DESC | LIMIT 1 | EVAL x = length(description) )
@@ -230,3 +230,45 @@ description:keyword | height_range:unsupported | x:integer | _fork:keyword
 Very Tall           | null                     | 9         | fork1
 Medium Height       | null                     | null      | fork2
 ;
+
+forkAfterLookupJoin
+required_capability: fork_v6
+
+FROM employees
+| EVAL language_code = languages
+| LOOKUP JOIN languages_lookup ON language_code
+| FORK (WHERE emp_no == 10048 OR emp_no == 10081)
+       (WHERE emp_no == 10081 OR emp_no == 10087)
+       (WHERE emp_no == 10081 | EVAL language_name = "Klingon")
+| KEEP _fork, emp_no, language_code, language_name
+| SORT _fork, emp_no
+;
+
+_fork:keyword | emp_no:integer | language_code:integer | language_name:keyword
+fork1         | 10048          | 3                     | Spanish
+fork1         | 10081          | 2                     | French
+fork2         | 10081          | 2                     | French
+fork2         | 10087          | 5                     | null
+fork3         | 10081          | 2                     | Klingon
+;
+
+forkBeforeLookupJoin
+required_capability: fork_v6
+
+FROM employees
+| EVAL language_code = languages
+| FORK (WHERE emp_no == 10048 OR emp_no == 10081)
+       (WHERE emp_no == 10081 OR emp_no == 10087)
+       (WHERE emp_no == 10081 | EVAL language_name = "Klingon")
+| LOOKUP JOIN languages_lookup ON language_code
+| KEEP _fork, emp_no, language_code, language_name
+| SORT _fork, emp_no
+;
+
+_fork:keyword | emp_no:integer | language_code:integer | language_name:keyword
+fork1         | 10048          | 3                     | Spanish
+fork1         | 10081          | 2                     | French
+fork2         | 10081          | 2                     | French
+fork2         | 10087          | 5                     | null
+fork3         | 10081          | 2                     | French
+;

+ 59 - 2
x-pack/plugin/esql/src/internalClusterTest/java/org/elasticsearch/xpack/esql/action/ForkIT.java

@@ -633,6 +633,46 @@ public class ForkIT extends AbstractEsqlIntegTestCase {
         }
     }
 
+    public void testWithLookUpJoinBeforeFork() {
+        var query = """
+                FROM test
+                | LOOKUP JOIN test-lookup ON id
+                | FORK (WHERE id == 2 OR id == 3)
+                       (WHERE id == 1 OR id == 2)
+                | SORT _fork, id
+            """;
+        try (var resp = run(query)) {
+            assertColumnNames(resp.columns(), List.of("content", "id", "animal", "_fork"));
+            Iterable<Iterable<Object>> expectedValues = List.of(
+                List.of("This is a brown dog", 2, "dog", "fork1"),
+                List.of("This dog is really brown", 3, "dog", "fork1"),
+                List.of("This is a brown fox", 1, "fox", "fork2"),
+                List.of("This is a brown dog", 2, "dog", "fork2")
+            );
+            assertValues(resp.values(), expectedValues);
+        }
+    }
+
+    public void testWithLookUpAfterFork() {
+        var query = """
+                FROM test
+                | FORK (WHERE id == 2 OR id == 3)
+                       (WHERE id == 1 OR id == 2)
+                | LOOKUP JOIN test-lookup ON id
+                | SORT _fork, id
+            """;
+        try (var resp = run(query)) {
+            assertColumnNames(resp.columns(), List.of("content", "id", "_fork", "animal"));
+            Iterable<Iterable<Object>> expectedValues = List.of(
+                List.of("This is a brown dog", 2, "fork1", "dog"),
+                List.of("This dog is really brown", 3, "fork1", "dog"),
+                List.of("This is a brown fox", 1, "fork2", "fox"),
+                List.of("This is a brown dog", 2, "fork2", "dog")
+            );
+            assertValues(resp.values(), expectedValues);
+        }
+    }
+
     public void testWithEvalWithConflictingTypes() {
         var query = """
                 FROM test
@@ -763,10 +803,10 @@ public class ForkIT extends AbstractEsqlIntegTestCase {
     private void createAndPopulateIndex() {
         var indexName = "test";
         var client = client().admin().indices();
-        var CreateRequest = client.prepareCreate(indexName)
+        var createRequest = client.prepareCreate(indexName)
             .setSettings(Settings.builder().put("index.number_of_shards", 1))
             .setMapping("id", "type=integer", "content", "type=text");
-        assertAcked(CreateRequest);
+        assertAcked(createRequest);
         client().prepareBulk()
             .add(new IndexRequest(indexName).id("1").source("id", 1, "content", "This is a brown fox"))
             .add(new IndexRequest(indexName).id("2").source("id", 2, "content", "This is a brown dog"))
@@ -777,6 +817,23 @@ public class ForkIT extends AbstractEsqlIntegTestCase {
             .setRefreshPolicy(WriteRequest.RefreshPolicy.IMMEDIATE)
             .get();
         ensureYellow(indexName);
+
+        var lookupIndex = "test-lookup";
+        createRequest = client.prepareCreate(lookupIndex)
+            .setSettings(Settings.builder().put("index.number_of_shards", 1).put("index.mode", "lookup"))
+            .setMapping("id", "type=integer", "animal", "type=keyword");
+        assertAcked(createRequest);
+
+        client().prepareBulk()
+            .add(new IndexRequest(lookupIndex).id("1").source("id", 1, "animal", "fox"))
+            .add(new IndexRequest(lookupIndex).id("2").source("id", 2, "animal", "dog"))
+            .add(new IndexRequest(lookupIndex).id("3").source("id", 3, "animal", "dog"))
+            .add(new IndexRequest(lookupIndex).id("4").source("id", 4, "animal", "dog"))
+            .add(new IndexRequest(lookupIndex).id("5").source("id", 5, "animal", "cat"))
+            .add(new IndexRequest(lookupIndex).id("6").source("id", 6, "animal", List.of("fox", "dog")))
+            .setRefreshPolicy(WriteRequest.RefreshPolicy.IMMEDIATE)
+            .get();
+        ensureYellow(lookupIndex);
     }
 
     static Iterator<Iterator<Object>> valuesFilter(Iterator<Iterator<Object>> values, Predicate<Iterator<Object>> filter) {

+ 1 - 1
x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/action/EsqlCapabilities.java

@@ -1024,7 +1024,7 @@ public class EsqlCapabilities {
         /**
          * Support streaming of sub plan results
          */
-        FORK_V5(Build.current().isSnapshot()),
+        FORK_V6(Build.current().isSnapshot()),
 
         /**
          * Support for the {@code leading_zeros} named parameter.

+ 19 - 5
x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/optimizer/rules/physical/ProjectAwayColumns.java

@@ -44,15 +44,29 @@ public class ProjectAwayColumns extends Rule<PhysicalPlan, PhysicalPlan> {
         // and the overall output will not change.
         AttributeSet.Builder requiredAttrBuilder = plan.outputSet().asBuilder();
 
-        // This will require updating should we choose to have non-unary execution plans in the future.
         return plan.transformDown(currentPlanNode -> {
-            if (currentPlanNode instanceof MergeExec) {
-                keepTraversing.set(FALSE);
-            }
-
             if (keepTraversing.get() == false) {
                 return currentPlanNode;
             }
+
+            // for non-unary execution plans, we apply the rule for each child
+            if (currentPlanNode instanceof MergeExec mergeExec) {
+                keepTraversing.set(FALSE);
+                List<PhysicalPlan> newChildren = new ArrayList<>();
+                boolean changed = false;
+
+                for (var child : mergeExec.children()) {
+                    var newChild = apply(child);
+
+                    if (newChild != child) {
+                        changed = true;
+                    }
+
+                    newChildren.add(newChild);
+                }
+                return changed ? new MergeExec(mergeExec.source(), newChildren, mergeExec.output()) : mergeExec;
+            }
+
             if (currentPlanNode instanceof ExchangeExec exec) {
                 keepTraversing.set(FALSE);
                 var child = exec.child();

+ 6 - 11
x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/planner/PlannerUtils.java

@@ -38,7 +38,6 @@ import org.elasticsearch.xpack.esql.optimizer.rules.physical.local.LucenePushdow
 import org.elasticsearch.xpack.esql.plan.QueryPlan;
 import org.elasticsearch.xpack.esql.plan.logical.EsRelation;
 import org.elasticsearch.xpack.esql.plan.logical.Filter;
-import org.elasticsearch.xpack.esql.plan.logical.Project;
 import org.elasticsearch.xpack.esql.plan.physical.AggregateExec;
 import org.elasticsearch.xpack.esql.plan.physical.EsSourceExec;
 import org.elasticsearch.xpack.esql.plan.physical.EstimatesRowSize;
@@ -83,16 +82,12 @@ public class PlannerUtils {
     public static Tuple<List<PhysicalPlan>, PhysicalPlan> breakPlanIntoSubPlansAndMainPlan(PhysicalPlan plan) {
         var subplans = new Holder<List<PhysicalPlan>>();
         PhysicalPlan mainPlan = plan.transformUp(MergeExec.class, me -> {
-            subplans.set(me.children().stream().map(child -> {
-                // TODO: we are adding a Project plan to force InsertFieldExtraction - we should remove this transformation
-                child = child.transformUp(FragmentExec.class, f -> {
-                    var logicalFragment = f.fragment();
-                    logicalFragment = new Project(logicalFragment.source(), logicalFragment, logicalFragment.output());
-                    return new FragmentExec(logicalFragment);
-                });
-
-                return (PhysicalPlan) new ExchangeSinkExec(child.source(), child.output(), false, child);
-            }).toList());
+            subplans.set(
+                me.children()
+                    .stream()
+                    .map(child -> (PhysicalPlan) new ExchangeSinkExec(child.source(), child.output(), false, child))
+                    .toList()
+            );
             return new ExchangeSourceExec(me.source(), me.output(), false);
         });
 

+ 1 - 1
x-pack/plugin/esql/src/test/java/org/elasticsearch/xpack/esql/CsvTests.java

@@ -310,7 +310,7 @@ public class CsvTests extends ESTestCase {
             );
             assumeFalse(
                 "CSV tests cannot currently handle FORK",
-                testCase.requiredCapabilities.contains(EsqlCapabilities.Cap.FORK_V5.capabilityName())
+                testCase.requiredCapabilities.contains(EsqlCapabilities.Cap.FORK_V6.capabilityName())
             );
             assumeFalse(
                 "CSV tests cannot currently handle multi_match function that depends on Lucene",