Browse Source

ESQL: Add optimization to purge join on null merge key (#127583) (#128733)

This adds a new logical optimization rule to purge a Join in case the
merge key(s) are null. The null detection is based on recognizing a tree
pattern where the join sits atop a project and/or eval (possibly a few
nodes deep) which contains a reference to a `null`, reference which
matches the join key.

It works at coordinator planning level, but it's most useful locally,
after insertions of `nulls` in the plan on detecting missing fields.

The Join is substituted with a projection with the same attributes as
the join, atop an eval with all join's right fields aliased to null.

Closes #125577.

(cherry picked from commit 21fe40a9b5e73ca953e93971089116a535ab903c)
Bogdan Pintea 4 months ago
parent
commit
fd206a4772

+ 6 - 0
docs/changelog/127583.yaml

@@ -0,0 +1,6 @@
+pr: 127583
+summary: Add optimization to purge join on null merge key
+area: ES|QL
+type: enhancement
+issues:
+ - 125577

+ 1 - 1
x-pack/plugin/esql-core/src/main/java/org/elasticsearch/xpack/esql/core/expression/Alias.java

@@ -138,7 +138,7 @@ public final class Alias extends NamedExpression {
 
     @Override
     public String nodeString() {
-        return child.nodeString() + " AS " + name();
+        return child.nodeString() + " AS " + name() + "#" + id();
     }
 
     /**

+ 19 - 0
x-pack/plugin/esql/qa/testFixtures/src/main/resources/lookup-join.csv-spec

@@ -1636,3 +1636,22 @@ max:long
 3450233
 8268153
 ;
+
+nullifiedJoinKeyToPurgeTheJoin
+required_capability: join_lookup_v12
+
+FROM employees
+| RENAME languages AS language_code
+| SORT emp_no, language_code
+| LIMIT 4
+| EVAL language_code = TO_INTEGER(NULL)
+| LOOKUP JOIN languages_lookup ON language_code
+| KEEP emp_no, language_code, language_name
+;
+
+emp_no:integer | language_code:integer | language_name:keyword
+10001          |null           |null
+10002          |null           |null
+10003          |null           |null
+10004          |null           |null
+;

+ 3 - 1
x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/optimizer/LogicalPlanOptimizer.java

@@ -60,6 +60,7 @@ import org.elasticsearch.xpack.esql.optimizer.rules.logical.SubstituteSpatialSur
 import org.elasticsearch.xpack.esql.optimizer.rules.logical.SubstituteSurrogatePlans;
 import org.elasticsearch.xpack.esql.optimizer.rules.logical.SubstituteSurrogates;
 import org.elasticsearch.xpack.esql.optimizer.rules.logical.TranslateMetricsAggregate;
+import org.elasticsearch.xpack.esql.optimizer.rules.logical.local.PruneLeftJoinOnNullMatchingField;
 import org.elasticsearch.xpack.esql.plan.logical.LogicalPlan;
 import org.elasticsearch.xpack.esql.rule.ParameterizedRuleExecutor;
 
@@ -188,7 +189,8 @@ public class LogicalPlanOptimizer extends ParameterizedRuleExecutor<LogicalPlan,
             new PushDownEnrich(),
             new PushDownAndCombineOrderBy(),
             new PruneRedundantOrderBy(),
-            new PruneRedundantSortClauses()
+            new PruneRedundantSortClauses(),
+            new PruneLeftJoinOnNullMatchingField()
         );
     }
 

+ 104 - 0
x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/optimizer/rules/RuleUtils.java

@@ -0,0 +1,104 @@
+/*
+ * Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
+ * or more contributor license agreements. Licensed under the Elastic License
+ * 2.0; you may not use this file except in compliance with the Elastic License
+ * 2.0.
+ */
+
+package org.elasticsearch.xpack.esql.optimizer.rules;
+
+import org.elasticsearch.common.util.Maps;
+import org.elasticsearch.core.Tuple;
+import org.elasticsearch.xpack.esql.core.expression.Alias;
+import org.elasticsearch.xpack.esql.core.expression.Attribute;
+import org.elasticsearch.xpack.esql.core.expression.AttributeMap;
+import org.elasticsearch.xpack.esql.core.expression.Expression;
+import org.elasticsearch.xpack.esql.core.expression.Literal;
+import org.elasticsearch.xpack.esql.core.expression.NamedExpression;
+import org.elasticsearch.xpack.esql.core.expression.ReferenceAttribute;
+import org.elasticsearch.xpack.esql.core.type.DataType;
+import org.elasticsearch.xpack.esql.optimizer.LogicalOptimizerContext;
+import org.elasticsearch.xpack.esql.plan.logical.LogicalPlan;
+
+import java.util.ArrayList;
+import java.util.List;
+import java.util.Map;
+import java.util.function.Predicate;
+
+public final class RuleUtils {
+
+    private RuleUtils() {}
+
+    /**
+     * Returns a tuple of two lists:
+     * 1. A list of aliases to null literals for those data types in the {@param outputAttributes} that {@param shouldBeReplaced}.
+     * 2. A list of named expressions where attributes that match the predicate are replaced with their corresponding null alias.
+     *
+     * @param outputAttributes The original output attributes.
+     * @param shouldBeReplaced A predicate to determine which attributes should be replaced with null aliases.
+     */
+    public static Tuple<List<Alias>, List<NamedExpression>> aliasedNulls(
+        List<Attribute> outputAttributes,
+        Predicate<Attribute> shouldBeReplaced
+    ) {
+        Map<DataType, Alias> nullLiterals = Maps.newLinkedHashMapWithExpectedSize(DataType.types().size());
+        List<NamedExpression> newProjections = new ArrayList<>(outputAttributes.size());
+        for (Attribute attr : outputAttributes) {
+            NamedExpression projection;
+            if (shouldBeReplaced.test(attr)) {
+                DataType dt = attr.dataType();
+                Alias nullAlias = nullLiterals.get(dt);
+                // save the first field as null (per datatype)
+                if (nullAlias == null) {
+                    // Keep the same id so downstream query plans don't need updating
+                    // NOTE: THIS IS BRITTLE AND CAN LEAD TO BUGS.
+                    // In case some optimizer rule or so inserts a plan node that requires the field BEFORE the Eval that we're adding
+                    // on top of the EsRelation, this can trigger a field extraction in the physical optimizer phase, causing wrong
+                    // layouts due to a duplicate name id.
+                    // If someone reaches here AGAIN when debugging e.g. ClassCastExceptions NPEs from wrong layouts, we should probably
+                    // give up on this approach and instead insert EvalExecs in InsertFieldExtraction.
+                    Alias alias = new Alias(attr.source(), attr.name(), Literal.of(attr, null), attr.id());
+                    nullLiterals.put(dt, alias);
+                    projection = alias.toAttribute();
+                }
+                // otherwise point to it since this avoids creating field copies
+                else {
+                    projection = new Alias(attr.source(), attr.name(), nullAlias.toAttribute(), attr.id());
+                }
+            } else {
+                projection = attr;
+            }
+            newProjections.add(projection);
+        }
+
+        return new Tuple<>(new ArrayList<>(nullLiterals.values()), newProjections);
+    }
+
+    /**
+     * Collects references to foldables from the given logical plan, returning an {@link AttributeMap} that maps
+     * foldable aliases to their corresponding literal values.
+     *
+     * @param plan The logical plan to analyze.
+     * @param ctx The optimizer context providing fold context.
+     * @return An {@link AttributeMap} containing foldable references and their literal values.
+     */
+    public static AttributeMap<Expression> foldableReferences(LogicalPlan plan, LogicalOptimizerContext ctx) {
+        AttributeMap.Builder<Expression> collectRefsBuilder = AttributeMap.builder();
+
+        // collect aliases bottom-up
+        plan.forEachExpressionUp(Alias.class, a -> {
+            var c = a.child();
+            boolean shouldCollect = c.foldable();
+            // try to resolve the expression based on an existing foldables
+            if (shouldCollect == false) {
+                c = c.transformUp(ReferenceAttribute.class, r -> collectRefsBuilder.build().resolve(r, r));
+                shouldCollect = c.foldable();
+            }
+            if (shouldCollect) {
+                collectRefsBuilder.put(a.toAttribute(), Literal.of(ctx.foldCtx(), c));
+            }
+        });
+
+        return collectRefsBuilder.build();
+    }
+}

+ 4 - 21
x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/optimizer/rules/logical/PropagateEvalFoldables.java

@@ -7,12 +7,11 @@
 
 package org.elasticsearch.xpack.esql.optimizer.rules.logical;
 
-import org.elasticsearch.xpack.esql.core.expression.Alias;
 import org.elasticsearch.xpack.esql.core.expression.AttributeMap;
 import org.elasticsearch.xpack.esql.core.expression.Expression;
-import org.elasticsearch.xpack.esql.core.expression.Literal;
 import org.elasticsearch.xpack.esql.core.expression.ReferenceAttribute;
 import org.elasticsearch.xpack.esql.optimizer.LogicalOptimizerContext;
+import org.elasticsearch.xpack.esql.optimizer.rules.RuleUtils;
 import org.elasticsearch.xpack.esql.plan.logical.Eval;
 import org.elasticsearch.xpack.esql.plan.logical.Filter;
 import org.elasticsearch.xpack.esql.plan.logical.LogicalPlan;
@@ -26,24 +25,8 @@ public final class PropagateEvalFoldables extends ParameterizedRule<LogicalPlan,
 
     @Override
     public LogicalPlan apply(LogicalPlan plan, LogicalOptimizerContext ctx) {
-        AttributeMap.Builder<Expression> collectRefsBuilder = AttributeMap.builder();
-
-        java.util.function.Function<ReferenceAttribute, Expression> replaceReference = r -> collectRefsBuilder.build().resolve(r, r);
-
-        // collect aliases bottom-up
-        plan.forEachExpressionUp(Alias.class, a -> {
-            var c = a.child();
-            boolean shouldCollect = c.foldable();
-            // try to resolve the expression based on an existing foldables
-            if (shouldCollect == false) {
-                c = c.transformUp(ReferenceAttribute.class, replaceReference);
-                shouldCollect = c.foldable();
-            }
-            if (shouldCollect) {
-                collectRefsBuilder.put(a.toAttribute(), Literal.of(ctx.foldCtx(), c));
-            }
-        });
-        if (collectRefsBuilder.isEmpty()) {
+        AttributeMap<Expression> collectRefs = RuleUtils.foldableReferences(plan, ctx);
+        if (collectRefs.isEmpty()) {
             return plan;
         }
 
@@ -52,7 +35,7 @@ public final class PropagateEvalFoldables extends ParameterizedRule<LogicalPlan,
             // TODO: also allow aggregates once aggs on constants are supported.
             // C.f. https://github.com/elastic/elasticsearch/issues/100634
             if (p instanceof Filter || p instanceof Eval) {
-                p = p.transformExpressionsOnly(ReferenceAttribute.class, replaceReference);
+                p = p.transformExpressionsOnly(ReferenceAttribute.class, r -> collectRefs.resolve(r, r));
             }
             return p;
         });

+ 68 - 0
x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/optimizer/rules/logical/local/PruneLeftJoinOnNullMatchingField.java

@@ -0,0 +1,68 @@
+/*
+ * Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
+ * or more contributor license agreements. Licensed under the Elastic License
+ * 2.0; you may not use this file except in compliance with the Elastic License
+ * 2.0.
+ */
+
+package org.elasticsearch.xpack.esql.optimizer.rules.logical.local;
+
+import org.elasticsearch.xpack.esql.core.expression.AttributeMap;
+import org.elasticsearch.xpack.esql.core.expression.AttributeSet;
+import org.elasticsearch.xpack.esql.core.expression.Expression;
+import org.elasticsearch.xpack.esql.core.expression.Expressions;
+import org.elasticsearch.xpack.esql.optimizer.LogicalOptimizerContext;
+import org.elasticsearch.xpack.esql.optimizer.rules.RuleUtils;
+import org.elasticsearch.xpack.esql.optimizer.rules.logical.OptimizerRules;
+import org.elasticsearch.xpack.esql.plan.logical.Eval;
+import org.elasticsearch.xpack.esql.plan.logical.LogicalPlan;
+import org.elasticsearch.xpack.esql.plan.logical.Project;
+import org.elasticsearch.xpack.esql.plan.logical.join.Join;
+
+import static org.elasticsearch.xpack.esql.core.expression.Expressions.isGuaranteedNull;
+import static org.elasticsearch.xpack.esql.plan.logical.join.JoinTypes.LEFT;
+
+/**
+ * The rule checks if the join's performed on a field which is aliased to null (in type or value); if that's the case, it prunes the join,
+ * replacing it with an Eval - returning aliases to null for all the fields added in by the right side of the Join - plus a Project on top
+ * of it. The rule can apply on the coordinator already, but it's more likely to be effective on the data nodes, where null aliasing is
+ * inserted due to locally missing fields. This rule relies on that behavior -- see {@link ReplaceFieldWithConstantOrNull}.
+ */
+public class PruneLeftJoinOnNullMatchingField extends OptimizerRules.ParameterizedOptimizerRule<Join, LogicalOptimizerContext> {
+
+    public PruneLeftJoinOnNullMatchingField() {
+        super(OptimizerRules.TransformDirection.DOWN);
+    }
+
+    @Override
+    protected LogicalPlan rule(Join join, LogicalOptimizerContext ctx) {
+        LogicalPlan plan = join;
+        if (join.config().type() == LEFT) { // other types will have different replacement logic
+            AttributeMap<Expression> attributeMap = RuleUtils.foldableReferences(join, ctx);
+
+            for (var attr : AttributeSet.of(join.config().matchFields())) {
+                var resolved = attributeMap.resolve(attr);
+                if (resolved != null && isGuaranteedNull(resolved)) {
+                    plan = replaceJoin(join);
+                    break;
+                }
+            }
+        }
+        return plan;
+    }
+
+    private static LogicalPlan replaceJoin(Join join) {
+        var joinRightOutput = join.rightOutputFields();
+        // can be empty when the join key is null and the rest of the right side entries pruned (such as by an agg)
+        if (joinRightOutput.isEmpty()) {
+            return join.left();
+        }
+        var aliasedNulls = RuleUtils.aliasedNulls(joinRightOutput, a -> true);
+        var eval = new Eval(join.source(), join.left(), aliasedNulls.v1());
+        return new Project(
+            join.source(),
+            eval,
+            Join.computeOutput(join.left().output(), Expressions.asAttributes(aliasedNulls.v2()), join.config())
+        );
+    }
+}

+ 8 - 36
x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/optimizer/rules/logical/local/ReplaceFieldWithConstantOrNull.java

@@ -7,17 +7,14 @@
 
 package org.elasticsearch.xpack.esql.optimizer.rules.logical.local;
 
-import org.elasticsearch.common.util.Maps;
 import org.elasticsearch.index.IndexMode;
-import org.elasticsearch.xpack.esql.core.expression.Alias;
 import org.elasticsearch.xpack.esql.core.expression.Attribute;
 import org.elasticsearch.xpack.esql.core.expression.AttributeSet;
 import org.elasticsearch.xpack.esql.core.expression.Expression;
 import org.elasticsearch.xpack.esql.core.expression.FieldAttribute;
 import org.elasticsearch.xpack.esql.core.expression.Literal;
-import org.elasticsearch.xpack.esql.core.expression.NamedExpression;
-import org.elasticsearch.xpack.esql.core.type.DataType;
 import org.elasticsearch.xpack.esql.optimizer.LocalLogicalOptimizerContext;
+import org.elasticsearch.xpack.esql.optimizer.rules.RuleUtils;
 import org.elasticsearch.xpack.esql.plan.logical.EsRelation;
 import org.elasticsearch.xpack.esql.plan.logical.Eval;
 import org.elasticsearch.xpack.esql.plan.logical.Filter;
@@ -28,7 +25,6 @@ import org.elasticsearch.xpack.esql.plan.logical.RegexExtract;
 import org.elasticsearch.xpack.esql.plan.logical.TopN;
 import org.elasticsearch.xpack.esql.rule.ParameterizedRule;
 
-import java.util.ArrayList;
 import java.util.HashMap;
 import java.util.List;
 import java.util.Map;
@@ -90,42 +86,18 @@ public class ReplaceFieldWithConstantOrNull extends ParameterizedRule<LogicalPla
             // \_Eval[field1 = null, field3 = null]
             // \_EsRelation[field1, field2, field3]
             List<Attribute> relationOutput = relation.output();
-            Map<DataType, Alias> nullLiterals = Maps.newLinkedHashMapWithExpectedSize(DataType.types().size());
-            List<NamedExpression> newProjections = new ArrayList<>(relationOutput.size());
-            for (int i = 0, size = relationOutput.size(); i < size; i++) {
-                Attribute attr = relationOutput.get(i);
-                NamedExpression projection;
-                if (attr instanceof FieldAttribute f && shouldBeRetained.test(f) == false) {
-                    DataType dt = f.dataType();
-                    Alias nullAlias = nullLiterals.get(dt);
-                    // save the first field as null (per datatype)
-                    if (nullAlias == null) {
-                        // Keep the same id so downstream query plans don't need updating
-                        // NOTE: THIS IS BRITTLE AND CAN LEAD TO BUGS.
-                        // In case some optimizer rule or so inserts a plan node that requires the field BEFORE the Eval that we're adding
-                        // on top of the EsRelation, this can trigger a field extraction in the physical optimizer phase, causing wrong
-                        // layouts due to a duplicate name id.
-                        // If someone reaches here AGAIN when debugging e.g. ClassCastExceptions NPEs from wrong layouts, we should probably
-                        // give up on this approach and instead insert EvalExecs in InsertFieldExtraction.
-                        Alias alias = new Alias(f.source(), f.name(), Literal.of(f, null), f.id());
-                        nullLiterals.put(dt, alias);
-                        projection = alias.toAttribute();
-                    }
-                    // otherwise point to it since this avoids creating field copies
-                    else {
-                        projection = new Alias(f.source(), f.name(), nullAlias.toAttribute(), f.id());
-                    }
-                } else {
-                    projection = attr;
-                }
-                newProjections.add(projection);
-            }
+            var aliasedNulls = RuleUtils.aliasedNulls(
+                relationOutput,
+                attr -> attr instanceof FieldAttribute f && shouldBeRetained.test(f) == false
+            );
+            var nullLiterals = aliasedNulls.v1();
+            var newProjections = aliasedNulls.v2();
 
             if (nullLiterals.size() == 0) {
                 return plan;
             }
 
-            Eval eval = new Eval(plan.source(), relation, new ArrayList<>(nullLiterals.values()));
+            Eval eval = new Eval(plan.source(), relation, nullLiterals);
             // This projection is redundant if there's another projection downstream (and no commands depend on the order until we hit it).
             return new Project(plan.source(), eval, newProjections);
         }

+ 172 - 1
x-pack/plugin/esql/src/test/java/org/elasticsearch/xpack/esql/optimizer/LocalPhysicalPlanOptimizerTests.java

@@ -68,8 +68,10 @@ import org.elasticsearch.xpack.esql.plan.physical.EvalExec;
 import org.elasticsearch.xpack.esql.plan.physical.ExchangeExec;
 import org.elasticsearch.xpack.esql.plan.physical.FieldExtractExec;
 import org.elasticsearch.xpack.esql.plan.physical.FilterExec;
+import org.elasticsearch.xpack.esql.plan.physical.GrokExec;
 import org.elasticsearch.xpack.esql.plan.physical.LimitExec;
 import org.elasticsearch.xpack.esql.plan.physical.LocalSourceExec;
+import org.elasticsearch.xpack.esql.plan.physical.LookupJoinExec;
 import org.elasticsearch.xpack.esql.plan.physical.MvExpandExec;
 import org.elasticsearch.xpack.esql.plan.physical.PhysicalPlan;
 import org.elasticsearch.xpack.esql.plan.physical.ProjectExec;
@@ -109,6 +111,7 @@ import static org.elasticsearch.xpack.esql.EsqlTestUtils.configuration;
 import static org.elasticsearch.xpack.esql.EsqlTestUtils.loadMapping;
 import static org.elasticsearch.xpack.esql.EsqlTestUtils.unboundLogicalOptimizerContext;
 import static org.elasticsearch.xpack.esql.EsqlTestUtils.withDefaultLimitWarning;
+import static org.elasticsearch.xpack.esql.analysis.AnalyzerTestUtils.defaultLookupResolution;
 import static org.elasticsearch.xpack.esql.core.querydsl.query.Query.unscore;
 import static org.elasticsearch.xpack.esql.plan.physical.EsStatsQueryExec.StatsType;
 import static org.hamcrest.Matchers.contains;
@@ -201,7 +204,7 @@ public class LocalPhysicalPlanOptimizerTests extends MapperServiceTestCase {
         IndexResolution getIndexResult = IndexResolution.valid(test);
 
         return new Analyzer(
-            new AnalyzerContext(config, new EsqlFunctionRegistry(), getIndexResult, enrichResolution),
+            new AnalyzerContext(config, new EsqlFunctionRegistry(), getIndexResult, defaultLookupResolution(), enrichResolution),
             new Verifier(new Metrics(new EsqlFunctionRegistry()), new XPackLicenseState(() -> 0L))
         );
     }
@@ -1325,6 +1328,174 @@ public class LocalPhysicalPlanOptimizerTests extends MapperServiceTestCase {
         assertThat(Expressions.names(fields), contains("_meta_field", "gender", "hire_date", "job", "job.raw", "languages", "long_noidx"));
     }
 
+    /*
+     * LimitExec[1000[INTEGER]]
+     * \_AggregateExec[[language_code{r}#6],[COUNT(emp_no{f}#12,true[BOOLEAN]) AS c#11, language_code{r}#6],FINAL,[language_code{r}#6, $
+     *      $c$count{r}#25, $$c$seen{r}#26],12]
+     *   \_ExchangeExec[[language_code{r}#6, $$c$count{r}#25, $$c$seen{r}#26],true]
+     *     \_AggregateExec[[languages{r}#15],[COUNT(emp_no{f}#12,true[BOOLEAN]) AS c#11, languages{r}#15 AS language_code#6],INITIAL,[langua
+     *          ges{r}#15, $$c$count{r}#27, $$c$seen{r}#28],12]
+     *       \_FieldExtractExec[emp_no{f}#12]<[],[]>
+     *         \_EvalExec[[null[INTEGER] AS languages#15]]
+     *           \_EsQueryExec[test], indexMode[standard], query[][_doc{f}#29], limit[], sort[] estimatedRowSize[12]
+     */
+    public void testMissingFieldsPurgesTheJoinLocally() {
+        var stats = EsqlTestUtils.statsForMissingField("languages");
+
+        var plan = plannerOptimizer.plan("""
+            from test
+            | keep emp_no, languages
+            | rename languages AS language_code
+            | lookup join languages_lookup ON language_code
+            | stats c = count(emp_no) by language_code
+            """, stats);
+
+        var limit = as(plan, LimitExec.class);
+        var agg = as(limit.child(), AggregateExec.class);
+        assertThat(Expressions.names(agg.output()), contains("c", "language_code"));
+
+        var exchange = as(agg.child(), ExchangeExec.class);
+        agg = as(exchange.child(), AggregateExec.class);
+        var extract = as(agg.child(), FieldExtractExec.class);
+        var eval = as(extract.child(), EvalExec.class);
+        var source = as(eval.child(), EsQueryExec.class);
+    }
+
+    /*
+     * LimitExec[1000[INTEGER]]
+     * \_AggregateExec[[language_code{r}#7],[COUNT(emp_no{r}#31,true[BOOLEAN]) AS c#17, language_code{r}#7],FINAL,[language_code{r}#7, $
+     *      $c$count{r}#32, $$c$seen{r}#33],12]
+     *   \_ExchangeExec[[language_code{r}#7, $$c$count{r}#32, $$c$seen{r}#33],true]
+     *     \_AggregateExec[[language_code{r}#7],[COUNT(emp_no{r}#31,true[BOOLEAN]) AS c#17, language_code{r}#7],INITIAL,[language_code{r}#7,
+     *          $$c$count{r}#34, $$c$seen{r}#35],12]
+     *       \_GrokExec[first_name{f}#19,Parser[pattern=%{WORD:foo}, grok=org.elasticsearch.grok.Grok@75389ac1],[foo{r}#12]]
+     *         \_MvExpandExec[emp_no{f}#18,emp_no{r}#31]
+     *           \_ProjectExec[[emp_no{f}#18, languages{r}#21 AS language_code#7, first_name{f}#19]]
+     *             \_FieldExtractExec[emp_no{f}#18, first_name{f}#19]<[],[]>
+     *               \_EvalExec[[null[INTEGER] AS languages#21]]
+     *                 \_EsQueryExec[test], indexMode[standard], query[][_doc{f}#36], limit[], sort[] estimatedRowSize[112]
+     */
+    public void testMissingFieldsPurgesTheJoinLocallyThroughCommands() {
+        var stats = EsqlTestUtils.statsForMissingField("languages");
+
+        var plan = plannerOptimizer.plan("""
+            from test
+            | keep emp_no, languages, first_name
+            | rename languages AS language_code
+            | mv_expand emp_no
+            | grok first_name "%{WORD:foo}"
+            | lookup join languages_lookup ON language_code
+            | stats c = count(emp_no) by language_code
+            """, stats);
+
+        var limit = as(plan, LimitExec.class);
+        var agg = as(limit.child(), AggregateExec.class);
+        assertThat(Expressions.names(agg.output()), contains("c", "language_code"));
+
+        var exchange = as(agg.child(), ExchangeExec.class);
+        agg = as(exchange.child(), AggregateExec.class);
+        var grok = as(agg.child(), GrokExec.class);
+        var mvexpand = as(grok.child(), MvExpandExec.class);
+        var project = as(mvexpand.child(), ProjectExec.class);
+        var extract = as(project.child(), FieldExtractExec.class);
+        var eval = as(extract.child(), EvalExec.class);
+        var source = as(eval.child(), EsQueryExec.class);
+    }
+
+    /*
+     * LimitExec[1000[INTEGER]]
+     * \_AggregateExec[[language_code{r}#12],[COUNT(emp_no{r}#31,true[BOOLEAN]) AS c#17, language_code{r}#12],FINAL,[language_code{r}#12
+     * , $$c$count{r}#32, $$c$seen{r}#33],12]
+     *   \_ExchangeExec[[language_code{r}#12, $$c$count{r}#32, $$c$seen{r}#33],true]
+     *     \_AggregateExec[[language_code{r}#12],[COUNT(emp_no{r}#31,true[BOOLEAN]) AS c#17, language_code{r}#12],INITIAL,[language_code{r}#
+     *          12, $$c$count{r}#34, $$c$seen{r}#35],12]
+     *       \_LookupJoinExec[[language_code{r}#12],[language_code{f}#29],[]]
+     *         |_GrokExec[first_name{f}#19,Parser[pattern=%{NUMBER:language_code:int}, grok=org.elasticsearch.grok.Grok@764e5109],[languag
+     *              e_code{r}#12]]
+     *         | \_MvExpandExec[emp_no{f}#18,emp_no{r}#31]
+     *         |   \_ProjectExec[[emp_no{f}#18, languages{r}#21 AS language_code#7, first_name{f}#19]]
+     *         |     \_FieldExtractExec[emp_no{f}#18, first_name{f}#19]<[],[]>
+     *         |       \_EvalExec[[null[INTEGER] AS languages#21]]
+     *         |         \_EsQueryExec[test], indexMode[standard], query[][_doc{f}#36], limit[], sort[] estimatedRowSize[66]
+     *         \_EsQueryExec[languages_lookup], indexMode[lookup], query[][_doc{f}#37], limit[], sort[] estimatedRowSize[4]
+     */
+    public void testMissingFieldsNotPurgingTheJoinLocally() {
+        var stats = EsqlTestUtils.statsForMissingField("languages");
+
+        var plan = plannerOptimizer.plan("""
+            from test
+            | keep emp_no, languages, first_name
+            | rename languages AS language_code
+            | mv_expand emp_no
+            | grok first_name "%{NUMBER:language_code:int}" // this reassigns language_code
+            | lookup join languages_lookup ON language_code
+            | stats c = count(emp_no) by language_code
+            """, stats);
+
+        var limit = as(plan, LimitExec.class);
+        var agg = as(limit.child(), AggregateExec.class);
+        assertThat(Expressions.names(agg.output()), contains("c", "language_code"));
+
+        var exchange = as(agg.child(), ExchangeExec.class);
+        agg = as(exchange.child(), AggregateExec.class);
+        var join = as(agg.child(), LookupJoinExec.class);
+        var grok = as(join.left(), GrokExec.class);
+        var mvexpand = as(grok.child(), MvExpandExec.class);
+        var project = as(mvexpand.child(), ProjectExec.class);
+        var extract = as(project.child(), FieldExtractExec.class);
+        var eval = as(extract.child(), EvalExec.class);
+        var source = as(eval.child(), EsQueryExec.class);
+        var right = as(join.right(), EsQueryExec.class);
+    }
+
+    /*
+     * LimitExec[1000[INTEGER]]
+     * \_LookupJoinExec[[language_code{r}#6],[language_code{f}#23],[language_name{f}#24]]
+     *   |_LimitExec[1000[INTEGER]]
+     *   | \_AggregateExec[[languages{f}#15],[COUNT(emp_no{f}#12,true[BOOLEAN]) AS c#10, languages{f}#15 AS language_code#6],FINAL,[language
+     *          s{f}#15, $$c$count{r}#25, $$c$seen{r}#26],62]
+     *   |   \_ExchangeExec[[languages{f}#15, $$c$count{r}#25, $$c$seen{r}#26],true]
+     *   |     \_AggregateExec[[languages{r}#15],[COUNT(emp_no{f}#12,true[BOOLEAN]) AS c#10, languages{r}#15 AS language_code#6],INITIAL,
+     *              [languages{r}#15, $$c$count{r}#27, $$c$seen{r}#28],12]
+     *   |       \_FieldExtractExec[emp_no{f}#12]<[],[]>
+     *   |         \_EvalExec[[null[INTEGER] AS languages#15]]
+     *   |           \_EsQueryExec[test], indexMode[standard], query[][_doc{f}#29], limit[], sort[] estimatedRowSize[12]
+     *   \_EsQueryExec[languages_lookup], indexMode[lookup], query[][_doc{f}#30], limit[], sort[] estimatedRowSize[4]
+     */
+    public void testMissingFieldsDoesNotPurgeTheJoinOnCoordinator() {
+        var stats = EsqlTestUtils.statsForMissingField("languages");
+
+        // same as the query above, but with the last two lines swapped, so that the join is no longer pushed to the data nodes
+        var plan = plannerOptimizer.plan("""
+            from test
+            | keep emp_no, languages
+            | rename languages AS language_code
+            | stats c = count(emp_no) by language_code
+            | lookup join languages_lookup ON language_code
+            """, stats);
+
+        var limit = as(plan, LimitExec.class);
+        var join = as(limit.child(), LookupJoinExec.class);
+        limit = as(join.left(), LimitExec.class);
+        var agg = as(limit.child(), AggregateExec.class);
+        var exchange = as(agg.child(), ExchangeExec.class);
+        agg = as(exchange.child(), AggregateExec.class);
+        var extract = as(agg.child(), FieldExtractExec.class);
+        var eval = as(extract.child(), EvalExec.class);
+        assertThat(eval.fields().size(), is(1));
+        var alias = as(eval.fields().get(0), Alias.class);
+        assertThat(alias.name(), is("languages"));
+        var literal = as(alias.child(), Literal.class);
+        assertNull(literal.value());
+        var source = as(eval.child(), EsQueryExec.class);
+        assertThat(source.indexPattern(), is("test"));
+        assertThat(source.indexMode(), is(IndexMode.STANDARD));
+
+        source = as(join.right(), EsQueryExec.class);
+        assertThat(source.indexPattern(), is("languages_lookup"));
+        assertThat(source.indexMode(), is(IndexMode.LOOKUP));
+    }
+
     /*
      Checks that match filters are pushed down to Lucene when using no casting, for example:
      WHERE first_name:"Anna")

+ 44 - 0
x-pack/plugin/esql/src/test/java/org/elasticsearch/xpack/esql/optimizer/LogicalPlanOptimizerTests.java

@@ -2721,6 +2721,50 @@ public class LogicalPlanOptimizerTests extends ESTestCase {
         var localRelation = as(limitBefore.child(), LocalRelation.class);
     }
 
+    /*
+     * EsqlProject[[emp_no{f}#9, first_name{f}#10, languages{f}#12, language_code{r}#3, language_name{r}#22]]
+     * \_Eval[[null[INTEGER] AS language_code#3, null[KEYWORD] AS language_name#22]]
+     *   \_Limit[1000[INTEGER],false]
+     *     \_EsRelation[test][_meta_field{f}#15, emp_no{f}#9, first_name{f}#10, g..]
+     */
+    public void testPruneJoinOnNullMatchingField() {
+        var plan = optimizedPlan("""
+            from test
+            | eval language_code = null::integer
+            | keep emp_no, first_name, languages, language_code
+            | lookup join languages_lookup on language_code
+            """);
+
+        var project = as(plan, Project.class);
+        assertThat(Expressions.names(project.output()), contains("emp_no", "first_name", "languages", "language_code", "language_name"));
+        var eval = as(project.child(), Eval.class);
+        var limit = asLimit(eval.child(), 1000, false);
+        var source = as(limit.child(), EsRelation.class);
+    }
+
+    /*
+     * EsqlProject[[emp_no{f}#15, first_name{f}#16, my_null{r}#3 AS language_code#9, language_name{r}#27]]
+     * \_Eval[[null[INTEGER] AS my_null#3, null[KEYWORD] AS language_name#27]]
+     *   \_Limit[1000[INTEGER],false]
+     *     \_EsRelation[test][_meta_field{f}#21, emp_no{f}#15, first_name{f}#16, ..]
+     */
+    public void testPruneJoinOnNullAssignedMatchingField() {
+        var plan = optimizedPlan("""
+            from test
+            | eval my_null = null::integer
+            | rename languages as language_code
+            | eval language_code = my_null
+            | lookup join languages_lookup on language_code
+            | keep emp_no, first_name, language_code, language_name
+            """);
+
+        var project = as(plan, EsqlProject.class);
+        assertThat(Expressions.names(project.output()), contains("emp_no", "first_name", "language_code", "language_name"));
+        var eval = as(project.child(), Eval.class);
+        var limit = asLimit(eval.child(), 1000, false);
+        var source = as(limit.child(), EsRelation.class);
+    }
+
     private static List<String> orderNames(TopN topN) {
         return topN.order().stream().map(o -> as(o.child(), NamedExpression.class).name()).toList();
     }

+ 1 - 1
x-pack/plugin/esql/src/test/java/org/elasticsearch/xpack/esql/optimizer/PhysicalPlanOptimizerTests.java

@@ -7567,7 +7567,7 @@ public class PhysicalPlanOptimizerTests extends ESTestCase {
         // The TopN needs an estimated row size for the planner to work
         var plans = PlannerUtils.breakPlanBetweenCoordinatorAndDataNode(EstimatesRowSize.estimateRowSize(0, plan), config);
         plan = useDataNodePlan ? plans.v2() : plans.v1();
-        plan = PlannerUtils.localPlan(List.of(), config, FoldContext.small(), plan);
+        plan = PlannerUtils.localPlan(config, FoldContext.small(), plan, TEST_SEARCH_STATS);
         ExchangeSinkHandler exchangeSinkHandler = new ExchangeSinkHandler(null, 10, () -> 10);
         LocalExecutionPlanner planner = new LocalExecutionPlanner(
             "test",