Jelajahi Sumber

Fix field resolution for FORK (#128193)

Ioana Tagirta 5 bulan lalu
induk
melakukan
c8581b0679

+ 35 - 5
x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/session/EsqlSession.java

@@ -571,15 +571,11 @@ public class EsqlSession {
     }
 
     static PreAnalysisResult fieldNames(LogicalPlan parsed, Set<String> enrichPolicyMatchFields, PreAnalysisResult result) {
-        if (false == parsed.anyMatch(plan -> plan instanceof Aggregate || plan instanceof Project)) {
+        if (false == parsed.anyMatch(EsqlSession::shouldCollectReferencedFields)) {
             // no explicit columns selection, for example "from employees"
             return result.withFieldNames(IndexResolver.ALL_FIELDS);
         }
 
-        if (parsed.anyMatch(plan -> plan instanceof Fork)) {
-            return result.withFieldNames(IndexResolver.ALL_FIELDS);
-        }
-
         Holder<Boolean> projectAll = new Holder<>(false);
         parsed.forEachExpressionDown(UnresolvedStar.class, us -> {// explicit "*" fields selection
             if (projectAll.get()) {
@@ -587,6 +583,33 @@ public class EsqlSession {
             }
             projectAll.set(true);
         });
+
+        if (projectAll.get()) {
+            return result.withFieldNames(IndexResolver.ALL_FIELDS);
+        }
+
+        Holder<Boolean> projectAfterFork = new Holder<>(false);
+        Holder<Boolean> hasFork = new Holder<>(false);
+
+        parsed.forEachDown(plan -> {
+            if (projectAll.get()) {
+                return;
+            }
+
+            if (hasFork.get() == false && shouldCollectReferencedFields(plan)) {
+                projectAfterFork.set(true);
+            }
+
+            if (plan instanceof Fork fork && projectAfterFork.get() == false) {
+                hasFork.set(true);
+                fork.children().forEach(child -> {
+                    if (child.anyMatch(EsqlSession::shouldCollectReferencedFields) == false) {
+                        projectAll.set(true);
+                    }
+                });
+            }
+        });
+
         if (projectAll.get()) {
             return result.withFieldNames(IndexResolver.ALL_FIELDS);
         }
@@ -703,6 +726,13 @@ public class EsqlSession {
         }
     }
 
+    /**
+     * Indicates whether the given plan gives an exact list of fields that we need to collect from field_caps.
+     */
+    private static boolean shouldCollectReferencedFields(LogicalPlan plan) {
+        return plan instanceof Aggregate || plan instanceof Project;
+    }
+
     /**
      * Could a plan "accidentally" override aliases?
      * Examples are JOIN and ENRICH, that _could_ produce fields with the same

+ 115 - 0
x-pack/plugin/esql/src/test/java/org/elasticsearch/xpack/esql/session/IndexResolverFieldNamesTests.java

@@ -1863,6 +1863,121 @@ public class IndexResolverFieldNamesTests extends ESTestCase {
         );
     }
 
+    public void testForkFieldsWithKeepAfterFork() {
+        assumeTrue("FORK available as snapshot only", EsqlCapabilities.Cap.FORK.isEnabled());
+
+        assertFieldNames("""
+            FROM test
+            | WHERE a > 2000
+            | EVAL b = a + 100
+            | FORK (WHERE c > 1 AND a < 10000 | EVAL d = a + 500)
+                   (WHERE d > 1000 AND e == "aaa" | EVAL c = a + 200)
+            | WHERE x > y
+            | KEEP a, b, c, d, x
+            """, Set.of("a", "a.*", "c", "c.*", "d", "d.*", "e", "e.*", "x", "x.*", "y", "y.*"));
+    }
+
+    public void testForkFieldsWithKeepBeforeFork() {
+        assumeTrue("FORK available as snapshot only", EsqlCapabilities.Cap.FORK.isEnabled());
+
+        assertFieldNames("""
+            FROM test
+            | KEEP a, b, c, d, x
+            | WHERE a > 2000
+            | EVAL b = a + 100
+            | FORK (WHERE c > 1 AND a < 10000 | EVAL d = a + 500)
+                   (WHERE d > 1000 AND e == "aaa" | EVAL c = a + 200)
+            | WHERE x > y
+            """, Set.of("a", "a.*", "b", "b.*", "c", "c.*", "d", "d.*", "e", "e.*", "x", "x.*", "y", "y.*"));
+    }
+
+    public void testForkFieldsWithNoProjection() {
+        assumeTrue("FORK available as snapshot only", EsqlCapabilities.Cap.FORK.isEnabled());
+
+        assertFieldNames("""
+            FROM test
+            | WHERE a > 2000
+            | EVAL b = a + 100
+            | FORK (WHERE c > 1 AND a < 10000 | EVAL d = a + 500)
+                   (WHERE d > 1000 AND e == "aaa" | EVAL c = a + 200)
+            | WHERE x > y
+            """, ALL_FIELDS);
+    }
+
+    public void testForkFieldsWithStatsInOneBranch() {
+        assumeTrue("FORK available as snapshot only", EsqlCapabilities.Cap.FORK.isEnabled());
+
+        assertFieldNames("""
+            FROM test
+            | WHERE a > 2000
+            | EVAL b = a + 100
+            | FORK (WHERE c > 1 AND a < 10000 | EVAL d = a + 500)
+                   (STATS x = count(*), y=min(z))
+            | WHERE x > y
+            """, ALL_FIELDS);
+    }
+
+    public void testForkFieldsWithEnrichAndLookupJoins() {
+        assumeTrue("FORK available as snapshot only", EsqlCapabilities.Cap.FORK.isEnabled());
+        assumeTrue("LOOKUP JOIN available as snapshot only", EsqlCapabilities.Cap.JOIN_LOOKUP_V12.isEnabled());
+
+        assertFieldNames(
+            """
+                FROM test
+                | KEEP a, b, abc, def, z, xyz
+                | ENRICH enrich_policy ON abc
+                | EVAL b = a + 100
+                | LOOKUP JOIN my_lookup_index ON def
+                | FORK (WHERE c > 1 AND a < 10000 | EVAL d = a + 500)
+                       (STATS x = count(*), y=min(z))
+                | LOOKUP JOIN my_lookup_index ON xyz
+                | WHERE x > y OR _fork == "fork1"
+                """,
+            Set.of(
+                "x",
+                "y",
+                "_fork",
+                "a",
+                "c",
+                "abc",
+                "b",
+                "def",
+                "z",
+                "xyz",
+                "def.*",
+                "_fork.*",
+                "y.*",
+                "x.*",
+                "xyz.*",
+                "z.*",
+                "abc.*",
+                "a.*",
+                "c.*",
+                "b.*"
+            )
+        );
+    }
+
+    public void testForkWithStatsInAllBranches() {
+        assumeTrue("FORK available as snapshot only", EsqlCapabilities.Cap.FORK.isEnabled());
+
+        assertFieldNames("""
+            FROM test
+            | WHERE a > 2000
+            | EVAL b = a + 100
+            | FORK (WHERE c > 1 AND a < 10000 | STATS m = count(*))
+                   (EVAL z = a * b | STATS m = max(z))
+                   (STATS x = count(*), y=min(z))
+            | WHERE x > y
+            """, Set.of("a", "a.*", "b", "b.*", "c", "c.*", "z", "z.*"));
+    }
+
+    public void testForkWithStatsAndWhere() {
+        assumeTrue("FORK available as snapshot only", EsqlCapabilities.Cap.FORK.isEnabled());
+
+        assertFieldNames(" FROM employees | FORK ( WHERE true | stats min(salary) by gender) ( WHERE true | LIMIT 3 )", ALL_FIELDS);
+    }
+
     private Set<String> fieldNames(String query, Set<String> enrichPolicyMatchFields) {
         var preAnalysisResult = new EsqlSession.PreAnalysisResult(null);
         return EsqlSession.fieldNames(parser.createStatement(query), enrichPolicyMatchFields, preAnalysisResult).fieldNames();