Browse Source

Add Dependency Checker for LogicalLocalPlanOptimizer (#130409)

Add verification for LocalLogical plan
The verification is skipped if there is remote enrich, similar to how it is skipped for LocalPhysical plan optimization.
The skip only happens for LocalLogical and LocalPhysical plan optimizers.
Julian Kiryakov 3 months ago
parent
commit
2f7527722a

+ 5 - 0
docs/changelog/130409.yaml

@@ -0,0 +1,5 @@
+pr: 130409
+summary: Add Dependency Checker for `LogicalLocalPlanOptimizer`
+area: ES|QL
+type: enhancement
+issues: []

+ 11 - 1
x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/optimizer/LocalLogicalPlanOptimizer.java

@@ -7,6 +7,8 @@
 
 package org.elasticsearch.xpack.esql.optimizer;
 
+import org.elasticsearch.xpack.esql.VerificationException;
+import org.elasticsearch.xpack.esql.common.Failures;
 import org.elasticsearch.xpack.esql.optimizer.rules.logical.PropagateEmptyRelation;
 import org.elasticsearch.xpack.esql.optimizer.rules.logical.ReplaceStatsFilteredAggWithEval;
 import org.elasticsearch.xpack.esql.optimizer.rules.logical.ReplaceStringCasingWithInsensitiveRegexMatch;
@@ -35,6 +37,8 @@ import static org.elasticsearch.xpack.esql.optimizer.LogicalPlanOptimizer.operat
  */
 public class LocalLogicalPlanOptimizer extends ParameterizedRuleExecutor<LogicalPlan, LocalLogicalOptimizerContext> {
 
+    private final LogicalVerifier verifier = LogicalVerifier.INSTANCE;
+
     private static final List<Batch<LogicalPlan>> RULES = arrayAsArrayList(
         new Batch<>(
             "Local rewrite",
@@ -81,6 +85,12 @@ public class LocalLogicalPlanOptimizer extends ParameterizedRuleExecutor<Logical
     }
 
     public LogicalPlan localOptimize(LogicalPlan plan) {
-        return execute(plan);
+        LogicalPlan optimized = execute(plan);
+        Failures failures = verifier.verify(optimized, true);
+        if (failures.hasFailures()) {
+            throw new VerificationException(failures);
+        }
+        return optimized;
     }
+
 }

+ 1 - 1
x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/optimizer/LocalPhysicalPlanOptimizer.java

@@ -46,7 +46,7 @@ public class LocalPhysicalPlanOptimizer extends ParameterizedRuleExecutor<Physic
     }
 
     PhysicalPlan verify(PhysicalPlan plan) {
-        Failures failures = verifier.verify(plan);
+        Failures failures = verifier.verify(plan, true);
         if (failures.hasFailures()) {
             throw new VerificationException(failures);
         }

+ 1 - 1
x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/optimizer/LogicalPlanOptimizer.java

@@ -112,7 +112,7 @@ public class LogicalPlanOptimizer extends ParameterizedRuleExecutor<LogicalPlan,
     public LogicalPlan optimize(LogicalPlan verified) {
         var optimized = execute(verified);
 
-        Failures failures = verifier.verify(optimized);
+        Failures failures = verifier.verify(optimized, false);
         if (failures.hasFailures()) {
             throw new VerificationException(failures);
         }

+ 10 - 1
x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/optimizer/LogicalVerifier.java

@@ -10,6 +10,7 @@ package org.elasticsearch.xpack.esql.optimizer;
 import org.elasticsearch.xpack.esql.capabilities.PostOptimizationVerificationAware;
 import org.elasticsearch.xpack.esql.common.Failures;
 import org.elasticsearch.xpack.esql.optimizer.rules.PlanConsistencyChecker;
+import org.elasticsearch.xpack.esql.plan.logical.Enrich;
 import org.elasticsearch.xpack.esql.plan.logical.LogicalPlan;
 
 public final class LogicalVerifier {
@@ -19,10 +20,18 @@ public final class LogicalVerifier {
     private LogicalVerifier() {}
 
     /** Verifies the optimized logical plan. */
-    public Failures verify(LogicalPlan plan) {
+    public Failures verify(LogicalPlan plan, boolean skipRemoteEnrichVerification) {
         Failures failures = new Failures();
         Failures dependencyFailures = new Failures();
 
+        if (skipRemoteEnrichVerification) {
+            // AwaitsFix https://github.com/elastic/elasticsearch/issues/118531
+            var enriches = plan.collectFirstChildren(Enrich.class::isInstance);
+            if (enriches.isEmpty() == false && ((Enrich) enriches.get(0)).mode() == Enrich.Mode.REMOTE) {
+                return failures;
+            }
+        }
+
         plan.forEachUp(p -> {
             PlanConsistencyChecker.checkPlan(p, dependencyFailures);
 

+ 1 - 1
x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/optimizer/PhysicalPlanOptimizer.java

@@ -38,7 +38,7 @@ public class PhysicalPlanOptimizer extends ParameterizedRuleExecutor<PhysicalPla
     }
 
     PhysicalPlan verify(PhysicalPlan plan) {
-        Failures failures = verifier.verify(plan);
+        Failures failures = verifier.verify(plan, false);
         if (failures.hasFailures()) {
             throw new VerificationException(failures);
         }

+ 7 - 5
x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/optimizer/PhysicalVerifier.java

@@ -27,14 +27,16 @@ public final class PhysicalVerifier {
     private PhysicalVerifier() {}
 
     /** Verifies the physical plan. */
-    public Failures verify(PhysicalPlan plan) {
+    public Failures verify(PhysicalPlan plan, boolean skipRemoteEnrichVerification) {
         Failures failures = new Failures();
         Failures depFailures = new Failures();
 
-        // AwaitsFix https://github.com/elastic/elasticsearch/issues/118531
-        var enriches = plan.collectFirstChildren(EnrichExec.class::isInstance);
-        if (enriches.isEmpty() == false && ((EnrichExec) enriches.get(0)).mode() == Enrich.Mode.REMOTE) {
-            return failures;
+        if (skipRemoteEnrichVerification) {
+            // AwaitsFix https://github.com/elastic/elasticsearch/issues/118531
+            var enriches = plan.collectFirstChildren(EnrichExec.class::isInstance);
+            if (enriches.isEmpty() == false && ((EnrichExec) enriches.get(0)).mode() == Enrich.Mode.REMOTE) {
+                return failures;
+            }
         }
 
         plan.forEachDown(p -> {

+ 32 - 0
x-pack/plugin/esql/src/test/java/org/elasticsearch/xpack/esql/optimizer/LocalLogicalPlanOptimizerTests.java

@@ -23,13 +23,16 @@ import org.elasticsearch.xpack.esql.core.expression.Expressions;
 import org.elasticsearch.xpack.esql.core.expression.FieldAttribute;
 import org.elasticsearch.xpack.esql.core.expression.FoldContext;
 import org.elasticsearch.xpack.esql.core.expression.Literal;
+import org.elasticsearch.xpack.esql.core.expression.NamedExpression;
 import org.elasticsearch.xpack.esql.core.expression.ReferenceAttribute;
 import org.elasticsearch.xpack.esql.core.tree.NodeInfo;
 import org.elasticsearch.xpack.esql.core.tree.Source;
 import org.elasticsearch.xpack.esql.core.type.DataType;
 import org.elasticsearch.xpack.esql.core.type.EsField;
 import org.elasticsearch.xpack.esql.core.type.InvalidMappedField;
+import org.elasticsearch.xpack.esql.expression.Order;
 import org.elasticsearch.xpack.esql.expression.function.EsqlFunctionRegistry;
+import org.elasticsearch.xpack.esql.expression.function.aggregate.Min;
 import org.elasticsearch.xpack.esql.expression.function.scalar.conditional.Case;
 import org.elasticsearch.xpack.esql.expression.function.scalar.nulls.Coalesce;
 import org.elasticsearch.xpack.esql.expression.function.scalar.string.StartsWith;
@@ -49,6 +52,7 @@ import org.elasticsearch.xpack.esql.plan.logical.Filter;
 import org.elasticsearch.xpack.esql.plan.logical.Limit;
 import org.elasticsearch.xpack.esql.plan.logical.LogicalPlan;
 import org.elasticsearch.xpack.esql.plan.logical.MvExpand;
+import org.elasticsearch.xpack.esql.plan.logical.OrderBy;
 import org.elasticsearch.xpack.esql.plan.logical.Project;
 import org.elasticsearch.xpack.esql.plan.logical.Row;
 import org.elasticsearch.xpack.esql.plan.logical.UnaryPlan;
@@ -64,6 +68,7 @@ import java.util.Locale;
 import java.util.Map;
 import java.util.Set;
 
+import static java.util.Arrays.asList;
 import static java.util.Collections.emptyMap;
 import static org.elasticsearch.xpack.esql.EsqlTestUtils.L;
 import static org.elasticsearch.xpack.esql.EsqlTestUtils.ONE;
@@ -84,6 +89,7 @@ import static org.elasticsearch.xpack.esql.EsqlTestUtils.unboundLogicalOptimizer
 import static org.elasticsearch.xpack.esql.EsqlTestUtils.withDefaultLimitWarning;
 import static org.elasticsearch.xpack.esql.core.tree.Source.EMPTY;
 import static org.hamcrest.Matchers.contains;
+import static org.hamcrest.Matchers.containsString;
 import static org.hamcrest.Matchers.equalTo;
 import static org.hamcrest.Matchers.hasSize;
 import static org.hamcrest.Matchers.is;
@@ -774,6 +780,32 @@ public class LocalLogicalPlanOptimizerTests extends ESTestCase {
         as(eval.child(), EsRelation.class);
     }
 
+    public void testPlanSanityCheck() throws Exception {
+        var plan = localPlan("""
+            from test
+            | stats a = min(salary) by emp_no
+            """);
+
+        var limit = as(plan, Limit.class);
+        var aggregate = as(limit.child(), Aggregate.class);
+        var min = as(Alias.unwrap(aggregate.aggregates().get(0)), Min.class);
+        var salary = as(min.field(), NamedExpression.class);
+        assertThat(salary.name(), is("salary"));
+        // emulate a rule that adds an invalid field
+        var invalidPlan = new OrderBy(
+            limit.source(),
+            limit,
+            asList(new Order(limit.source(), salary, Order.OrderDirection.ASC, Order.NullsPosition.FIRST))
+        );
+
+        var localContext = new LocalLogicalOptimizerContext(EsqlTestUtils.TEST_CFG, FoldContext.small(), TEST_SEARCH_STATS);
+        LocalLogicalPlanOptimizer localLogicalPlanOptimizer = new LocalLogicalPlanOptimizer(localContext);
+
+        IllegalStateException e = expectThrows(IllegalStateException.class, () -> localLogicalPlanOptimizer.localOptimize(invalidPlan));
+        assertThat(e.getMessage(), containsString("Plan [OrderBy[[Order[salary"));
+        assertThat(e.getMessage(), containsString(" optimized incorrectly due to missing references [salary"));
+    }
+
     private IsNotNull isNotNull(Expression field) {
         return new IsNotNull(EMPTY, field);
     }

+ 38 - 0
x-pack/plugin/esql/src/test/java/org/elasticsearch/xpack/esql/optimizer/LocalPhysicalPlanOptimizerTests.java

@@ -43,6 +43,7 @@ import org.elasticsearch.xpack.esql.core.expression.Alias;
 import org.elasticsearch.xpack.esql.core.expression.Expression;
 import org.elasticsearch.xpack.esql.core.expression.Expressions;
 import org.elasticsearch.xpack.esql.core.expression.FieldAttribute;
+import org.elasticsearch.xpack.esql.core.expression.FoldContext;
 import org.elasticsearch.xpack.esql.core.expression.Literal;
 import org.elasticsearch.xpack.esql.core.expression.NamedExpression;
 import org.elasticsearch.xpack.esql.core.expression.ReferenceAttribute;
@@ -52,9 +53,11 @@ import org.elasticsearch.xpack.esql.core.type.EsField;
 import org.elasticsearch.xpack.esql.core.type.MultiTypeEsField;
 import org.elasticsearch.xpack.esql.core.util.Holder;
 import org.elasticsearch.xpack.esql.enrich.ResolvedEnrichPolicy;
+import org.elasticsearch.xpack.esql.expression.Order;
 import org.elasticsearch.xpack.esql.expression.function.EsqlFunctionRegistry;
 import org.elasticsearch.xpack.esql.expression.function.UnsupportedAttribute;
 import org.elasticsearch.xpack.esql.expression.function.aggregate.Count;
+import org.elasticsearch.xpack.esql.expression.function.aggregate.Min;
 import org.elasticsearch.xpack.esql.expression.function.fulltext.FullTextFunction;
 import org.elasticsearch.xpack.esql.expression.function.fulltext.Kql;
 import org.elasticsearch.xpack.esql.expression.function.fulltext.Match;
@@ -131,8 +134,12 @@ import static org.elasticsearch.xpack.esql.analysis.AnalyzerTestUtils.defaultLoo
 import static org.elasticsearch.xpack.esql.analysis.AnalyzerTestUtils.indexWithDateDateNanosUnionType;
 import static org.elasticsearch.xpack.esql.core.querydsl.query.Query.unscore;
 import static org.elasticsearch.xpack.esql.core.type.DataType.DATE_NANOS;
+import static org.elasticsearch.xpack.esql.core.type.DataType.INTEGER;
+import static org.elasticsearch.xpack.esql.core.util.TestUtils.getFieldAttribute;
+import static org.elasticsearch.xpack.esql.plan.physical.AbstractPhysicalPlanSerializationTests.randomEstimatedRowSize;
 import static org.elasticsearch.xpack.esql.plan.physical.EsStatsQueryExec.StatsType;
 import static org.hamcrest.Matchers.contains;
+import static org.hamcrest.Matchers.containsString;
 import static org.hamcrest.Matchers.equalTo;
 import static org.hamcrest.Matchers.hasSize;
 import static org.hamcrest.Matchers.instanceOf;
@@ -2056,6 +2063,37 @@ public class LocalPhysicalPlanOptimizerTests extends MapperServiceTestCase {
         assertThat(expected.toString(), is(esQuery.query().toString()));
     }
 
+    public void testVerifierOnMissingReferences() throws Exception {
+
+        PhysicalPlan plan = plannerOptimizer.plan("""
+            from test
+            | stats a = min(salary) by emp_no
+            """);
+
+        var limit = as(plan, LimitExec.class);
+        var aggregate = as(limit.child(), AggregateExec.class);
+        var min = as(Alias.unwrap(aggregate.aggregates().get(0)), Min.class);
+        var salary = as(min.field(), NamedExpression.class);
+        assertThat(salary.name(), is("salary"));
+        // emulate a rule that adds a missing attribute
+        FieldAttribute missingAttr = getFieldAttribute("missing attr");
+        List<Order> orders = List.of(new Order(plan.source(), missingAttr, Order.OrderDirection.ASC, Order.NullsPosition.FIRST));
+        TopNExec topNExec = new TopNExec(plan.source(), plan, orders, new Literal(Source.EMPTY, limit, INTEGER), randomEstimatedRowSize());
+
+        // We want to verify that the localOptimize detects the missing attribute.
+        // However, it also throws an error in one of the rules before we get to the verifier.
+        // So we use an implementation of LocalPhysicalPlanOptimizer that does not have any rules.
+        LocalPhysicalOptimizerContext context = new LocalPhysicalOptimizerContext(config, FoldContext.small(), SearchStats.EMPTY);
+        LocalPhysicalPlanOptimizer optimizerWithNoopExecute = new LocalPhysicalPlanOptimizer(context) {
+            @Override
+            protected List<Batch<PhysicalPlan>> batches() {
+                return List.of();
+            }
+        };
+        Exception e = expectThrows(IllegalStateException.class, () -> optimizerWithNoopExecute.localOptimize(topNExec));
+        assertThat(e.getMessage(), containsString(" optimized incorrectly due to missing references [missing attr"));
+    }
+
     private boolean isMultiTypeEsField(Expression e) {
         return e instanceof FieldAttribute fa && fa.field() instanceof MultiTypeEsField;
     }

+ 3 - 3
x-pack/plugin/esql/src/test/java/org/elasticsearch/xpack/esql/optimizer/LogicalPlanOptimizerTests.java

@@ -5554,7 +5554,7 @@ public class LogicalPlanOptimizerTests extends AbstractLogicalPlanOptimizerTests
             List<Attribute> initialGeneratedExprs = ((GeneratingPlan) initialPlan).generatedAttributes();
             LogicalPlan optimizedPlan = testCase.rule.apply(initialPlan);
 
-            Failures inconsistencies = LogicalVerifier.INSTANCE.verify(optimizedPlan);
+            Failures inconsistencies = LogicalVerifier.INSTANCE.verify(optimizedPlan, false);
             assertFalse(inconsistencies.hasFailures());
 
             Project project = as(optimizedPlan, Project.class);
@@ -5605,7 +5605,7 @@ public class LogicalPlanOptimizerTests extends AbstractLogicalPlanOptimizerTests
             List<Attribute> initialGeneratedExprs = ((GeneratingPlan) initialPlan).generatedAttributes();
             LogicalPlan optimizedPlan = testCase.rule.apply(initialPlan);
 
-            Failures inconsistencies = LogicalVerifier.INSTANCE.verify(optimizedPlan);
+            Failures inconsistencies = LogicalVerifier.INSTANCE.verify(optimizedPlan, false);
             assertFalse(inconsistencies.hasFailures());
 
             Project project = as(optimizedPlan, Project.class);
@@ -5661,7 +5661,7 @@ public class LogicalPlanOptimizerTests extends AbstractLogicalPlanOptimizerTests
 
             // This ensures that our generating plan doesn't use invalid references, resp. that any rename from the Project has
             // been propagated into the generating plan.
-            Failures inconsistencies = LogicalVerifier.INSTANCE.verify(optimizedPlan);
+            Failures inconsistencies = LogicalVerifier.INSTANCE.verify(optimizedPlan, false);
             assertFalse(inconsistencies.hasFailures());
 
             Project project = as(optimizedPlan, Project.class);