浏览代码

ESQL: Push down filter past lookup join (#118410) (#118702)

Improve the planner to detect filters that can be pushed down 'through'
 a LOOKUP JOIN by determining the conditions scoped to the left/main
 side and moving them closer to the source.

Relates #118305
Costin Leau 10 月之前
父节点
当前提交
2cdefe09ac

+ 5 - 0
docs/changelog/118410.yaml

@@ -0,0 +1,5 @@
+pr: 118410
+summary: Push down filter past lookup join
+area: ES|QL
+type: enhancement
+issues: []

+ 1 - 1
x-pack/plugin/esql-core/src/main/java/org/elasticsearch/xpack/esql/core/expression/predicate/Predicates.java

@@ -61,7 +61,7 @@ public abstract class Predicates {
      *
      * using the given combiner.
      *
-     * While a bit longer, this method creates a balanced tree as oppose to a plain
+     * While a bit longer, this method creates a balanced tree as opposed to a plain
      * recursive approach which creates an unbalanced one (either to the left or right).
      */
     private static Expression combine(List<Expression> exps, BiFunction<Expression, Expression, Expression> combiner) {

+ 65 - 0
x-pack/plugin/esql/qa/testFixtures/src/main/resources/lookup-join.csv-spec

@@ -342,3 +342,68 @@ count:long | type:keyword
 3          | Success
 1          | Disconnected
 ;
+
+// 
+// Filtering tests
+// 
+
+lookupWithFilterOnLeftSideField
+required_capability: join_lookup_v5
+
+FROM employees
+| EVAL language_code = languages
+| LOOKUP JOIN languages_lookup ON language_code
+| SORT emp_no
+| KEEP emp_no, language_code, language_name
+| WHERE emp_no >= 10091 AND emp_no < 10094
+;
+
+emp_no:integer | language_code:integer | language_name:keyword
+10091          | 3                     | Spanish
+10092          | 1                     | English
+10093          | 3                     | Spanish
+;
+
+lookupMessageWithFilterOnRightSideField-Ignore
+required_capability: join_lookup_v5
+
+FROM sample_data
+| LOOKUP JOIN message_types_lookup ON message
+| WHERE type == "Error"
+| KEEP @timestamp, client_ip, event_duration, message, type
+| SORT @timestamp DESC
+;
+
+@timestamp:date          | client_ip:ip | event_duration:long | message:keyword       | type:keyword
+2023-10-23T13:53:55.832Z | 172.21.3.15  | 5033755             | Connection error      | Error
+2023-10-23T13:52:55.015Z | 172.21.3.15  | 8268153             | Connection error      | Error
+2023-10-23T13:51:54.732Z | 172.21.3.15  | 725448              | Connection error      | Error
+;
+
+lookupWithFieldAndRightSideAfterStats
+required_capability: join_lookup_v5
+
+FROM sample_data
+| LOOKUP JOIN message_types_lookup ON message
+| STATS count = count(message) BY type
+| WHERE type == "Error"
+;
+
+count:long | type:keyword
+3          | Error
+;
+
+lookupWithFieldOnJoinKey-Ignore
+required_capability: join_lookup_v5
+
+FROM employees
+| EVAL language_code = languages
+| LOOKUP JOIN languages_lookup ON language_code
+| WHERE language_code > 1 AND language_name IS NOT NULL
+| KEEP emp_no, language_code, language_name
+;
+
+emp_no:integer | language_code:integer | language_name:keyword
+10001          | 2                     | French
+10003          | 4                     | German
+;

+ 55 - 0
x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/optimizer/rules/logical/PushDownAndCombineFilters.java

@@ -15,6 +15,7 @@ import org.elasticsearch.xpack.esql.core.expression.Expression;
 import org.elasticsearch.xpack.esql.core.expression.Expressions;
 import org.elasticsearch.xpack.esql.core.expression.ReferenceAttribute;
 import org.elasticsearch.xpack.esql.core.expression.predicate.Predicates;
+import org.elasticsearch.xpack.esql.core.util.CollectionUtils;
 import org.elasticsearch.xpack.esql.plan.logical.Enrich;
 import org.elasticsearch.xpack.esql.plan.logical.Eval;
 import org.elasticsearch.xpack.esql.plan.logical.Filter;
@@ -23,6 +24,8 @@ import org.elasticsearch.xpack.esql.plan.logical.OrderBy;
 import org.elasticsearch.xpack.esql.plan.logical.Project;
 import org.elasticsearch.xpack.esql.plan.logical.RegexExtract;
 import org.elasticsearch.xpack.esql.plan.logical.UnaryPlan;
+import org.elasticsearch.xpack.esql.plan.logical.join.Join;
+import org.elasticsearch.xpack.esql.plan.logical.join.JoinTypes;
 
 import java.util.ArrayList;
 import java.util.List;
@@ -76,11 +79,63 @@ public final class PushDownAndCombineFilters extends OptimizerRules.OptimizerRul
         } else if (child instanceof OrderBy orderBy) {
             // swap the filter with its child
             plan = orderBy.replaceChild(filter.with(orderBy.child(), condition));
+        } else if (child instanceof Join join) {
+            return pushDownPastJoin(filter, join);
         }
         // cannot push past a Limit, this could change the tailing result set returned
         return plan;
     }
 
+    // Conjuncts of a filter condition grouped by the join side they depend on:
+    // leftFilters / rightFilters reference only attributes produced by that side,
+    // commonFilters holds every conjunct that fits neither group.
+    private record ScopedFilter(List<Expression> commonFilters, List<Expression> leftFilters, List<Expression> rightFilters) {}
+
+    // Partition the given conjuncts according to the side of the join they reference:
+    // - left: every reference is part of the left child's output
+    // - right: not left-scoped and every reference is part of the right child's output
+    // - common: everything else, i.e. expressions that need both sides to be evaluated
+    // The relative order of the incoming expressions is preserved inside each group.
+    private static ScopedFilter scopeFilter(List<Expression> filters, LogicalPlan left, LogicalPlan right) {
+        AttributeSet leftOutput = left.outputSet();
+        AttributeSet rightOutput = right.outputSet();
+
+        List<Expression> leftOnly = new ArrayList<>();
+        List<Expression> rightOnly = new ArrayList<>();
+        List<Expression> bothSides = new ArrayList<>();
+
+        for (Expression candidate : filters) {
+            // the left side wins when an expression qualifies for both (e.g. it has no references at all)
+            if (candidate.references().subsetOf(leftOutput)) {
+                leftOnly.add(candidate);
+            } else if (candidate.references().subsetOf(rightOutput)) {
+                rightOnly.add(candidate);
+            } else {
+                bothSides.add(candidate);
+            }
+        }
+        return new ScopedFilter(bothSides, leftOnly, rightOnly);
+    }
+
+    // Tries to move the parts of the filter that only depend on the left side of the join
+    // underneath the join, next to the left child. Returns the original filter untouched when
+    // the join is not a LEFT join or when no conjunct is scoped to the left side.
+    private static LogicalPlan pushDownPastJoin(Filter filter, Join join) {
+        // pushdown only through LEFT joins
+        // TODO: generalize this for other join types
+        if (join.config().type() != JoinTypes.LEFT) {
+            return filter;
+        }
+
+        LogicalPlan originalLeft = join.left();
+        // break the condition into its conjuncts and group them by the side(s) they reference
+        ScopedFilter scoped = scopeFilter(Predicates.splitAnd(filter.condition()), originalLeft, join.right());
+        if (scoped.leftFilters.isEmpty()) {
+            // nothing is scoped to the left side only, leave the plan as is
+            return filter;
+        }
+
+        // wrap the left child in a filter made of the left-scoped conjuncts and rebuild the join on top of it
+        LogicalPlan filteredLeft = new Filter(originalLeft.source(), originalLeft, Predicates.combineAnd(scoped.leftFilters));
+        Join updatedJoin = (Join) join.replaceLeft(filteredLeft);
+
+        // whatever could not be pushed (right-scoped or needing both sides) stays above the join;
+        // if nothing remains the filter node disappears and the join takes its place
+        Expression remaining = Predicates.combineAnd(CollectionUtils.combine(scoped.commonFilters, scoped.rightFilters));
+        return remaining == null ? updatedJoin : filter.with(updatedJoin, remaining);
+    }
+
     private static Function<Expression, Expression> NO_OP = expression -> expression;
 
     private static LogicalPlan maybePushDownPastUnary(

+ 243 - 2
x-pack/plugin/esql/src/test/java/org/elasticsearch/xpack/esql/optimizer/LogicalPlanOptimizerTests.java

@@ -7,6 +7,7 @@
 
 package org.elasticsearch.xpack.esql.optimizer;
 
+import org.apache.lucene.util.BytesRef;
 import org.elasticsearch.Build;
 import org.elasticsearch.common.logging.LoggerMessageFormat;
 import org.elasticsearch.common.lucene.BytesRefs;
@@ -217,6 +218,11 @@ public class LogicalPlanOptimizerTests extends ESTestCase {
         enrichResolution = new EnrichResolution();
         AnalyzerTestUtils.loadEnrichPolicyResolution(enrichResolution, "languages_idx", "id", "languages_idx", "mapping-languages.json");
 
+        var lookupMapping = loadMapping("mapping-languages.json");
+        IndexResolution lookupResolution = IndexResolution.valid(
+            new EsIndex("language_code", lookupMapping, Map.of("language_code", IndexMode.LOOKUP))
+        );
+
         // Most tests used data from the test index, so we load it here, and use it in the plan() function.
         mapping = loadMapping("mapping-basic.json");
         EsIndex test = new EsIndex("test", mapping, Map.of("test", IndexMode.STANDARD));
@@ -5740,7 +5746,7 @@ public class LogicalPlanOptimizerTests extends ESTestCase {
         String query = """
               FROM test
             | RENAME languages AS int
-            | LOOKUP int_number_names ON int""";
+            | LOOKUP_?? int_number_names ON int""";
         if (Build.current().isSnapshot() == false) {
             var e = expectThrows(ParsingException.class, () -> analyze(query));
             assertThat(e.getMessage(), containsString("line 3:3: mismatched input 'LOOKUP' expecting {"));
@@ -5820,7 +5826,7 @@ public class LogicalPlanOptimizerTests extends ESTestCase {
         String query = """
               FROM test
             | RENAME languages AS int
-            | LOOKUP int_number_names ON int
+            | LOOKUP_?? int_number_names ON int
             | STATS MIN(emp_no) BY name""";
         if (Build.current().isSnapshot() == false) {
             var e = expectThrows(ParsingException.class, () -> analyze(query));
@@ -5889,6 +5895,241 @@ public class LogicalPlanOptimizerTests extends ESTestCase {
         );
     }
 
+    //
+    // Lookup JOIN
+    //
+
+    /**
+     * A filter on the join key should be pushed down to the left side of the join;
+     * the assertions below expect it underneath the Limit, expressed on the original (pre-rename) field.
+     * Expects
+     * Project[[_meta_field{f}#13, emp_no{f}#7, first_name{f}#8, gender{f}#9, hire_date{f}#14, job{f}#15, job.raw{f}#16, lang
+     * uage_code{r}#4, last_name{f}#11, long_noidx{f}#17, salary{f}#12, language_name{f}#19]]
+     *   \_Join[LEFT,[language_code{r}#4],[language_code{r}#4],[language_code{f}#18]]
+     *     |_EsqlProject[[_meta_field{f}#13, emp_no{f}#7, first_name{f}#8, gender{f}#9, hire_date{f}#14, job{f}#15, job.raw{f}#16, lang
+     * uages{f}#10 AS language_code, last_name{f}#11, long_noidx{f}#17, salary{f}#12]]
+     *     | \_Limit[1000[INTEGER]]
+     *     |  \_Filter[languages{f}#10 > 1[INTEGER]]
+     *     |    \_EsRelation[test][_meta_field{f}#13, emp_no{f}#7, first_name{f}#8, ge..]
+     *     \_EsRelation[language_code][LOOKUP][language_code{f}#18, language_name{f}#19]
+     */
+    public void testLookupJoinPushDownFilterOnJoinKeyWithRename() {
+        String query = """
+              FROM test
+            | RENAME languages AS language_code
+            | LOOKUP JOIN language_code ON language_code
+            | WHERE language_code > 1
+            """;
+        var plan = optimizedPlan(query);
+
+        // walk the plan top-down: Project -> Join -> (left child) Project -> Limit -> Filter -> EsRelation
+        var project = as(plan, Project.class);
+        var join = as(project.child(), Join.class);
+        assertThat(join.config().type(), equalTo(JoinTypes.LEFT));
+        project = as(join.left(), Project.class);
+        var limit = as(project.child(), Limit.class);
+        assertThat(limit.limit().fold(), equalTo(1000));
+        var filter = as(limit.child(), Filter.class);
+        // assert that the rename has been undone: the filter targets the field "languages", not the alias "language_code"
+        var op = as(filter.condition(), GreaterThan.class);
+        var field = as(op.left(), FieldAttribute.class);
+        assertThat(field.name(), equalTo("languages"));
+
+        var literal = as(op.right(), Literal.class);
+        assertThat(literal.value(), equalTo(1));
+
+        // leftRel/rightRel are never read; the as(...) calls are presumably kept for their type check on both join inputs
+        var leftRel = as(filter.child(), EsRelation.class);
+        var rightRel = as(join.right(), EsRelation.class);
+    }
+
+    /**
+     * Filter on left side fields (outside the join key) should be pushed down
+     * Expects
+     * Project[[_meta_field{f}#13, emp_no{f}#7, first_name{f}#8, gender{f}#9, hire_date{f}#14, job{f}#15, job.raw{f}#16, lang
+     * uage_code{r}#4, last_name{f}#11, long_noidx{f}#17, salary{f}#12, language_name{f}#19]]
+     *   \_Join[LEFT,[language_code{r}#4],[language_code{r}#4],[language_code{f}#18]]
+     *     |_EsqlProject[[_meta_field{f}#13, emp_no{f}#7, first_name{f}#8, gender{f}#9, hire_date{f}#14, job{f}#15, job.raw{f}#16, lang
+     * uages{f}#10 AS language_code, last_name{f}#11, long_noidx{f}#17, salary{f}#12]]
+     *     | \_Limit[1000[INTEGER]]
+     *     |  \_Filter[emp_no{f}#7 > 1[INTEGER]]
+     *     |    \_EsRelation[test][_meta_field{f}#13, emp_no{f}#7, first_name{f}#8, ge..]
+     *     \_EsRelation[language_code][LOOKUP][language_code{f}#18, language_name{f}#19]
+     */
+    public void testLookupJoinPushDownFilterOnLeftSideField() {
+        String query = """
+              FROM test
+            | RENAME languages AS language_code
+            | LOOKUP JOIN language_code ON language_code
+            | WHERE emp_no > 1
+            """;
+
+        var plan = optimizedPlan(query);
+
+        // walk the plan top-down: Project -> Join -> (left child) Project -> Limit -> Filter -> EsRelation
+        var project = as(plan, Project.class);
+        var join = as(project.child(), Join.class);
+        assertThat(join.config().type(), equalTo(JoinTypes.LEFT));
+        project = as(join.left(), Project.class);
+
+        var limit = as(project.child(), Limit.class);
+        assertThat(limit.limit().fold(), equalTo(1000));
+        // the emp_no condition is expected below the join, on its left branch
+        var filter = as(limit.child(), Filter.class);
+        var op = as(filter.condition(), GreaterThan.class);
+        var field = as(op.left(), FieldAttribute.class);
+        assertThat(field.name(), equalTo("emp_no"));
+
+        var literal = as(op.right(), Literal.class);
+        assertThat(literal.value(), equalTo(1));
+
+        // leftRel/rightRel are never read; the as(...) calls are presumably kept for their type check on both join inputs
+        var leftRel = as(filter.child(), EsRelation.class);
+        var rightRel = as(join.right(), EsRelation.class);
+    }
+
+    /**
+     * Filter works on the right side fields and thus cannot be pushed down
+     * Expects
+     * Project[[_meta_field{f}#13, emp_no{f}#7, first_name{f}#8, gender{f}#9, hire_date{f}#14, job{f}#15, job.raw{f}#16, lang
+     * uage_code{r}#4, last_name{f}#11, long_noidx{f}#17, salary{f}#12, language_name{f}#19]]
+     * \_Limit[1000[INTEGER]]
+     *   \_Filter[language_name{f}#19 == [45 6e 67 6c 69 73 68][KEYWORD]]
+     *     \_Join[LEFT,[language_code{r}#4],[language_code{r}#4],[language_code{f}#18]]
+     *       |_EsqlProject[[_meta_field{f}#13, emp_no{f}#7, first_name{f}#8, gender{f}#9, hire_date{f}#14, job{f}#15, job.raw{f}#16, lang
+     * uages{f}#10 AS language_code, last_name{f}#11, long_noidx{f}#17, salary{f}#12]]
+     *       | \_EsRelation[test][_meta_field{f}#13, emp_no{f}#7, first_name{f}#8, ge..]
+     *       \_EsRelation[language_code][LOOKUP][language_code{f}#18, language_name{f}#19]
+     */
+    public void testLookupJoinPushDownDisabledForLookupField() {
+        String query = """
+              FROM test
+            | RENAME languages AS language_code
+            | LOOKUP JOIN language_code ON language_code
+            | WHERE language_name == "English"
+            """;
+
+        var plan = optimizedPlan(query);
+
+        // walk the plan top-down: Project -> Limit -> Filter -> Join
+        var project = as(plan, Project.class);
+        var limit = as(project.child(), Limit.class);
+        assertThat(limit.limit().fold(), equalTo(1000));
+
+        // the filter on the lookup field stays above the join
+        var filter = as(limit.child(), Filter.class);
+        var op = as(filter.condition(), Equals.class);
+        var field = as(op.left(), FieldAttribute.class);
+        assertThat(field.name(), equalTo("language_name"));
+        var literal = as(op.right(), Literal.class);
+        // the literal is compared as a BytesRef, not as a java.lang.String
+        assertThat(literal.value(), equalTo(new BytesRef("English")));
+
+        var join = as(filter.child(), Join.class);
+        assertThat(join.config().type(), equalTo(JoinTypes.LEFT));
+        project = as(join.left(), Project.class);
+
+        // no Filter on the left branch: the Project sits directly on the relation
+        // leftRel/rightRel are never read; the as(...) calls are presumably kept for their type check on both join inputs
+        var leftRel = as(project.child(), EsRelation.class);
+        var rightRel = as(join.right(), EsRelation.class);
+    }
+
+    /**
+     * Split the conjunction into pushable and non pushable filters.
+     * Expects
+     * Project[[_meta_field{f}#14, emp_no{f}#8, first_name{f}#9, gender{f}#10, hire_date{f}#15, job{f}#16, job.raw{f}#17, lan
+     * guage_code{r}#4, last_name{f}#12, long_noidx{f}#18, salary{f}#13, language_name{f}#20]]
+     * \_Limit[1000[INTEGER]]
+     *   \_Filter[language_name{f}#20 == [45 6e 67 6c 69 73 68][KEYWORD]]
+     *     \_Join[LEFT,[language_code{r}#4],[language_code{r}#4],[language_code{f}#19]]
+     *       |_EsqlProject[[_meta_field{f}#14, emp_no{f}#8, first_name{f}#9, gender{f}#10, hire_date{f}#15, job{f}#16, job.raw{f}#17, lan
+     * guages{f}#11 AS language_code, last_name{f}#12, long_noidx{f}#18, salary{f}#13]]
+     *       | \_Filter[emp_no{f}#8 > 1[INTEGER]]
+     *       |   \_EsRelation[test][_meta_field{f}#14, emp_no{f}#8, first_name{f}#9, ge..]
+     *       \_EsRelation[language_code][LOOKUP][language_code{f}#19, language_name{f}#20]
+     */
+    public void testLookupJoinPushDownSeparatedForConjunctionBetweenLeftAndRightField() {
+        String query = """
+              FROM test
+            | RENAME languages AS language_code
+            | LOOKUP JOIN language_code ON language_code
+            | WHERE language_name == "English" AND emp_no > 1
+            """;
+
+        var plan = optimizedPlan(query);
+
+        // walk the plan top-down: Project -> Limit -> Filter (right side part) -> Join -> Project -> Filter (left side part)
+        var project = as(plan, Project.class);
+        var limit = as(project.child(), Limit.class);
+        assertThat(limit.limit().fold(), equalTo(1000));
+        // filter kept in place, working on the right side
+        var filter = as(limit.child(), Filter.class);
+        // declared with the common comparison type so the variable can be reused for the GreaterThan below
+        EsqlBinaryComparison op = as(filter.condition(), Equals.class);
+        var field = as(op.left(), FieldAttribute.class);
+        assertThat(field.name(), equalTo("language_name"));
+        var literal = as(op.right(), Literal.class);
+        assertThat(literal.value(), equalTo(new BytesRef("English")));
+
+        var join = as(filter.child(), Join.class);
+        assertThat(join.config().type(), equalTo(JoinTypes.LEFT));
+        project = as(join.left(), Project.class);
+        // filter pushed down
+        filter = as(project.child(), Filter.class);
+        op = as(filter.condition(), GreaterThan.class);
+        field = as(op.left(), FieldAttribute.class);
+        assertThat(field.name(), equalTo("emp_no"));
+
+        literal = as(op.right(), Literal.class);
+        assertThat(literal.value(), equalTo(1));
+
+        // leftRel/rightRel are never read; the as(...) calls are presumably kept for their type check on both join inputs
+        var leftRel = as(filter.child(), EsRelation.class);
+        var rightRel = as(join.right(), EsRelation.class);
+
+    }
+
+    /**
+     * Disjunctions however keep the filter in place, even on pushable fields
+     * Expects
+     * Project[[_meta_field{f}#14, emp_no{f}#8, first_name{f}#9, gender{f}#10, hire_date{f}#15, job{f}#16, job.raw{f}#17, lan
+     * guage_code{r}#4, last_name{f}#12, long_noidx{f}#18, salary{f}#13, language_name{f}#20]]
+     * \_Limit[1000[INTEGER]]
+     *   \_Filter[language_name{f}#20 == [45 6e 67 6c 69 73 68][KEYWORD] OR emp_no{f}#8 > 1[INTEGER]]
+     *     \_Join[LEFT,[language_code{r}#4],[language_code{r}#4],[language_code{f}#19]]
+     *       |_EsqlProject[[_meta_field{f}#14, emp_no{f}#8, first_name{f}#9, gender{f}#10, hire_date{f}#15, job{f}#16, job.raw{f}#17, lan
+     * guages{f}#11 AS language_code, last_name{f}#12, long_noidx{f}#18, salary{f}#13]]
+     *       | \_EsRelation[test][_meta_field{f}#14, emp_no{f}#8, first_name{f}#9, ge..]
+     *       \_EsRelation[language_code][LOOKUP][language_code{f}#19, language_name{f}#20]
+     */
+    public void testLookupJoinPushDownDisabledForDisjunctionBetweenLeftAndRightField() {
+        String query = """
+              FROM test
+            | RENAME languages AS language_code
+            | LOOKUP JOIN language_code ON language_code
+            | WHERE language_name == "English" OR emp_no > 1
+            """;
+
+        var plan = optimizedPlan(query);
+
+        // walk the plan top-down: Project -> Limit -> Filter (whole OR) -> Join
+        var project = as(plan, Project.class);
+        var limit = as(project.child(), Limit.class);
+        assertThat(limit.limit().fold(), equalTo(1000));
+
+        // the complete disjunction stays above the join
+        var filter = as(limit.child(), Filter.class);
+        var or = as(filter.condition(), Or.class);
+        // declared with the common comparison type so the variable can be reused for the GreaterThan below
+        EsqlBinaryComparison op = as(or.left(), Equals.class);
+        // OR left side
+        var field = as(op.left(), FieldAttribute.class);
+        assertThat(field.name(), equalTo("language_name"));
+        var literal = as(op.right(), Literal.class);
+        assertThat(literal.value(), equalTo(new BytesRef("English")));
+        // OR right side
+        op = as(or.right(), GreaterThan.class);
+        field = as(op.left(), FieldAttribute.class);
+        assertThat(field.name(), equalTo("emp_no"));
+        literal = as(op.right(), Literal.class);
+        assertThat(literal.value(), equalTo(1));
+
+        var join = as(filter.child(), Join.class);
+        assertThat(join.config().type(), equalTo(JoinTypes.LEFT));
+        project = as(join.left(), Project.class);
+
+        // no Filter on the left branch: the Project sits directly on the relation
+        // leftRel/rightRel are never read; the as(...) calls are presumably kept for their type check on both join inputs
+        var leftRel = as(project.child(), EsRelation.class);
+        var rightRel = as(join.right(), EsRelation.class);
+    }
+
+    //
+    //
+    //
+
     public void testTranslateMetricsWithoutGrouping() {
         assumeTrue("requires snapshot builds", Build.current().isSnapshot());
         var query = "METRICS k8s max(rate(network.total_bytes_in))";