Browse Source

Add checks that optimizers do not modify the layout (#130855)

Add verification that the optimizers do not modify the number of attributes and the attribute datatype.
We add special handling for Lookup Join, by checking EsQueryExec esQueryExec && esQueryExec.indexMode() == LOOKUP and another special handling for ProjectAwayColumns.ALL_FIELDS_PROJECTED

Closes #125576
Julian Kiryakov 3 months ago
parent
commit
40585a2199
15 changed files with 566 additions and 61 deletions
  1. 6 0
      docs/changelog/130855.yaml
  2. 15 0
      x-pack/plugin/esql-core/src/main/java/org/elasticsearch/xpack/esql/core/expression/Attribute.java
  3. 1 1
      x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/optimizer/LocalLogicalPlanOptimizer.java
  4. 5 4
      x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/optimizer/LocalPhysicalPlanOptimizer.java
  5. 1 1
      x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/optimizer/LogicalPlanOptimizer.java
  6. 11 16
      x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/optimizer/LogicalVerifier.java
  7. 5 4
      x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/optimizer/PhysicalPlanOptimizer.java
  8. 10 15
      x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/optimizer/PhysicalVerifier.java
  9. 82 0
      x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/optimizer/PostOptimizationPhasePlanVerifier.java
  10. 2 1
      x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/optimizer/rules/physical/ProjectAwayColumns.java
  11. 1 1
      x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/rule/RuleExecutor.java
  12. 106 1
      x-pack/plugin/esql/src/test/java/org/elasticsearch/xpack/esql/optimizer/LocalLogicalPlanOptimizerTests.java
  13. 106 4
      x-pack/plugin/esql/src/test/java/org/elasticsearch/xpack/esql/optimizer/LocalPhysicalPlanOptimizerTests.java
  14. 102 6
      x-pack/plugin/esql/src/test/java/org/elasticsearch/xpack/esql/optimizer/LogicalPlanOptimizerTests.java
  15. 113 7
      x-pack/plugin/esql/src/test/java/org/elasticsearch/xpack/esql/optimizer/PhysicalPlanOptimizerTests.java

+ 6 - 0
docs/changelog/130855.yaml

@@ -0,0 +1,6 @@
+pr: 130855
+summary: Add checks that optimizers do not modify the layout
+area: ES|QL
+type: enhancement
+issues:
+ - 125576

+ 15 - 0
x-pack/plugin/esql-core/src/main/java/org/elasticsearch/xpack/esql/core/expression/Attribute.java

@@ -134,4 +134,19 @@ public abstract class Attribute extends NamedExpression {
     }
 
     protected abstract String label();
+
+    /**
+     * Compares the size and datatypes of two lists of attributes for equality.
+     */
+    public static boolean dataTypeEquals(List<Attribute> left, List<Attribute> right) {
+        if (left.size() != right.size()) {
+            return false;
+        }
+        for (int i = 0; i < left.size(); i++) {
+            if (left.get(i).dataType() != right.get(i).dataType()) {
+                return false;
+            }
+        }
+        return true;
+    }
 }

+ 1 - 1
x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/optimizer/LocalLogicalPlanOptimizer.java

@@ -88,7 +88,7 @@ public class LocalLogicalPlanOptimizer extends ParameterizedRuleExecutor<Logical
 
     public LogicalPlan localOptimize(LogicalPlan plan) {
         LogicalPlan optimized = execute(plan);
-        Failures failures = verifier.verify(optimized, true);
+        Failures failures = verifier.verify(optimized, true, plan.output());
         if (failures.hasFailures()) {
             throw new VerificationException(failures);
         }

+ 5 - 4
x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/optimizer/LocalPhysicalPlanOptimizer.java

@@ -9,6 +9,7 @@ package org.elasticsearch.xpack.esql.optimizer;
 
 import org.elasticsearch.xpack.esql.VerificationException;
 import org.elasticsearch.xpack.esql.common.Failures;
+import org.elasticsearch.xpack.esql.core.expression.Attribute;
 import org.elasticsearch.xpack.esql.optimizer.rules.physical.local.EnableSpatialDistancePushdown;
 import org.elasticsearch.xpack.esql.optimizer.rules.physical.local.InsertFieldExtraction;
 import org.elasticsearch.xpack.esql.optimizer.rules.physical.local.ParallelizeTimeSeriesSource;
@@ -42,15 +43,15 @@ public class LocalPhysicalPlanOptimizer extends ParameterizedRuleExecutor<Physic
     }
 
     public PhysicalPlan localOptimize(PhysicalPlan plan) {
-        return verify(execute(plan));
+        return verify(execute(plan), plan.output());
     }
 
-    PhysicalPlan verify(PhysicalPlan plan) {
-        Failures failures = verifier.verify(plan, true);
+    PhysicalPlan verify(PhysicalPlan optimizedPlan, List<Attribute> expectedOutputAttributes) {
+        Failures failures = verifier.verify(optimizedPlan, true, expectedOutputAttributes);
         if (failures.hasFailures()) {
             throw new VerificationException(failures);
         }
-        return plan;
+        return optimizedPlan;
     }
 
     @Override

+ 1 - 1
x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/optimizer/LogicalPlanOptimizer.java

@@ -113,7 +113,7 @@ public class LogicalPlanOptimizer extends ParameterizedRuleExecutor<LogicalPlan,
     public LogicalPlan optimize(LogicalPlan verified) {
         var optimized = execute(verified);
 
-        Failures failures = verifier.verify(optimized, false);
+        Failures failures = verifier.verify(optimized, false, verified.output());
         if (failures.hasFailures()) {
             throw new VerificationException(failures);
         }

+ 11 - 16
x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/optimizer/LogicalVerifier.java

@@ -13,27 +13,28 @@ import org.elasticsearch.xpack.esql.optimizer.rules.PlanConsistencyChecker;
 import org.elasticsearch.xpack.esql.plan.logical.Enrich;
 import org.elasticsearch.xpack.esql.plan.logical.LogicalPlan;
 
-public final class LogicalVerifier {
+public final class LogicalVerifier extends PostOptimizationPhasePlanVerifier<LogicalPlan> {
 
     public static final LogicalVerifier INSTANCE = new LogicalVerifier();
 
     private LogicalVerifier() {}
 
-    /** Verifies the optimized logical plan. */
-    public Failures verify(LogicalPlan plan, boolean skipRemoteEnrichVerification) {
-        Failures failures = new Failures();
-        Failures dependencyFailures = new Failures();
-
+    @Override
+    boolean skipVerification(LogicalPlan optimizedPlan, boolean skipRemoteEnrichVerification) {
         if (skipRemoteEnrichVerification) {
             // AwaitsFix https://github.com/elastic/elasticsearch/issues/118531
-            var enriches = plan.collectFirstChildren(Enrich.class::isInstance);
+            var enriches = optimizedPlan.collectFirstChildren(Enrich.class::isInstance);
             if (enriches.isEmpty() == false && ((Enrich) enriches.get(0)).mode() == Enrich.Mode.REMOTE) {
-                return failures;
+                return true;
             }
         }
+        return false;
+    }
 
-        plan.forEachUp(p -> {
-            PlanConsistencyChecker.checkPlan(p, dependencyFailures);
+    @Override
+    void checkPlanConsistency(LogicalPlan optimizedPlan, Failures failures, Failures depFailures) {
+        optimizedPlan.forEachUp(p -> {
+            PlanConsistencyChecker.checkPlan(p, depFailures);
 
             if (failures.hasFailures() == false) {
                 if (p instanceof PostOptimizationVerificationAware pova) {
@@ -46,11 +47,5 @@ public final class LogicalVerifier {
                 });
             }
         });
-
-        if (dependencyFailures.hasFailures()) {
-            throw new IllegalStateException(dependencyFailures.toString());
-        }
-
-        return failures;
     }
 }

+ 5 - 4
x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/optimizer/PhysicalPlanOptimizer.java

@@ -9,6 +9,7 @@ package org.elasticsearch.xpack.esql.optimizer;
 
 import org.elasticsearch.xpack.esql.VerificationException;
 import org.elasticsearch.xpack.esql.common.Failures;
+import org.elasticsearch.xpack.esql.core.expression.Attribute;
 import org.elasticsearch.xpack.esql.optimizer.rules.physical.ProjectAwayColumns;
 import org.elasticsearch.xpack.esql.plan.physical.FragmentExec;
 import org.elasticsearch.xpack.esql.plan.physical.PhysicalPlan;
@@ -34,15 +35,15 @@ public class PhysicalPlanOptimizer extends ParameterizedRuleExecutor<PhysicalPla
     }
 
     public PhysicalPlan optimize(PhysicalPlan plan) {
-        return verify(execute(plan));
+        return verify(execute(plan), plan.output());
     }
 
-    PhysicalPlan verify(PhysicalPlan plan) {
-        Failures failures = verifier.verify(plan, false);
+    PhysicalPlan verify(PhysicalPlan optimizedPlan, List<Attribute> expectedOutputAttributes) {
+        Failures failures = verifier.verify(optimizedPlan, false, expectedOutputAttributes);
         if (failures.hasFailures()) {
             throw new VerificationException(failures);
         }
-        return plan;
+        return optimizedPlan;
     }
 
     @Override

+ 10 - 15
x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/optimizer/PhysicalVerifier.java

@@ -20,26 +20,27 @@ import org.elasticsearch.xpack.esql.plan.physical.PhysicalPlan;
 import static org.elasticsearch.xpack.esql.common.Failure.fail;
 
 /** Physical plan verifier. */
-public final class PhysicalVerifier {
+public final class PhysicalVerifier extends PostOptimizationPhasePlanVerifier<PhysicalPlan> {
 
     public static final PhysicalVerifier INSTANCE = new PhysicalVerifier();
 
     private PhysicalVerifier() {}
 
-    /** Verifies the physical plan. */
-    public Failures verify(PhysicalPlan plan, boolean skipRemoteEnrichVerification) {
-        Failures failures = new Failures();
-        Failures depFailures = new Failures();
-
+    @Override
+    boolean skipVerification(PhysicalPlan optimizedPlan, boolean skipRemoteEnrichVerification) {
         if (skipRemoteEnrichVerification) {
             // AwaitsFix https://github.com/elastic/elasticsearch/issues/118531
-            var enriches = plan.collectFirstChildren(EnrichExec.class::isInstance);
+            var enriches = optimizedPlan.collectFirstChildren(EnrichExec.class::isInstance);
             if (enriches.isEmpty() == false && ((EnrichExec) enriches.get(0)).mode() == Enrich.Mode.REMOTE) {
-                return failures;
+                return true;
             }
         }
+        return false;
+    }
 
-        plan.forEachDown(p -> {
+    @Override
+    void checkPlanConsistency(PhysicalPlan optimizedPlan, Failures failures, Failures depFailures) {
+        optimizedPlan.forEachDown(p -> {
             if (p instanceof FieldExtractExec fieldExtractExec) {
                 Attribute sourceAttribute = fieldExtractExec.sourceAttribute();
                 if (sourceAttribute == null) {
@@ -66,11 +67,5 @@ public final class PhysicalVerifier {
                 });
             }
         });
-
-        if (depFailures.hasFailures()) {
-            throw new IllegalStateException(depFailures.toString());
-        }
-
-        return failures;
     }
 }

+ 82 - 0
x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/optimizer/PostOptimizationPhasePlanVerifier.java

@@ -0,0 +1,82 @@
+/*
+ * Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
+ * or more contributor license agreements. Licensed under the Elastic License
+ * 2.0; you may not use this file except in compliance with the Elastic License
+ * 2.0.
+ */
+
+package org.elasticsearch.xpack.esql.optimizer;
+
+import org.elasticsearch.xpack.esql.common.Failures;
+import org.elasticsearch.xpack.esql.core.expression.Attribute;
+import org.elasticsearch.xpack.esql.optimizer.rules.physical.ProjectAwayColumns;
+import org.elasticsearch.xpack.esql.plan.QueryPlan;
+import org.elasticsearch.xpack.esql.plan.physical.EsQueryExec;
+
+import java.util.List;
+
+import static org.elasticsearch.index.IndexMode.LOOKUP;
+import static org.elasticsearch.xpack.esql.common.Failure.fail;
+import static org.elasticsearch.xpack.esql.core.expression.Attribute.dataTypeEquals;
+
+/**
+ * Verifies the plan after optimization.
+ * This is invoked immediately after a Plan Optimizer completes its work.
+ * Currently, it is called after LogicalPlanOptimizer, PhysicalPlanOptimizer,
+ * LocalLogicalPlanOptimizer, and LocalPhysicalPlanOptimizer.
+ * Note: Logical and Physical optimizers may override methods in this class to perform different checks.
+ */
+public abstract class PostOptimizationPhasePlanVerifier<P extends QueryPlan<P>> {
+
+    /** Verifies the optimized plan */
+    public Failures verify(P optimizedPlan, boolean skipRemoteEnrichVerification, List<Attribute> expectedOutputAttributes) {
+        Failures failures = new Failures();
+        Failures depFailures = new Failures();
+        if (skipVerification(optimizedPlan, skipRemoteEnrichVerification)) {
+            return failures;
+        }
+
+        checkPlanConsistency(optimizedPlan, failures, depFailures);
+
+        verifyOutputNotChanged(optimizedPlan, expectedOutputAttributes, failures);
+
+        if (depFailures.hasFailures()) {
+            throw new IllegalStateException(depFailures.toString());
+        }
+
+        return failures;
+    }
+
+    abstract boolean skipVerification(P optimizedPlan, boolean skipRemoteEnrichVerification);
+
+    abstract void checkPlanConsistency(P optimizedPlan, Failures failures, Failures depFailures);
+
+    private static void verifyOutputNotChanged(QueryPlan<?> optimizedPlan, List<Attribute> expectedOutputAttributes, Failures failures) {
+        if (dataTypeEquals(expectedOutputAttributes, optimizedPlan.output()) == false) {
+            // If the output level is empty we add a column called ProjectAwayColumns.ALL_FIELDS_PROJECTED
+            // We will ignore such cases for output verification
+            // TODO: this special casing is required due to https://github.com/elastic/elasticsearch/issues/121741, remove when fixed.
+            boolean hasProjectAwayColumns = optimizedPlan.output()
+                .stream()
+                .anyMatch(x -> x.name().equals(ProjectAwayColumns.ALL_FIELDS_PROJECTED));
+            // LookupJoinExec represents the lookup index with EsSourceExec and this is turned into EsQueryExec by
+            // ReplaceSourceAttributes. Because InsertFieldExtractions doesn't apply to lookup indices, the
+            // right hand side will only have the EsQueryExec providing the _doc attribute and nothing else.
+            // We perform an optimizer run on every fragment. LookupJoinExec also contains such a fragment,
+            // and currently it only contains an EsQueryExec after optimization.
+            boolean hasLookupJoinExec = optimizedPlan instanceof EsQueryExec esQueryExec && esQueryExec.indexMode() == LOOKUP;
+            boolean ignoreError = hasProjectAwayColumns || hasLookupJoinExec;
+            if (ignoreError == false) {
+                failures.add(
+                    fail(
+                        optimizedPlan,
+                        "Output has changed from [{}] to [{}]. ",
+                        expectedOutputAttributes.toString(),
+                        optimizedPlan.output().toString()
+                    )
+                );
+            }
+        }
+    }
+
+}

+ 2 - 1
x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/optimizer/rules/physical/ProjectAwayColumns.java

@@ -36,6 +36,7 @@ import static java.util.Collections.singletonList;
  * extraction.
  */
 public class ProjectAwayColumns extends Rule<PhysicalPlan, PhysicalPlan> {
+    public static String ALL_FIELDS_PROJECTED = "<all-fields-projected>";
 
     @Override
     public PhysicalPlan apply(PhysicalPlan plan) {
@@ -94,7 +95,7 @@ public class ProjectAwayColumns extends Rule<PhysicalPlan, PhysicalPlan> {
                         // add a synthetic field (so it doesn't clash with the user defined one) to return a constant
                         // to avoid the block from being trimmed
                         if (output.isEmpty()) {
-                            var alias = new Alias(logicalFragment.source(), "<all-fields-projected>", Literal.NULL, null, true);
+                            var alias = new Alias(logicalFragment.source(), ALL_FIELDS_PROJECTED, Literal.NULL, null, true);
                             List<Alias> fields = singletonList(alias);
                             logicalFragment = new Eval(logicalFragment.source(), logicalFragment, fields);
                             output = Expressions.asAttributes(fields);

+ 1 - 1
x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/rule/RuleExecutor.java

@@ -175,7 +175,7 @@ public abstract class RuleExecutor<TreeType extends Node<TreeType>> {
                     if (tf.hasChanged()) {
                         hasChanged = true;
                         if (log.isTraceEnabled()) {
-                            log.trace("Rule {} applied\n{}", rule, NodeUtils.diffString(tf.before, tf.after));
+                            log.trace("Rule {} applied with change\n{}", rule, NodeUtils.diffString(tf.before, tf.after));
                         }
                     } else {
                         if (log.isTraceEnabled()) {

+ 106 - 1
x-pack/plugin/esql/src/test/java/org/elasticsearch/xpack/esql/optimizer/LocalLogicalPlanOptimizerTests.java

@@ -13,6 +13,7 @@ import org.elasticsearch.common.util.Maps;
 import org.elasticsearch.index.IndexMode;
 import org.elasticsearch.test.ESTestCase;
 import org.elasticsearch.xpack.esql.EsqlTestUtils;
+import org.elasticsearch.xpack.esql.VerificationException;
 import org.elasticsearch.xpack.esql.analysis.Analyzer;
 import org.elasticsearch.xpack.esql.analysis.AnalyzerContext;
 import org.elasticsearch.xpack.esql.core.expression.Alias;
@@ -30,6 +31,7 @@ import org.elasticsearch.xpack.esql.core.tree.Source;
 import org.elasticsearch.xpack.esql.core.type.DataType;
 import org.elasticsearch.xpack.esql.core.type.EsField;
 import org.elasticsearch.xpack.esql.core.type.InvalidMappedField;
+import org.elasticsearch.xpack.esql.core.util.Holder;
 import org.elasticsearch.xpack.esql.expression.Order;
 import org.elasticsearch.xpack.esql.expression.function.EsqlFunctionRegistry;
 import org.elasticsearch.xpack.esql.expression.function.aggregate.Min;
@@ -43,6 +45,7 @@ import org.elasticsearch.xpack.esql.expression.predicate.nulls.IsNotNull;
 import org.elasticsearch.xpack.esql.expression.predicate.operator.arithmetic.Add;
 import org.elasticsearch.xpack.esql.index.EsIndex;
 import org.elasticsearch.xpack.esql.index.IndexResolution;
+import org.elasticsearch.xpack.esql.optimizer.rules.logical.OptimizerRules;
 import org.elasticsearch.xpack.esql.optimizer.rules.logical.local.InferIsNotNull;
 import org.elasticsearch.xpack.esql.parser.EsqlParser;
 import org.elasticsearch.xpack.esql.plan.logical.Aggregate;
@@ -59,10 +62,12 @@ import org.elasticsearch.xpack.esql.plan.logical.UnaryPlan;
 import org.elasticsearch.xpack.esql.plan.logical.local.EmptyLocalSupplier;
 import org.elasticsearch.xpack.esql.plan.logical.local.EsqlProject;
 import org.elasticsearch.xpack.esql.plan.logical.local.LocalRelation;
+import org.elasticsearch.xpack.esql.rule.RuleExecutor;
 import org.elasticsearch.xpack.esql.stats.SearchStats;
 import org.hamcrest.Matchers;
 import org.junit.BeforeClass;
 
+import java.util.ArrayList;
 import java.util.List;
 import java.util.Locale;
 import java.util.Map;
@@ -88,6 +93,9 @@ import static org.elasticsearch.xpack.esql.EsqlTestUtils.statsForMissingField;
 import static org.elasticsearch.xpack.esql.EsqlTestUtils.unboundLogicalOptimizerContext;
 import static org.elasticsearch.xpack.esql.EsqlTestUtils.withDefaultLimitWarning;
 import static org.elasticsearch.xpack.esql.core.tree.Source.EMPTY;
+import static org.elasticsearch.xpack.esql.core.type.DataType.INTEGER;
+import static org.elasticsearch.xpack.esql.optimizer.rules.logical.OptimizerRules.TransformDirection.DOWN;
+import static org.elasticsearch.xpack.esql.optimizer.rules.logical.OptimizerRules.TransformDirection.UP;
 import static org.hamcrest.Matchers.contains;
 import static org.hamcrest.Matchers.containsString;
 import static org.hamcrest.Matchers.equalTo;
@@ -780,7 +788,7 @@ public class LocalLogicalPlanOptimizerTests extends ESTestCase {
         as(eval.child(), EsRelation.class);
     }
 
-    public void testPlanSanityCheck() throws Exception {
+    public void testVerifierOnMissingReferences() throws Exception {
         var plan = localPlan("""
             from test
             | stats a = min(salary) by emp_no
@@ -806,6 +814,103 @@ public class LocalLogicalPlanOptimizerTests extends ESTestCase {
         assertThat(e.getMessage(), containsString(" optimized incorrectly due to missing references [salary"));
     }
 
+    private LocalLogicalPlanOptimizer getCustomRulesLocalLogicalPlanOptimizer(List<RuleExecutor.Batch<LogicalPlan>> batches) {
+        LocalLogicalOptimizerContext context = new LocalLogicalOptimizerContext(
+            EsqlTestUtils.TEST_CFG,
+            FoldContext.small(),
+            TEST_SEARCH_STATS
+        );
+        LocalLogicalPlanOptimizer customOptimizer = new LocalLogicalPlanOptimizer(context) {
+            @Override
+            protected List<Batch<LogicalPlan>> batches() {
+                return batches;
+            }
+        };
+        return customOptimizer;
+    }
+
+    public void testVerifierOnAdditionalAttributeAdded() throws Exception {
+        var plan = localPlan("""
+            from test
+            | stats a = min(salary) by emp_no
+            """);
+
+        var limit = as(plan, Limit.class);
+        var aggregate = as(limit.child(), Aggregate.class);
+        var min = as(Alias.unwrap(aggregate.aggregates().get(0)), Min.class);
+        var salary = as(min.field(), NamedExpression.class);
+        assertThat(salary.name(), is("salary"));
+        Holder<Integer> appliedCount = new Holder<>(0);
+        // use a custom rule that adds another output attribute
+        var customRuleBatch = new RuleExecutor.Batch<>(
+            "CustomRuleBatch",
+            RuleExecutor.Limiter.ONCE,
+            new OptimizerRules.ParameterizedOptimizerRule<Aggregate, LocalLogicalOptimizerContext>(UP) {
+
+                @Override
+                protected LogicalPlan rule(Aggregate plan, LocalLogicalOptimizerContext context) {
+                    // This rule adds a missing attribute to the plan output
+                    // We only want to apply it once, so we use a static counter
+                    if (appliedCount.get() == 0) {
+                        appliedCount.set(appliedCount.get() + 1);
+                        Literal additionalLiteral = new Literal(Source.EMPTY, "additional literal", INTEGER);
+                        return new Eval(plan.source(), plan, List.of(new Alias(Source.EMPTY, "additionalAttribute", additionalLiteral)));
+                    }
+                    return plan;
+                }
+
+            }
+        );
+        LocalLogicalPlanOptimizer customRulesLocalLogicalPlanOptimizer = getCustomRulesLocalLogicalPlanOptimizer(List.of(customRuleBatch));
+        Exception e = expectThrows(VerificationException.class, () -> customRulesLocalLogicalPlanOptimizer.localOptimize(plan));
+        assertThat(e.getMessage(), containsString("Output has changed from"));
+        assertThat(e.getMessage(), containsString("additionalAttribute"));
+    }
+
+    public void testVerifierOnAttributeDatatypeChanged() {
+        var plan = localPlan("""
+            from test
+            | stats a = min(salary) by emp_no
+            """);
+
+        var limit = as(plan, Limit.class);
+        var aggregate = as(limit.child(), Aggregate.class);
+        var min = as(Alias.unwrap(aggregate.aggregates().get(0)), Min.class);
+        var salary = as(min.field(), NamedExpression.class);
+        assertThat(salary.name(), is("salary"));
+        Holder<Integer> appliedCount = new Holder<>(0);
+        // use a custom rule that changes the datatype of an output attribute
+        var customRuleBatch = new RuleExecutor.Batch<>(
+            "CustomRuleBatch",
+            RuleExecutor.Limiter.ONCE,
+            new OptimizerRules.ParameterizedOptimizerRule<LogicalPlan, LocalLogicalOptimizerContext>(DOWN) {
+                @Override
+                protected LogicalPlan rule(LogicalPlan plan, LocalLogicalOptimizerContext context) {
+                    // We only want to apply it once, so we use a static counter
+                    if (appliedCount.get() == 0) {
+                        appliedCount.set(appliedCount.get() + 1);
+                        Limit limit = as(plan, Limit.class);
+                        Limit newLimit = new Limit(plan.source(), limit.limit(), limit.child()) {
+                            @Override
+                            public List<Attribute> output() {
+                                List<Attribute> oldOutput = super.output();
+                                List<Attribute> newOutput = new ArrayList<>(oldOutput);
+                                newOutput.set(0, oldOutput.get(0).withDataType(DataType.DATETIME));
+                                return newOutput;
+                            }
+                        };
+                        return newLimit;
+                    }
+                    return plan;
+                }
+
+            }
+        );
+        LocalLogicalPlanOptimizer customRulesLocalLogicalPlanOptimizer = getCustomRulesLocalLogicalPlanOptimizer(List.of(customRuleBatch));
+        Exception e = expectThrows(VerificationException.class, () -> customRulesLocalLogicalPlanOptimizer.localOptimize(plan));
+        assertThat(e.getMessage(), containsString("Output has changed from"));
+    }
+
     private IsNotNull isNotNull(Expression field) {
         return new IsNotNull(EMPTY, field);
     }

+ 106 - 4
x-pack/plugin/esql/src/test/java/org/elasticsearch/xpack/esql/optimizer/LocalPhysicalPlanOptimizerTests.java

@@ -34,12 +34,14 @@ import org.elasticsearch.test.VersionUtils;
 import org.elasticsearch.xpack.core.enrich.EnrichPolicy;
 import org.elasticsearch.xpack.esql.EsqlTestUtils;
 import org.elasticsearch.xpack.esql.EsqlTestUtils.TestSearchStats;
+import org.elasticsearch.xpack.esql.VerificationException;
 import org.elasticsearch.xpack.esql.action.EsqlCapabilities;
 import org.elasticsearch.xpack.esql.analysis.Analyzer;
 import org.elasticsearch.xpack.esql.analysis.AnalyzerContext;
 import org.elasticsearch.xpack.esql.analysis.EnrichResolution;
 import org.elasticsearch.xpack.esql.analysis.Verifier;
 import org.elasticsearch.xpack.esql.core.expression.Alias;
+import org.elasticsearch.xpack.esql.core.expression.Attribute;
 import org.elasticsearch.xpack.esql.core.expression.Expression;
 import org.elasticsearch.xpack.esql.core.expression.Expressions;
 import org.elasticsearch.xpack.esql.core.expression.FieldAttribute;
@@ -100,6 +102,7 @@ import org.elasticsearch.xpack.esql.plugin.EsqlFlags;
 import org.elasticsearch.xpack.esql.plugin.QueryPragmas;
 import org.elasticsearch.xpack.esql.querydsl.query.SingleValueQuery;
 import org.elasticsearch.xpack.esql.rule.Rule;
+import org.elasticsearch.xpack.esql.rule.RuleExecutor;
 import org.elasticsearch.xpack.esql.session.Configuration;
 import org.elasticsearch.xpack.esql.stats.SearchContextStats;
 import org.elasticsearch.xpack.esql.stats.SearchStats;
@@ -2389,20 +2392,119 @@ public class LocalPhysicalPlanOptimizerTests extends MapperServiceTestCase {
         // We want to verify that the localOptimize detects the missing attribute.
         // However, it also throws an error in one of the rules before we get to the verifier.
         // So we use an implementation of LocalPhysicalPlanOptimizer that does not have any rules.
+        LocalPhysicalPlanOptimizer optimizerWithNoRules = getCustomRulesLocalPhysicalPlanOptimizer(List.of());
+        Exception e = expectThrows(IllegalStateException.class, () -> optimizerWithNoRules.localOptimize(topNExec));
+        assertThat(e.getMessage(), containsString(" optimized incorrectly due to missing references [missing attr"));
+    }
+
+    private LocalPhysicalPlanOptimizer getCustomRulesLocalPhysicalPlanOptimizer(List<RuleExecutor.Batch<PhysicalPlan>> batches) {
         LocalPhysicalOptimizerContext context = new LocalPhysicalOptimizerContext(
             new EsqlFlags(true),
             config,
             FoldContext.small(),
             SearchStats.EMPTY
         );
-        LocalPhysicalPlanOptimizer optimizerWithNoopExecute = new LocalPhysicalPlanOptimizer(context) {
+        LocalPhysicalPlanOptimizer localPhysicalPlanOptimizer = new LocalPhysicalPlanOptimizer(context) {
             @Override
             protected List<Batch<PhysicalPlan>> batches() {
-                return List.of();
+                return batches;
             }
         };
-        Exception e = expectThrows(IllegalStateException.class, () -> optimizerWithNoopExecute.localOptimize(topNExec));
-        assertThat(e.getMessage(), containsString(" optimized incorrectly due to missing references [missing attr"));
+        return localPhysicalPlanOptimizer;
+    }
+
+    public void testVerifierOnAdditionalAttributeAdded() throws Exception {
+
+        PhysicalPlan plan = plannerOptimizer.plan("""
+            from test
+            | stats a = min(salary) by emp_no
+            """);
+
+        var limit = as(plan, LimitExec.class);
+        var aggregate = as(limit.child(), AggregateExec.class);
+        var min = as(Alias.unwrap(aggregate.aggregates().get(0)), Min.class);
+        var salary = as(min.field(), NamedExpression.class);
+        assertThat(salary.name(), is("salary"));
+        Holder<Integer> appliedCount = new Holder<>(0);
+        // use a custom rule that adds another output attribute
+        var customRuleBatch = new RuleExecutor.Batch<>(
+            "CustomRuleBatch",
+            RuleExecutor.Limiter.ONCE,
+            new PhysicalOptimizerRules.ParameterizedOptimizerRule<PhysicalPlan, LocalPhysicalOptimizerContext>() {
+                @Override
+                public PhysicalPlan rule(PhysicalPlan plan, LocalPhysicalOptimizerContext context) {
+                    // This rule adds a missing attribute to the plan output
+                    // We only want to apply it once, so we use a static counter
+                    if (appliedCount.get() == 0) {
+                        appliedCount.set(appliedCount.get() + 1);
+                        Literal additionalLiteral = new Literal(Source.EMPTY, "additional literal", INTEGER);
+                        return new EvalExec(
+                            plan.source(),
+                            plan,
+                            List.of(new Alias(Source.EMPTY, "additionalAttribute", additionalLiteral))
+                        );
+                    }
+                    return plan;
+                }
+            }
+        );
+        LocalPhysicalPlanOptimizer customRulesLocalPhysicalPlanOptimizer = getCustomRulesLocalPhysicalPlanOptimizer(
+            List.of(customRuleBatch)
+        );
+        Exception e = expectThrows(VerificationException.class, () -> customRulesLocalPhysicalPlanOptimizer.localOptimize(plan));
+        assertThat(e.getMessage(), containsString("Output has changed from"));
+        assertThat(e.getMessage(), containsString("additionalAttribute"));
+    }
+
+    public void testVerifierOnAttributeDatatypeChanged() throws Exception {
+
+        PhysicalPlan plan = plannerOptimizer.plan("""
+            from test
+            | stats a = min(salary) by emp_no
+            """);
+
+        var limit = as(plan, LimitExec.class);
+        var aggregate = as(limit.child(), AggregateExec.class);
+        var min = as(Alias.unwrap(aggregate.aggregates().get(0)), Min.class);
+        var salary = as(min.field(), NamedExpression.class);
+        assertThat(salary.name(), is("salary"));
+        Holder<Integer> appliedCount = new Holder<>(0);
+        // use a custom rule that changes the datatype of an output attribute
+        var customRuleBatch = new RuleExecutor.Batch<>(
+            "CustomRuleBatch",
+            RuleExecutor.Limiter.ONCE,
+            new PhysicalOptimizerRules.ParameterizedOptimizerRule<PhysicalPlan, LocalPhysicalOptimizerContext>() {
+                @Override
+                public PhysicalPlan rule(PhysicalPlan plan, LocalPhysicalOptimizerContext context) {
+                    // We only want to apply it once, so we use a static counter
+                    if (appliedCount.get() == 0) {
+                        appliedCount.set(appliedCount.get() + 1);
+                        LimitExec limit = as(plan, LimitExec.class);
+                        LimitExec newLimit = new LimitExec(
+                            plan.source(),
+                            limit.child(),
+                            new Literal(Source.EMPTY, 1000, INTEGER),
+                            randomEstimatedRowSize()
+                        ) {
+                            @Override
+                            public List<Attribute> output() {
+                                List<Attribute> oldOutput = super.output();
+                                List<Attribute> newOutput = new ArrayList<>(oldOutput);
+                                newOutput.set(0, oldOutput.get(0).withDataType(DataType.DATETIME));
+                                return newOutput;
+                            }
+                        };
+                        return newLimit;
+                    }
+                    return plan;
+                }
+            }
+        );
+        LocalPhysicalPlanOptimizer customRulesLocalPhysicalPlanOptimizer = getCustomRulesLocalPhysicalPlanOptimizer(
+            List.of(customRuleBatch)
+        );
+        Exception e = expectThrows(VerificationException.class, () -> customRulesLocalPhysicalPlanOptimizer.localOptimize(plan));
+        assertThat(e.getMessage(), containsString("Output has changed from"));
     }
 
     private boolean isMultiTypeEsField(Expression e) {

+ 102 - 6
x-pack/plugin/esql/src/test/java/org/elasticsearch/xpack/esql/optimizer/LogicalPlanOptimizerTests.java

@@ -132,6 +132,7 @@ import org.elasticsearch.xpack.esql.plan.logical.join.LookupJoin;
 import org.elasticsearch.xpack.esql.plan.logical.local.EmptyLocalSupplier;
 import org.elasticsearch.xpack.esql.plan.logical.local.EsqlProject;
 import org.elasticsearch.xpack.esql.plan.logical.local.LocalRelation;
+import org.elasticsearch.xpack.esql.rule.RuleExecutor;
 
 import java.time.Duration;
 import java.util.ArrayList;
@@ -179,6 +180,8 @@ import static org.elasticsearch.xpack.esql.expression.predicate.operator.compari
 import static org.elasticsearch.xpack.esql.expression.predicate.operator.comparison.EsqlBinaryComparison.BinaryComparisonOperation.GTE;
 import static org.elasticsearch.xpack.esql.expression.predicate.operator.comparison.EsqlBinaryComparison.BinaryComparisonOperation.LT;
 import static org.elasticsearch.xpack.esql.expression.predicate.operator.comparison.EsqlBinaryComparison.BinaryComparisonOperation.LTE;
+import static org.elasticsearch.xpack.esql.optimizer.rules.logical.OptimizerRules.TransformDirection.DOWN;
+import static org.elasticsearch.xpack.esql.optimizer.rules.logical.OptimizerRules.TransformDirection.UP;
 import static org.hamcrest.Matchers.allOf;
 import static org.hamcrest.Matchers.anyOf;
 import static org.hamcrest.Matchers.contains;
@@ -2902,7 +2905,7 @@ public class LogicalPlanOptimizerTests extends AbstractLogicalPlanOptimizerTests
     public void testInsist_fieldDoesNotExist_createsUnmappedFieldInRelation() {
         assumeTrue("Requires UNMAPPED FIELDS", EsqlCapabilities.Cap.UNMAPPED_FIELDS.isEnabled());
 
-        LogicalPlan plan = optimizedPlan("FROM test | INSIST_🐔 foo");
+        LogicalPlan plan = optimizedPlan("FROM test | INSIST_\uD83D\uDC14 foo");
 
         var project = as(plan, Project.class);
         var limit = as(project.child(), Limit.class);
@@ -2913,7 +2916,7 @@ public class LogicalPlanOptimizerTests extends AbstractLogicalPlanOptimizerTests
     public void testInsist_multiIndexFieldPartiallyExistsAndIsKeyword_castsAreNotSupported() {
         assumeTrue("Requires UNMAPPED FIELDS", EsqlCapabilities.Cap.UNMAPPED_FIELDS.isEnabled());
 
-        var plan = planMultiIndex("FROM multi_index | INSIST_🐔 partial_type_keyword");
+        var plan = planMultiIndex("FROM multi_index | INSIST_\uD83D\uDC14 partial_type_keyword");
         var project = as(plan, Project.class);
         var limit = as(project.child(), Limit.class);
         var relation = as(limit.child(), EsRelation.class);
@@ -2924,7 +2927,7 @@ public class LogicalPlanOptimizerTests extends AbstractLogicalPlanOptimizerTests
     public void testInsist_multipleInsistClauses_insistsAreFolded() {
         assumeTrue("Requires UNMAPPED FIELDS", EsqlCapabilities.Cap.UNMAPPED_FIELDS.isEnabled());
 
-        var plan = planMultiIndex("FROM multi_index | INSIST_🐔 partial_type_keyword | INSIST_🐔 foo");
+        var plan = planMultiIndex("FROM multi_index | INSIST_\uD83D\uDC14 partial_type_keyword | INSIST_\uD83D\uDC14 foo");
         var project = as(plan, Project.class);
         var limit = as(project.child(), Limit.class);
         var relation = as(limit.child(), EsRelation.class);
@@ -5561,7 +5564,7 @@ public class LogicalPlanOptimizerTests extends AbstractLogicalPlanOptimizerTests
             List<Attribute> initialGeneratedExprs = ((GeneratingPlan) initialPlan).generatedAttributes();
             LogicalPlan optimizedPlan = testCase.rule.apply(initialPlan);
 
-            Failures inconsistencies = LogicalVerifier.INSTANCE.verify(optimizedPlan, false);
+            Failures inconsistencies = LogicalVerifier.INSTANCE.verify(optimizedPlan, false, initialPlan.output());
             assertFalse(inconsistencies.hasFailures());
 
             Project project = as(optimizedPlan, Project.class);
@@ -5612,7 +5615,7 @@ public class LogicalPlanOptimizerTests extends AbstractLogicalPlanOptimizerTests
             List<Attribute> initialGeneratedExprs = ((GeneratingPlan) initialPlan).generatedAttributes();
             LogicalPlan optimizedPlan = testCase.rule.apply(initialPlan);
 
-            Failures inconsistencies = LogicalVerifier.INSTANCE.verify(optimizedPlan, false);
+            Failures inconsistencies = LogicalVerifier.INSTANCE.verify(optimizedPlan, false, initialPlan.output());
             assertFalse(inconsistencies.hasFailures());
 
             Project project = as(optimizedPlan, Project.class);
@@ -5668,7 +5671,7 @@ public class LogicalPlanOptimizerTests extends AbstractLogicalPlanOptimizerTests
 
             // This ensures that our generating plan doesn't use invalid references, resp. that any rename from the Project has
             // been propagated into the generating plan.
-            Failures inconsistencies = LogicalVerifier.INSTANCE.verify(optimizedPlan, false);
+            Failures inconsistencies = LogicalVerifier.INSTANCE.verify(optimizedPlan, false, initialPlan.output());
             assertFalse(inconsistencies.hasFailures());
 
             Project project = as(optimizedPlan, Project.class);
@@ -8026,4 +8029,97 @@ public class LogicalPlanOptimizerTests extends AbstractLogicalPlanOptimizerTests
         assertThat(secondKnnFilters.size(), equalTo(1));
         assertTrue(secondKnnFilters.contains(firstOr.right()));
     }
+
+    private LogicalPlanOptimizer getCustomRulesLogicalPlanOptimizer(List<RuleExecutor.Batch<LogicalPlan>> batches) {
+        LogicalOptimizerContext context = new LogicalOptimizerContext(EsqlTestUtils.TEST_CFG, FoldContext.small());
+        LogicalPlanOptimizer customOptimizer = new LogicalPlanOptimizer(context) {
+            @Override
+            protected List<Batch<LogicalPlan>> batches() {
+                return batches;
+            }
+        };
+        return customOptimizer;
+    }
+
+    public void testVerifierOnAdditionalAttributeAdded() throws Exception {
+        var plan = optimizedPlan("""
+            from test
+            | stats a = min(salary) by emp_no
+            """);
+
+        var limit = as(plan, Limit.class);
+        var aggregate = as(limit.child(), Aggregate.class);
+        var min = as(Alias.unwrap(aggregate.aggregates().get(0)), Min.class);
+        var salary = as(min.field(), NamedExpression.class);
+        assertThat(salary.name(), is("salary"));
+        Holder<Integer> appliedCount = new Holder<>(0);
+        // use a custom rule that adds another output attribute
+        var customRuleBatch = new RuleExecutor.Batch<>(
+            "CustomRuleBatch",
+            RuleExecutor.Limiter.ONCE,
+            new OptimizerRules.ParameterizedOptimizerRule<Aggregate, LogicalOptimizerContext>(UP) {
+                @Override
+                protected LogicalPlan rule(Aggregate plan, LogicalOptimizerContext context) {
+                    // This rule adds a missing attribute to the plan output
+                    // We only want to apply it once, so we use a static counter
+                    if (appliedCount.get() == 0) {
+                        appliedCount.set(appliedCount.get() + 1);
+                        Literal additionalLiteral = new Literal(Source.EMPTY, "additional literal", INTEGER);
+                        return new Eval(plan.source(), plan, List.of(new Alias(Source.EMPTY, "additionalAttribute", additionalLiteral)));
+                    }
+                    return plan;
+                }
+
+            }
+        );
+        LogicalPlanOptimizer customRulesLogicalPlanOptimizer = getCustomRulesLogicalPlanOptimizer(List.of(customRuleBatch));
+        Exception e = expectThrows(VerificationException.class, () -> customRulesLogicalPlanOptimizer.optimize(plan));
+        assertThat(e.getMessage(), containsString("Output has changed from"));
+        assertThat(e.getMessage(), containsString("additionalAttribute"));
+    }
+
+    public void testVerifierOnAttributeDatatypeChanged() {
+        var plan = optimizedPlan("""
+            from test
+            | stats a = min(salary) by emp_no
+            """);
+
+        var limit = as(plan, Limit.class);
+        var aggregate = as(limit.child(), Aggregate.class);
+        var min = as(Alias.unwrap(aggregate.aggregates().get(0)), Min.class);
+        var salary = as(min.field(), NamedExpression.class);
+        assertThat(salary.name(), is("salary"));
+        Holder<Integer> appliedCount = new Holder<>(0);
+        // use a custom rule that changes the datatype of an output attribute
+        var customRuleBatch = new RuleExecutor.Batch<>(
+            "CustomRuleBatch",
+            RuleExecutor.Limiter.ONCE,
+            new OptimizerRules.ParameterizedOptimizerRule<LogicalPlan, LogicalOptimizerContext>(DOWN) {
+                @Override
+                protected LogicalPlan rule(LogicalPlan plan, LogicalOptimizerContext context) {
+                    // We only want to apply it once, so we use a static counter
+                    if (appliedCount.get() == 0) {
+                        appliedCount.set(appliedCount.get() + 1);
+                        Limit limit = as(plan, Limit.class);
+                        Limit newLimit = new Limit(plan.source(), limit.limit(), limit.child()) {
+                            @Override
+                            public List<Attribute> output() {
+                                List<Attribute> oldOutput = super.output();
+                                List<Attribute> newOutput = new ArrayList<>(oldOutput);
+                                newOutput.set(0, oldOutput.get(0).withDataType(DataType.DATETIME));
+                                return newOutput;
+                            }
+                        };
+                        return newLimit;
+                    }
+                    return plan;
+                }
+
+            }
+        );
+        LogicalPlanOptimizer customRulesLogicalPlanOptimizer = getCustomRulesLogicalPlanOptimizer(List.of(customRuleBatch));
+        Exception e = expectThrows(VerificationException.class, () -> customRulesLogicalPlanOptimizer.optimize(plan));
+        assertThat(e.getMessage(), containsString("Output has changed from"));
+    }
+
 }

+ 113 - 7
x-pack/plugin/esql/src/test/java/org/elasticsearch/xpack/esql/optimizer/PhysicalPlanOptimizerTests.java

@@ -64,11 +64,13 @@ import org.elasticsearch.xpack.esql.core.tree.Node;
 import org.elasticsearch.xpack.esql.core.tree.Source;
 import org.elasticsearch.xpack.esql.core.type.DataType;
 import org.elasticsearch.xpack.esql.core.type.EsField;
+import org.elasticsearch.xpack.esql.core.util.Holder;
 import org.elasticsearch.xpack.esql.enrich.ResolvedEnrichPolicy;
 import org.elasticsearch.xpack.esql.expression.Order;
 import org.elasticsearch.xpack.esql.expression.function.EsqlFunctionRegistry;
 import org.elasticsearch.xpack.esql.expression.function.aggregate.AggregateFunction;
 import org.elasticsearch.xpack.esql.expression.function.aggregate.Count;
+import org.elasticsearch.xpack.esql.expression.function.aggregate.Min;
 import org.elasticsearch.xpack.esql.expression.function.aggregate.SpatialAggregateFunction;
 import org.elasticsearch.xpack.esql.expression.function.aggregate.SpatialCentroid;
 import org.elasticsearch.xpack.esql.expression.function.aggregate.SpatialExtent;
@@ -142,6 +144,7 @@ import org.elasticsearch.xpack.esql.plugin.QueryPragmas;
 import org.elasticsearch.xpack.esql.querydsl.query.EqualsSyntheticSourceDelegate;
 import org.elasticsearch.xpack.esql.querydsl.query.SingleValueQuery;
 import org.elasticsearch.xpack.esql.querydsl.query.SpatialRelatesQuery;
+import org.elasticsearch.xpack.esql.rule.RuleExecutor;
 import org.elasticsearch.xpack.esql.session.Configuration;
 import org.elasticsearch.xpack.esql.stats.SearchStats;
 import org.junit.Before;
@@ -187,9 +190,11 @@ import static org.elasticsearch.xpack.esql.core.type.DataType.CARTESIAN_POINT;
 import static org.elasticsearch.xpack.esql.core.type.DataType.CARTESIAN_SHAPE;
 import static org.elasticsearch.xpack.esql.core.type.DataType.GEO_POINT;
 import static org.elasticsearch.xpack.esql.core.type.DataType.GEO_SHAPE;
+import static org.elasticsearch.xpack.esql.core.type.DataType.INTEGER;
 import static org.elasticsearch.xpack.esql.core.util.TestUtils.stripThrough;
 import static org.elasticsearch.xpack.esql.parser.ExpressionBuilder.MAX_EXPRESSION_DEPTH;
 import static org.elasticsearch.xpack.esql.parser.LogicalPlanBuilder.MAX_QUERY_DEPTH;
+import static org.elasticsearch.xpack.esql.plan.physical.AbstractPhysicalPlanSerializationTests.randomEstimatedRowSize;
 import static org.elasticsearch.xpack.esql.planner.mapper.MapperUtils.hasScoreAttribute;
 import static org.hamcrest.Matchers.closeTo;
 import static org.hamcrest.Matchers.contains;
@@ -2873,7 +2878,7 @@ public class PhysicalPlanOptimizerTests extends ESTestCase {
             )
         );
 
-        var e = expectThrows(VerificationException.class, () -> physicalPlanOptimizer.verify(badPlan));
+        var e = expectThrows(VerificationException.class, () -> physicalPlanOptimizer.verify(badPlan, verifiedPlan.output()));
         assertThat(
             e.getMessage(),
             containsString(
@@ -2888,7 +2893,7 @@ public class PhysicalPlanOptimizerTests extends ESTestCase {
             | stats s = sum(salary) by emp_no
             | where emp_no > 10
             """);
-
+        final var planBeforeModification = plan;
         plan = plan.transformUp(
             AggregateExec.class,
             a -> new AggregateExec(
@@ -2902,7 +2907,7 @@ public class PhysicalPlanOptimizerTests extends ESTestCase {
             )
         );
         final var finalPlan = plan;
-        var e = expectThrows(IllegalStateException.class, () -> physicalPlanOptimizer.verify(finalPlan));
+        var e = expectThrows(IllegalStateException.class, () -> physicalPlanOptimizer.verify(finalPlan, planBeforeModification.output()));
         assertThat(e.getMessage(), containsString(" > 10[INTEGER]]] optimized incorrectly due to missing references [emp_no{f}#"));
     }
 
@@ -2920,7 +2925,7 @@ public class PhysicalPlanOptimizerTests extends ESTestCase {
 
         var planWithInvalidJoinLeftSide = plan.transformUp(LookupJoinExec.class, join -> join.replaceChildren(join.right(), join.right()));
 
-        var e = expectThrows(IllegalStateException.class, () -> physicalPlanOptimizer.verify(planWithInvalidJoinLeftSide));
+        var e = expectThrows(IllegalStateException.class, () -> physicalPlanOptimizer.verify(planWithInvalidJoinLeftSide, plan.output()));
         assertThat(e.getMessage(), containsString(" optimized incorrectly due to missing references from left hand side [languages"));
 
         var planWithInvalidJoinRightSide = plan.transformUp(
@@ -2937,7 +2942,7 @@ public class PhysicalPlanOptimizerTests extends ESTestCase {
             )
         );
 
-        e = expectThrows(IllegalStateException.class, () -> physicalPlanOptimizer.verify(planWithInvalidJoinRightSide));
+        e = expectThrows(IllegalStateException.class, () -> physicalPlanOptimizer.verify(planWithInvalidJoinRightSide, plan.output()));
         assertThat(e.getMessage(), containsString(" optimized incorrectly due to missing references from right hand side [language_code"));
     }
 
@@ -2947,7 +2952,7 @@ public class PhysicalPlanOptimizerTests extends ESTestCase {
             | stats s = sum(salary) by emp_no
             | where emp_no > 10
             """);
-
+        final var planBeforeModification = plan;
         plan = plan.transformUp(AggregateExec.class, a -> {
             List<Attribute> intermediates = new ArrayList<>(a.intermediateAttributes());
             intermediates.add(intermediates.get(0));
@@ -2962,7 +2967,7 @@ public class PhysicalPlanOptimizerTests extends ESTestCase {
             );
         });
         final var finalPlan = plan;
-        var e = expectThrows(IllegalStateException.class, () -> physicalPlanOptimizer.verify(finalPlan));
+        var e = expectThrows(IllegalStateException.class, () -> physicalPlanOptimizer.verify(finalPlan, planBeforeModification.output()));
         assertThat(
             e.getMessage(),
             containsString("Plan [LimitExec[1000[INTEGER],null]] optimized incorrectly due to duplicate output attribute emp_no{f}#")
@@ -8308,6 +8313,107 @@ public class PhysicalPlanOptimizerTests extends ESTestCase {
         return sv.next();
     }
 
+    private PhysicalPlanOptimizer getCustomRulesPhysicalPlanOptimizer(List<RuleExecutor.Batch<PhysicalPlan>> batches) {
+        PhysicalOptimizerContext context = new PhysicalOptimizerContext(config);
+        PhysicalPlanOptimizer PhysicalPlanOptimizer = new PhysicalPlanOptimizer(context) {
+            @Override
+            protected List<Batch<PhysicalPlan>> batches() {
+                return batches;
+            }
+        };
+        return PhysicalPlanOptimizer;
+    }
+
+    public void testVerifierOnAdditionalAttributeAdded() throws Exception {
+
+        PhysicalPlan plan = physicalPlan("""
+            from test
+            | stats a = min(salary) by emp_no
+            """);
+
+        var limit = as(plan, LimitExec.class);
+        var aggregate = as(limit.child(), AggregateExec.class);
+        var min = as(Alias.unwrap(aggregate.aggregates().get(0)), Min.class);
+        var salary = as(min.field(), NamedExpression.class);
+        assertThat(salary.name(), is("salary"));
+        Holder<Integer> appliedCount = new Holder<>(0);
+        // use a custom rule that adds another output attribute
+        var customRuleBatch = new RuleExecutor.Batch<>(
+            "CustomRuleBatch",
+            RuleExecutor.Limiter.ONCE,
+            new PhysicalOptimizerRules.ParameterizedOptimizerRule<PhysicalPlan, PhysicalOptimizerContext>() {
+                @Override
+                public PhysicalPlan rule(PhysicalPlan plan, PhysicalOptimizerContext context) {
+                    // This rule adds a missing attribute to the plan output
+                    // We only want to apply it once, so we use a static counter
+                    if (appliedCount.get() == 0) {
+                        appliedCount.set(appliedCount.get() + 1);
+                        Literal additionalLiteral = new Literal(Source.EMPTY, "additional literal", INTEGER);
+                        return new EvalExec(
+                            plan.source(),
+                            plan,
+                            List.of(new Alias(Source.EMPTY, "additionalAttribute", additionalLiteral))
+                        );
+                    }
+                    return plan;
+                }
+            }
+        );
+        PhysicalPlanOptimizer customRulesPhysicalPlanOptimizer = getCustomRulesPhysicalPlanOptimizer(List.of(customRuleBatch));
+        Exception e = expectThrows(VerificationException.class, () -> customRulesPhysicalPlanOptimizer.optimize(plan));
+        assertThat(e.getMessage(), containsString("Output has changed from"));
+        assertThat(e.getMessage(), containsString("additionalAttribute"));
+    }
+
+    public void testVerifierOnAttributeDatatypeChanged() throws Exception {
+
+        PhysicalPlan plan = physicalPlan("""
+            from test
+            | stats a = min(salary) by emp_no
+            """);
+
+        var limit = as(plan, LimitExec.class);
+        var aggregate = as(limit.child(), AggregateExec.class);
+        var min = as(Alias.unwrap(aggregate.aggregates().get(0)), Min.class);
+        var salary = as(min.field(), NamedExpression.class);
+        assertThat(salary.name(), is("salary"));
+        Holder<Integer> appliedCount = new Holder<>(0);
+        // use a custom rule that changes the datatype of an output attribute
+        var customRuleBatch = new RuleExecutor.Batch<>(
+            "CustomRuleBatch",
+            RuleExecutor.Limiter.ONCE,
+            new PhysicalOptimizerRules.ParameterizedOptimizerRule<PhysicalPlan, PhysicalOptimizerContext>() {
+                @Override
+                public PhysicalPlan rule(PhysicalPlan plan, PhysicalOptimizerContext context) {
+                    // We only want to apply it once, so we use a static counter
+                    if (appliedCount.get() == 0) {
+                        appliedCount.set(appliedCount.get() + 1);
+                        LimitExec limit = as(plan, LimitExec.class);
+                        LimitExec newLimit = new LimitExec(
+                            plan.source(),
+                            limit.child(),
+                            new Literal(Source.EMPTY, 1000, INTEGER),
+                            randomEstimatedRowSize()
+                        ) {
+                            @Override
+                            public List<Attribute> output() {
+                                List<Attribute> oldOutput = super.output();
+                                List<Attribute> newOutput = new ArrayList<>(oldOutput);
+                                newOutput.set(0, oldOutput.get(0).withDataType(DataType.DATETIME));
+                                return newOutput;
+                            }
+                        };
+                        return newLimit;
+                    }
+                    return plan;
+                }
+            }
+        );
+        PhysicalPlanOptimizer customRulesPhysicalPlanOptimizer = getCustomRulesPhysicalPlanOptimizer(List.of(customRuleBatch));
+        Exception e = expectThrows(VerificationException.class, () -> customRulesPhysicalPlanOptimizer.optimize(plan));
+        assertThat(e.getMessage(), containsString("Output has changed from"));
+    }
+
     @Override
     protected List<String> filteredWarnings() {
         return withDefaultLimitWarning(super.filteredWarnings());