Переглянути джерело

ES|QL: Remove redundant sorts from execution plan (#121156) (#122231)

Luigi Dell'Aquila 8 місяців тому
батько
коміт
353f4c5330
30 змінених файлів з 679 додано та 359 видалено
  1. 5 0
      docs/changelog/121156.yaml
  2. 47 0
      x-pack/plugin/esql/qa/testFixtures/src/main/resources/lookup-join.csv-spec
  3. 14 0
      x-pack/plugin/esql/qa/testFixtures/src/main/resources/mv_expand.csv-spec
  4. 8 1
      x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/action/EsqlCapabilities.java
  5. 3 5
      x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/optimizer/LogicalPlanOptimizer.java
  6. 3 0
      x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/optimizer/LogicalVerifier.java
  7. 0 54
      x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/optimizer/rules/logical/AddDefaultTopN.java
  8. 0 47
      x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/optimizer/rules/logical/PruneOrderByBeforeStats.java
  9. 82 0
      x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/optimizer/rules/logical/PruneRedundantOrderBy.java
  10. 0 2
      x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/plan/PlanWritables.java
  11. 1 1
      x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/plan/logical/Aggregate.java
  12. 1 1
      x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/plan/logical/Drop.java
  13. 1 1
      x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/plan/logical/Enrich.java
  14. 1 1
      x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/plan/logical/Eval.java
  15. 1 1
      x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/plan/logical/Filter.java
  16. 1 1
      x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/plan/logical/InlineStats.java
  17. 1 1
      x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/plan/logical/Keep.java
  18. 1 1
      x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/plan/logical/Lookup.java
  19. 1 1
      x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/plan/logical/MvExpand.java
  20. 12 1
      x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/plan/logical/OrderBy.java
  21. 1 1
      x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/plan/logical/Project.java
  22. 1 1
      x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/plan/logical/RegexExtract.java
  23. 1 1
      x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/plan/logical/Rename.java
  24. 93 0
      x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/plan/logical/SortAgnostic.java
  25. 2 1
      x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/plan/logical/join/Join.java
  26. 0 88
      x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/plan/physical/OrderExec.java
  27. 0 6
      x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/planner/mapper/LocalMapper.java
  28. 1 7
      x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/planner/mapper/Mapper.java
  29. 397 89
      x-pack/plugin/esql/src/test/java/org/elasticsearch/xpack/esql/optimizer/LogicalPlanOptimizerTests.java
  30. 0 46
      x-pack/plugin/esql/src/test/java/org/elasticsearch/xpack/esql/plan/physical/OrderExecSerializationTests.java

+ 5 - 0
docs/changelog/121156.yaml

@@ -0,0 +1,5 @@
+pr: 121156
+summary: Remove redundant sorts from execution plan
+area: ES|QL
+type: bug
+issues: []

+ 47 - 0
x-pack/plugin/esql/qa/testFixtures/src/main/resources/lookup-join.csv-spec

@@ -1346,3 +1346,50 @@ language_code:integer | language_name:keyword       | country:text
 1                     | English                     | United States of America
 1                     | English                     | null
 ;
+
+
+sortBeforeAndAfterJoin
+required_capability: join_lookup_v12
+required_capability: remove_redundant_sort
+
+FROM employees
+| sort first_name
+| EVAL language_code = languages
+| LOOKUP JOIN languages_lookup ON language_code
+| WHERE emp_no >= 10091 AND emp_no < 10094
+| SORT emp_no
+| KEEP emp_no, language_code, language_name
+;
+
+emp_no:integer | language_code:integer | language_name:keyword
+10091          | 3                     | Spanish
+10092          | 1                     | English
+10093          | 3                     | Spanish
+;
+
+
+
+sortBeforeAndAfterMultipleJoinAndMvExpand
+required_capability: join_lookup_v12
+required_capability: remove_redundant_sort
+
+FROM employees
+| sort first_name
+| EVAL language_code = languages
+| LOOKUP JOIN languages_lookup ON language_code
+| WHERE emp_no >= 10091 AND emp_no < 10094
+| SORT language_name
+| MV_EXPAND first_name
+| SORT first_name
+| MV_EXPAND last_name
+| SORT last_name
+| LOOKUP JOIN languages_lookup ON language_code
+| SORT emp_no
+| KEEP emp_no, language_code, language_name
+;
+
+emp_no:integer | language_code:integer | language_name:keyword
+10091          | 3                     | Spanish
+10092          | 1                     | English
+10093          | 3                     | Spanish
+;

+ 14 - 0
x-pack/plugin/esql/qa/testFixtures/src/main/resources/mv_expand.csv-spec

@@ -404,3 +404,17 @@ from employees | where emp_no == 10003  | mv_expand first_name | keep first_name
 first_name:keyword
 Parto
 ;
+
+
+sortBeforeAndAfterMvExpand
+from employees 
+| sort first_name 
+| mv_expand job_positions 
+| sort emp_no, job_positions 
+| keep emp_no, job_positions 
+| limit 2;
+
+emp_no:integer | job_positions:keyword   
+10001          | Accountant
+10001          | Senior Python Developer          
+;

+ 8 - 1
x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/action/EsqlCapabilities.java

@@ -676,7 +676,14 @@ public class EsqlCapabilities {
         /**
          * Support for partial subset of metrics in aggregate_metric_double type
          */
-        AGGREGATE_METRIC_DOUBLE_PARTIAL_SUBMETRICS(AGGREGATE_METRIC_DOUBLE_FEATURE_FLAG);
+        AGGREGATE_METRIC_DOUBLE_PARTIAL_SUBMETRICS(AGGREGATE_METRIC_DOUBLE_FEATURE_FLAG),
+
+        /**
+         * Fix for https://github.com/elastic/elasticsearch/issues/120817
+         * and https://github.com/elastic/elasticsearch/issues/120803
+         * Support for queries that have multiple SORTs that cannot become TopN
+         */
+        REMOVE_REDUNDANT_SORT;
 
         private final boolean enabled;
 

+ 3 - 5
x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/optimizer/LogicalPlanOptimizer.java

@@ -10,7 +10,6 @@ package org.elasticsearch.xpack.esql.optimizer;
 import org.elasticsearch.xpack.esql.VerificationException;
 import org.elasticsearch.xpack.esql.common.Failures;
 import org.elasticsearch.xpack.esql.core.type.DataType;
-import org.elasticsearch.xpack.esql.optimizer.rules.logical.AddDefaultTopN;
 import org.elasticsearch.xpack.esql.optimizer.rules.logical.BooleanFunctionEqualsElimination;
 import org.elasticsearch.xpack.esql.optimizer.rules.logical.BooleanSimplification;
 import org.elasticsearch.xpack.esql.optimizer.rules.logical.CombineBinaryComparisons;
@@ -32,7 +31,7 @@ import org.elasticsearch.xpack.esql.optimizer.rules.logical.PruneColumns;
 import org.elasticsearch.xpack.esql.optimizer.rules.logical.PruneEmptyPlans;
 import org.elasticsearch.xpack.esql.optimizer.rules.logical.PruneFilters;
 import org.elasticsearch.xpack.esql.optimizer.rules.logical.PruneLiteralsInOrderBy;
-import org.elasticsearch.xpack.esql.optimizer.rules.logical.PruneOrderByBeforeStats;
+import org.elasticsearch.xpack.esql.optimizer.rules.logical.PruneRedundantOrderBy;
 import org.elasticsearch.xpack.esql.optimizer.rules.logical.PruneRedundantSortClauses;
 import org.elasticsearch.xpack.esql.optimizer.rules.logical.PushDownAndCombineFilters;
 import org.elasticsearch.xpack.esql.optimizer.rules.logical.PushDownAndCombineLimits;
@@ -116,10 +115,9 @@ public class LogicalPlanOptimizer extends ParameterizedRuleExecutor<LogicalPlan,
 
     protected static List<Batch<LogicalPlan>> rules() {
         var skip = new Batch<>("Skip Compute", new SkipQueryOnLimitZero());
-        var defaultTopN = new Batch<>("Add default TopN", new AddDefaultTopN());
         var label = new Batch<>("Set as Optimized", Limiter.ONCE, new SetAsOptimized());
 
-        return asList(substitutions(), operators(), skip, cleanup(), defaultTopN, label);
+        return asList(substitutions(), operators(), skip, cleanup(), label);
     }
 
     protected static Batch<LogicalPlan> substitutions() {
@@ -189,7 +187,7 @@ public class LogicalPlanOptimizer extends ParameterizedRuleExecutor<LogicalPlan,
             new PushDownRegexExtract(),
             new PushDownEnrich(),
             new PushDownAndCombineOrderBy(),
-            new PruneOrderByBeforeStats(),
+            new PruneRedundantOrderBy(),
             new PruneRedundantSortClauses()
         );
     }

+ 3 - 0
x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/optimizer/LogicalVerifier.java

@@ -27,6 +27,9 @@ public final class LogicalVerifier {
             PlanConsistencyChecker.checkPlan(p, dependencyFailures);
 
             if (failures.hasFailures() == false) {
+                if (p instanceof PostOptimizationVerificationAware pova) {
+                    pova.postOptimizationVerification(failures);
+                }
                 p.forEachExpression(ex -> {
                     if (ex instanceof PostOptimizationVerificationAware va) {
                         va.postOptimizationVerification(failures);

+ 0 - 54
x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/optimizer/rules/logical/AddDefaultTopN.java

@@ -1,54 +0,0 @@
-/*
- * Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
- * or more contributor license agreements. Licensed under the Elastic License
- * 2.0; you may not use this file except in compliance with the Elastic License
- * 2.0.
- */
-
-package org.elasticsearch.xpack.esql.optimizer.rules.logical;
-
-import org.elasticsearch.xpack.esql.core.expression.Literal;
-import org.elasticsearch.xpack.esql.core.type.DataType;
-import org.elasticsearch.xpack.esql.optimizer.LogicalOptimizerContext;
-import org.elasticsearch.xpack.esql.plan.logical.EsRelation;
-import org.elasticsearch.xpack.esql.plan.logical.LogicalPlan;
-import org.elasticsearch.xpack.esql.plan.logical.OrderBy;
-import org.elasticsearch.xpack.esql.plan.logical.TopN;
-import org.elasticsearch.xpack.esql.plan.logical.UnaryPlan;
-
-/**
- * This adds an explicit TopN node to a plan that only has an OrderBy right before Lucene.
- * To date, the only known use case that "needs" this is a query of the form
- * from test
- * | sort emp_no
- * | mv_expand first_name
- * | rename first_name AS x
- * | where x LIKE "*a*"
- * | limit 15
- * <p>
- * or
- * <p>
- * from test
- * | sort emp_no
- * | mv_expand first_name
- * | sort first_name
- * | limit 15
- * <p>
- * {@link PushDownAndCombineLimits} will copy the "limit 15" after "sort emp_no" if there is no filter on the expanded values
- * OR if there is no sort between "limit" and "mv_expand".
- * But, since this type of query has such a filter, the "sort emp_no" will have no limit when it reaches the current rule.
- */
-public final class AddDefaultTopN extends OptimizerRules.ParameterizedOptimizerRule<LogicalPlan, LogicalOptimizerContext> {
-    public AddDefaultTopN() {
-        super(OptimizerRules.TransformDirection.DOWN);
-    }
-
-    @Override
-    protected LogicalPlan rule(LogicalPlan plan, LogicalOptimizerContext context) {
-        if (plan instanceof UnaryPlan unary && unary.child() instanceof OrderBy order && order.child() instanceof EsRelation relation) {
-            var limit = new Literal(plan.source(), context.configuration().resultTruncationMaxSize(), DataType.INTEGER);
-            return unary.replaceChild(new TopN(plan.source(), relation, order.order(), limit));
-        }
-        return plan;
-    }
-}

+ 0 - 47
x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/optimizer/rules/logical/PruneOrderByBeforeStats.java

@@ -1,47 +0,0 @@
-/*
- * Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
- * or more contributor license agreements. Licensed under the Elastic License
- * 2.0; you may not use this file except in compliance with the Elastic License
- * 2.0.
- */
-
-package org.elasticsearch.xpack.esql.optimizer.rules.logical;
-
-import org.elasticsearch.xpack.esql.plan.logical.Aggregate;
-import org.elasticsearch.xpack.esql.plan.logical.Enrich;
-import org.elasticsearch.xpack.esql.plan.logical.Eval;
-import org.elasticsearch.xpack.esql.plan.logical.Filter;
-import org.elasticsearch.xpack.esql.plan.logical.LogicalPlan;
-import org.elasticsearch.xpack.esql.plan.logical.OrderBy;
-import org.elasticsearch.xpack.esql.plan.logical.Project;
-import org.elasticsearch.xpack.esql.plan.logical.RegexExtract;
-import org.elasticsearch.xpack.esql.plan.logical.UnaryPlan;
-
-public final class PruneOrderByBeforeStats extends OptimizerRules.OptimizerRule<Aggregate> {
-
-    @Override
-    protected LogicalPlan rule(Aggregate agg) {
-        OrderBy order = findPullableOrderBy(agg.child());
-
-        LogicalPlan p = agg;
-        if (order != null) {
-            p = agg.transformDown(OrderBy.class, o -> o == order ? order.child() : o);
-        }
-        return p;
-    }
-
-    private static OrderBy findPullableOrderBy(LogicalPlan plan) {
-        OrderBy pullable = null;
-        if (plan instanceof OrderBy o) {
-            pullable = o;
-        } else if (plan instanceof Eval
-            || plan instanceof Filter
-            || plan instanceof Project
-            || plan instanceof RegexExtract
-            || plan instanceof Enrich) {
-                pullable = findPullableOrderBy(((UnaryPlan) plan).child());
-            }
-        return pullable;
-    }
-
-}

+ 82 - 0
x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/optimizer/rules/logical/PruneRedundantOrderBy.java

@@ -0,0 +1,82 @@
+/*
+ * Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
+ * or more contributor license agreements. Licensed under the Elastic License
+ * 2.0; you may not use this file except in compliance with the Elastic License
+ * 2.0.
+ */
+
+package org.elasticsearch.xpack.esql.optimizer.rules.logical;
+
+import org.elasticsearch.xpack.esql.plan.logical.Aggregate;
+import org.elasticsearch.xpack.esql.plan.logical.LogicalPlan;
+import org.elasticsearch.xpack.esql.plan.logical.OrderBy;
+import org.elasticsearch.xpack.esql.plan.logical.SortAgnostic;
+import org.elasticsearch.xpack.esql.plan.logical.TopN;
+import org.elasticsearch.xpack.esql.plan.logical.UnaryPlan;
+
+import java.util.ArrayDeque;
+import java.util.Collections;
+import java.util.Deque;
+import java.util.IdentityHashMap;
+import java.util.Set;
+
+/**
+ * SORT cannot be executed without a LIMIT, as ES|QL doesn't support unbounded sort (yet).
+ * <p>
+ * The planner tries to push down LIMIT and transform all the unbounded sorts into a TopN.
+ * In some cases it's not possible though, eg.
+ * <p>
+ * from test | sort x | lookup join lookup on x | sort y
+ * <p>
+ * from test | sort x | mv_expand x | sort y
+ * <p>
+ * "sort y" will become a TopN due to the addition of the default Limit, but "sort x" will remain unbounded,
+ * so the query could not be executed.
+ * <p>
+ * In most cases though, following commands can make the previous SORTs redundant,
+ * because it will re-sort previously sorted results (eg. if there is another SORT)
+ * or because the order will be scrambled by another command (eg. a STATS)
+ * <p>
+ * This rule finds and prunes redundant SORTs, attempting to make the plan executable.
+ */
+public class PruneRedundantOrderBy extends OptimizerRules.OptimizerRule<LogicalPlan> {
+
+    @Override
+    protected LogicalPlan rule(LogicalPlan plan) {
+        if (plan instanceof OrderBy || plan instanceof TopN || plan instanceof Aggregate) {
+            Set<OrderBy> redundant = findRedundantSort(((UnaryPlan) plan).child());
+            if (redundant.isEmpty()) {
+                return plan;
+            }
+            return plan.transformDown(p -> redundant.contains(p) ? ((UnaryPlan) p).child() : p);
+        } else {
+            return plan;
+        }
+    }
+
+    /**
+     * breadth-first recursion to find redundant SORTs in the children tree.
+     * Returns an identity set (we need to compare and prune the exact instances)
+     */
+    private Set<OrderBy> findRedundantSort(LogicalPlan plan) {
+        Set<OrderBy> result = Collections.newSetFromMap(new IdentityHashMap<>());
+
+        Deque<LogicalPlan> toCheck = new ArrayDeque<>();
+        toCheck.push(plan);
+
+        while (true) {
+            if (toCheck.isEmpty()) {
+                return result;
+            }
+            LogicalPlan p = toCheck.pop();
+            if (p instanceof OrderBy ob) {
+                result.add(ob);
+                toCheck.push(ob.child());
+            } else if (p instanceof SortAgnostic) {
+                for (LogicalPlan child : p.children()) {
+                    toCheck.push(child);
+                }
+            }
+        }
+    }
+}

+ 0 - 2
x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/plan/PlanWritables.java

@@ -43,7 +43,6 @@ import org.elasticsearch.xpack.esql.plan.physical.HashJoinExec;
 import org.elasticsearch.xpack.esql.plan.physical.LimitExec;
 import org.elasticsearch.xpack.esql.plan.physical.LocalSourceExec;
 import org.elasticsearch.xpack.esql.plan.physical.MvExpandExec;
-import org.elasticsearch.xpack.esql.plan.physical.OrderExec;
 import org.elasticsearch.xpack.esql.plan.physical.ProjectExec;
 import org.elasticsearch.xpack.esql.plan.physical.ShowExec;
 import org.elasticsearch.xpack.esql.plan.physical.SubqueryExec;
@@ -103,7 +102,6 @@ public class PlanWritables {
             LimitExec.ENTRY,
             LocalSourceExec.ENTRY,
             MvExpandExec.ENTRY,
-            OrderExec.ENTRY,
             ProjectExec.ENTRY,
             ShowExec.ENTRY,
             SubqueryExec.ENTRY,

+ 1 - 1
x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/plan/logical/Aggregate.java

@@ -40,7 +40,7 @@ import static org.elasticsearch.xpack.esql.common.Failure.fail;
 import static org.elasticsearch.xpack.esql.expression.NamedExpressions.mergeOutputAttributes;
 import static org.elasticsearch.xpack.esql.plan.logical.Filter.checkFilterConditionDataType;
 
-public class Aggregate extends UnaryPlan implements PostAnalysisVerificationAware, TelemetryAware {
+public class Aggregate extends UnaryPlan implements PostAnalysisVerificationAware, TelemetryAware, SortAgnostic {
     public static final NamedWriteableRegistry.Entry ENTRY = new NamedWriteableRegistry.Entry(
         LogicalPlan.class,
         "Aggregate",

+ 1 - 1
x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/plan/logical/Drop.java

@@ -17,7 +17,7 @@ import org.elasticsearch.xpack.esql.core.tree.Source;
 import java.util.List;
 import java.util.Objects;
 
-public class Drop extends UnaryPlan implements TelemetryAware {
+public class Drop extends UnaryPlan implements TelemetryAware, SortAgnostic {
     private final List<NamedExpression> removals;
 
     public Drop(Source source, LogicalPlan child, List<NamedExpression> removals) {

+ 1 - 1
x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/plan/logical/Enrich.java

@@ -49,7 +49,7 @@ import static org.elasticsearch.xpack.esql.common.Failure.fail;
 import static org.elasticsearch.xpack.esql.core.expression.Expressions.asAttributes;
 import static org.elasticsearch.xpack.esql.expression.NamedExpressions.mergeOutputAttributes;
 
-public class Enrich extends UnaryPlan implements GeneratingPlan<Enrich>, PostAnalysisPlanVerificationAware, TelemetryAware {
+public class Enrich extends UnaryPlan implements GeneratingPlan<Enrich>, PostAnalysisPlanVerificationAware, TelemetryAware, SortAgnostic {
     public static final NamedWriteableRegistry.Entry ENTRY = new NamedWriteableRegistry.Entry(
         LogicalPlan.class,
         "Enrich",

+ 1 - 1
x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/plan/logical/Eval.java

@@ -38,7 +38,7 @@ import static org.elasticsearch.xpack.esql.common.Failure.fail;
 import static org.elasticsearch.xpack.esql.core.expression.Expressions.asAttributes;
 import static org.elasticsearch.xpack.esql.expression.NamedExpressions.mergeOutputAttributes;
 
-public class Eval extends UnaryPlan implements GeneratingPlan<Eval>, PostAnalysisVerificationAware, TelemetryAware {
+public class Eval extends UnaryPlan implements GeneratingPlan<Eval>, PostAnalysisVerificationAware, TelemetryAware, SortAgnostic {
     public static final NamedWriteableRegistry.Entry ENTRY = new NamedWriteableRegistry.Entry(LogicalPlan.class, "Eval", Eval::new);
 
     private final List<Alias> fields;

+ 1 - 1
x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/plan/logical/Filter.java

@@ -29,7 +29,7 @@ import static org.elasticsearch.xpack.esql.core.type.DataType.NULL;
  * {@code SELECT x FROM y WHERE z ..} the "WHERE" clause is a Filter. A
  * {@code Filter} has a "condition" Expression that does the filtering.
  */
-public class Filter extends UnaryPlan implements PostAnalysisVerificationAware, TelemetryAware {
+public class Filter extends UnaryPlan implements PostAnalysisVerificationAware, TelemetryAware, SortAgnostic {
     public static final NamedWriteableRegistry.Entry ENTRY = new NamedWriteableRegistry.Entry(LogicalPlan.class, "Filter", Filter::new);
 
     private final Expression condition;

+ 1 - 1
x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/plan/logical/InlineStats.java

@@ -37,7 +37,7 @@ import static org.elasticsearch.xpack.esql.expression.NamedExpressions.mergeOutp
  *     underlying aggregate.
  * </p>
  */
-public class InlineStats extends UnaryPlan implements NamedWriteable, SurrogateLogicalPlan, TelemetryAware {
+public class InlineStats extends UnaryPlan implements NamedWriteable, SurrogateLogicalPlan, TelemetryAware, SortAgnostic {
     public static final NamedWriteableRegistry.Entry ENTRY = new NamedWriteableRegistry.Entry(
         LogicalPlan.class,
         "InlineStats",

+ 1 - 1
x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/plan/logical/Keep.java

@@ -15,7 +15,7 @@ import org.elasticsearch.xpack.esql.core.tree.Source;
 import java.util.List;
 import java.util.Objects;
 
-public class Keep extends Project implements TelemetryAware {
+public class Keep extends Project implements TelemetryAware, SortAgnostic {
 
     public Keep(Source source, LogicalPlan child, List<? extends NamedExpression> projections) {
         super(source, child, projections);

+ 1 - 1
x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/plan/logical/Lookup.java

@@ -32,7 +32,7 @@ import java.util.Objects;
  * Looks up values from the associated {@code tables}.
  * The class is supposed to be substituted by a {@link Join}.
  */
-public class Lookup extends UnaryPlan implements SurrogateLogicalPlan, TelemetryAware {
+public class Lookup extends UnaryPlan implements SurrogateLogicalPlan, TelemetryAware, SortAgnostic {
     public static final NamedWriteableRegistry.Entry ENTRY = new NamedWriteableRegistry.Entry(LogicalPlan.class, "Lookup", Lookup::new);
 
     private final Expression tableName;

+ 1 - 1
x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/plan/logical/MvExpand.java

@@ -23,7 +23,7 @@ import java.util.ArrayList;
 import java.util.List;
 import java.util.Objects;
 
-public class MvExpand extends UnaryPlan implements TelemetryAware {
+public class MvExpand extends UnaryPlan implements TelemetryAware, SortAgnostic {
     public static final NamedWriteableRegistry.Entry ENTRY = new NamedWriteableRegistry.Entry(LogicalPlan.class, "MvExpand", MvExpand::new);
 
     private final NamedExpression target;

+ 12 - 1
x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/plan/logical/OrderBy.java

@@ -10,6 +10,7 @@ import org.elasticsearch.common.io.stream.NamedWriteableRegistry;
 import org.elasticsearch.common.io.stream.StreamInput;
 import org.elasticsearch.common.io.stream.StreamOutput;
 import org.elasticsearch.xpack.esql.capabilities.PostAnalysisVerificationAware;
+import org.elasticsearch.xpack.esql.capabilities.PostOptimizationVerificationAware;
 import org.elasticsearch.xpack.esql.capabilities.TelemetryAware;
 import org.elasticsearch.xpack.esql.common.Failures;
 import org.elasticsearch.xpack.esql.core.capabilities.Resolvables;
@@ -25,7 +26,12 @@ import java.util.Objects;
 
 import static org.elasticsearch.xpack.esql.common.Failure.fail;
 
-public class OrderBy extends UnaryPlan implements PostAnalysisVerificationAware, TelemetryAware {
+public class OrderBy extends UnaryPlan
+    implements
+        PostAnalysisVerificationAware,
+        PostOptimizationVerificationAware,
+        TelemetryAware,
+        SortAgnostic {
     public static final NamedWriteableRegistry.Entry ENTRY = new NamedWriteableRegistry.Entry(LogicalPlan.class, "OrderBy", OrderBy::new);
 
     private final List<Order> order;
@@ -109,4 +115,9 @@ public class OrderBy extends UnaryPlan implements PostAnalysisVerificationAware,
             }
         });
     }
+
+    @Override
+    public void postOptimizationVerification(Failures failures) {
+        failures.add(fail(this, "Unbounded sort not supported yet [{}] please add a limit", this.sourceText()));
+    }
 }

+ 1 - 1
x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/plan/logical/Project.java

@@ -25,7 +25,7 @@ import java.util.Objects;
 /**
  * A {@code Project} is a {@code Plan} with one child. In {@code SELECT x FROM y}, the "SELECT" statement is a Project.
  */
-public class Project extends UnaryPlan {
+public class Project extends UnaryPlan implements SortAgnostic {
     public static final NamedWriteableRegistry.Entry ENTRY = new NamedWriteableRegistry.Entry(LogicalPlan.class, "Project", Project::new);
 
     private final List<? extends NamedExpression> projections;

+ 1 - 1
x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/plan/logical/RegexExtract.java

@@ -24,7 +24,7 @@ import java.util.Objects;
 import static org.elasticsearch.xpack.esql.common.Failure.fail;
 import static org.elasticsearch.xpack.esql.expression.NamedExpressions.mergeOutputAttributes;
 
-public abstract class RegexExtract extends UnaryPlan implements GeneratingPlan<RegexExtract>, PostAnalysisVerificationAware {
+public abstract class RegexExtract extends UnaryPlan implements GeneratingPlan<RegexExtract>, PostAnalysisVerificationAware, SortAgnostic {
     protected final Expression input;
     protected final List<Attribute> extractedFields;
 

+ 1 - 1
x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/plan/logical/Rename.java

@@ -21,7 +21,7 @@ import org.elasticsearch.xpack.esql.expression.function.UnsupportedAttribute;
 import java.util.List;
 import java.util.Objects;
 
-public class Rename extends UnaryPlan implements TelemetryAware {
+public class Rename extends UnaryPlan implements TelemetryAware, SortAgnostic {
 
     private final List<Alias> renamings;
 

+ 93 - 0
x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/plan/logical/SortAgnostic.java

@@ -0,0 +1,93 @@
+/*
+ * Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
+ * or more contributor license agreements. Licensed under the Elastic License
+ * 2.0; you may not use this file except in compliance with the Elastic License
+ * 2.0.
+ */
+
+package org.elasticsearch.xpack.esql.plan.logical;
+
+/**
+ * This interface is intended to check redundancy of a previous SORT.
+ *<p>
+ *
+ * An example is with commands that compute values record by record, regardless of the input order
+ * and that don't rely on the context (intended as previous/next records).
+ *
+ * <hr>
+ * <p>
+ *
+ * Example 1: if a MY_COMMAND that implements this interface is used between two sorts,
+ * then we can assume that
+ * <p>
+ * <code>
+ * | SORT x, y, z | MY_COMMAND | SORT a, b, c
+ * </code>
+ * <p>
+ * is equivalent to
+ * <p>
+ * <code>
+ * | MY_COMMAND | SORT a, b, c
+ * </code>
+ *
+ * <hr>
+ * <p>
+ *
+ * Example 2: commands that make previous order irrelevant, eg. because they collapse the results;
+ * STATS is one of them, eg.
+ *
+ * <p>
+ * <code>
+ * | SORT x, y, z | STATS count(*)
+ * </code>
+ * <p>
+ * is equivalent to
+ * <p>
+ * <code>
+ * | STATS count(*)
+ * </code>
+ * <p>
+ *
+ * and if MY_COMMAND implements this interface, then
+ *
+ * <p>
+ * <code>
+ * | SORT x, y, z | MY_COMMAND | STATS count(*)
+ * </code>
+ * <p>
+ * is equivalent to
+ * <p>
+ * <code>
+ * | MY_COMMAND | STATS count(*)
+ * </code>
+ *
+ * <hr>
+ * <p>
+ *
+ * In all the other cases, eg. if the command does not implement this interface
+ * then we assume that the previous SORT is still relevant and cannot be pruned.
+ *
+ * <hr>
+ * <p>
+ *
+ * Eg. LIMIT does <b>not</b> implement this interface, because
+ *
+ * <p>
+ * <code>
+ * | SORT x, y, z | LIMIT 10 | SORT a, b, c
+ * </code>
+ * <p>
+ * is <b>NOT</b> equivalent to
+ * <p>
+ * <code>
+ * | LIMIT 10 | SORT a, b, c
+ * </code>
+ *
+ * <hr>
+ * <p>
+ *
+ * For n-ary plans that implement this interface,
+ * we assume that the above applies to all the children
+ *
+ */
+public interface SortAgnostic {}

+ 2 - 1
x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/plan/logical/join/Join.java

@@ -21,6 +21,7 @@ import org.elasticsearch.xpack.esql.core.tree.Source;
 import org.elasticsearch.xpack.esql.io.stream.PlanStreamInput;
 import org.elasticsearch.xpack.esql.plan.logical.BinaryPlan;
 import org.elasticsearch.xpack.esql.plan.logical.LogicalPlan;
+import org.elasticsearch.xpack.esql.plan.logical.SortAgnostic;
 
 import java.io.IOException;
 import java.util.ArrayList;
@@ -32,7 +33,7 @@ import static org.elasticsearch.xpack.esql.core.type.DataType.TEXT;
 import static org.elasticsearch.xpack.esql.expression.NamedExpressions.mergeOutputAttributes;
 import static org.elasticsearch.xpack.esql.plan.logical.join.JoinTypes.LEFT;
 
-public class Join extends BinaryPlan implements PostAnalysisVerificationAware {
+public class Join extends BinaryPlan implements PostAnalysisVerificationAware, SortAgnostic {
     public static final NamedWriteableRegistry.Entry ENTRY = new NamedWriteableRegistry.Entry(LogicalPlan.class, "Join", Join::new);
 
     private final JoinConfig config;

+ 0 - 88
x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/plan/physical/OrderExec.java

@@ -1,88 +0,0 @@
-/*
- * Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
- * or more contributor license agreements. Licensed under the Elastic License
- * 2.0; you may not use this file except in compliance with the Elastic License
- * 2.0.
- */
-
-package org.elasticsearch.xpack.esql.plan.physical;
-
-import org.elasticsearch.common.io.stream.NamedWriteableRegistry;
-import org.elasticsearch.common.io.stream.StreamInput;
-import org.elasticsearch.common.io.stream.StreamOutput;
-import org.elasticsearch.xpack.esql.core.tree.NodeInfo;
-import org.elasticsearch.xpack.esql.core.tree.Source;
-import org.elasticsearch.xpack.esql.expression.Order;
-import org.elasticsearch.xpack.esql.io.stream.PlanStreamInput;
-
-import java.io.IOException;
-import java.util.List;
-import java.util.Objects;
-
-public class OrderExec extends UnaryExec {
-    public static final NamedWriteableRegistry.Entry ENTRY = new NamedWriteableRegistry.Entry(
-        PhysicalPlan.class,
-        "OrderExec",
-        OrderExec::new
-    );
-
-    private final List<Order> order;
-
-    public OrderExec(Source source, PhysicalPlan child, List<Order> order) {
-        super(source, child);
-        this.order = order;
-    }
-
-    private OrderExec(StreamInput in) throws IOException {
-        this(
-            Source.readFrom((PlanStreamInput) in),
-            in.readNamedWriteable(PhysicalPlan.class),
-            in.readCollectionAsList(org.elasticsearch.xpack.esql.expression.Order::new)
-        );
-    }
-
-    @Override
-    public void writeTo(StreamOutput out) throws IOException {
-        Source.EMPTY.writeTo(out);
-        out.writeNamedWriteable(child());
-        out.writeCollection(order());
-    }
-
-    @Override
-    public String getWriteableName() {
-        return ENTRY.name;
-    }
-
-    @Override
-    protected NodeInfo<OrderExec> info() {
-        return NodeInfo.create(this, OrderExec::new, child(), order);
-    }
-
-    @Override
-    public OrderExec replaceChild(PhysicalPlan newChild) {
-        return new OrderExec(source(), newChild, order);
-    }
-
-    public List<Order> order() {
-        return order;
-    }
-
-    @Override
-    public int hashCode() {
-        return Objects.hash(order, child());
-    }
-
-    @Override
-    public boolean equals(Object obj) {
-        if (this == obj) {
-            return true;
-        }
-        if (obj == null || getClass() != obj.getClass()) {
-            return false;
-        }
-
-        OrderExec other = (OrderExec) obj;
-
-        return Objects.equals(order, other.order) && Objects.equals(child(), other.child());
-    }
-}

+ 0 - 6
x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/planner/mapper/LocalMapper.java

@@ -17,7 +17,6 @@ import org.elasticsearch.xpack.esql.plan.logical.EsRelation;
 import org.elasticsearch.xpack.esql.plan.logical.LeafPlan;
 import org.elasticsearch.xpack.esql.plan.logical.Limit;
 import org.elasticsearch.xpack.esql.plan.logical.LogicalPlan;
-import org.elasticsearch.xpack.esql.plan.logical.OrderBy;
 import org.elasticsearch.xpack.esql.plan.logical.TopN;
 import org.elasticsearch.xpack.esql.plan.logical.UnaryPlan;
 import org.elasticsearch.xpack.esql.plan.logical.join.Join;
@@ -28,7 +27,6 @@ import org.elasticsearch.xpack.esql.plan.physical.HashJoinExec;
 import org.elasticsearch.xpack.esql.plan.physical.LimitExec;
 import org.elasticsearch.xpack.esql.plan.physical.LocalSourceExec;
 import org.elasticsearch.xpack.esql.plan.physical.LookupJoinExec;
-import org.elasticsearch.xpack.esql.plan.physical.OrderExec;
 import org.elasticsearch.xpack.esql.plan.physical.PhysicalPlan;
 import org.elasticsearch.xpack.esql.plan.physical.TopNExec;
 
@@ -81,10 +79,6 @@ public class LocalMapper {
             return new LimitExec(limit.source(), mappedChild, limit.limit());
         }
 
-        if (unary instanceof OrderBy o) {
-            return new OrderExec(o.source(), mappedChild, o.order());
-        }
-
         if (unary instanceof TopN topN) {
             return new TopNExec(topN.source(), mappedChild, topN.order(), topN.limit(), null);
         }

+ 1 - 7
x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/planner/mapper/Mapper.java

@@ -32,7 +32,6 @@ import org.elasticsearch.xpack.esql.plan.physical.HashJoinExec;
 import org.elasticsearch.xpack.esql.plan.physical.LimitExec;
 import org.elasticsearch.xpack.esql.plan.physical.LocalSourceExec;
 import org.elasticsearch.xpack.esql.plan.physical.LookupJoinExec;
-import org.elasticsearch.xpack.esql.plan.physical.OrderExec;
 import org.elasticsearch.xpack.esql.plan.physical.PhysicalPlan;
 import org.elasticsearch.xpack.esql.plan.physical.TopNExec;
 import org.elasticsearch.xpack.esql.plan.physical.UnaryExec;
@@ -105,7 +104,7 @@ public class Mapper {
                     return enrichExec.child();
                 }
                 if (f instanceof UnaryExec unaryExec) {
-                    if (f instanceof LimitExec || f instanceof ExchangeExec || f instanceof OrderExec || f instanceof TopNExec) {
+                    if (f instanceof LimitExec || f instanceof ExchangeExec || f instanceof TopNExec) {
                         return f;
                     } else {
                         return unaryExec.child();
@@ -161,11 +160,6 @@ public class Mapper {
             return new LimitExec(limit.source(), mappedChild, limit.limit());
         }
 
-        if (unary instanceof OrderBy o) {
-            mappedChild = addExchangeForFragment(o, mappedChild);
-            return new OrderExec(o.source(), mappedChild, o.order());
-        }
-
         if (unary instanceof TopN topN) {
             mappedChild = addExchangeForFragment(topN, mappedChild);
             return new TopNExec(topN.source(), mappedChild, topN.order(), topN.limit(), null);

+ 397 - 89
x-pack/plugin/esql/src/test/java/org/elasticsearch/xpack/esql/optimizer/LogicalPlanOptimizerTests.java

@@ -96,6 +96,7 @@ import org.elasticsearch.xpack.esql.index.EsIndex;
 import org.elasticsearch.xpack.esql.index.IndexResolution;
 import org.elasticsearch.xpack.esql.optimizer.rules.logical.LiteralsOnTheRight;
 import org.elasticsearch.xpack.esql.optimizer.rules.logical.OptimizerRules;
+import org.elasticsearch.xpack.esql.optimizer.rules.logical.PruneRedundantOrderBy;
 import org.elasticsearch.xpack.esql.optimizer.rules.logical.PushDownAndCombineLimits;
 import org.elasticsearch.xpack.esql.optimizer.rules.logical.PushDownEnrich;
 import org.elasticsearch.xpack.esql.optimizer.rules.logical.PushDownEval;
@@ -1839,10 +1840,9 @@ public class LogicalPlanOptimizerTests extends ESTestCase {
 
     /**
      * Expected
-     * TopN[[Order[first_name{f}#170,ASC,LAST]],1000[INTEGER]]
-     *  \_MvExpand[first_name{f}#170]
-     *    \_TopN[[Order[emp_no{f}#169,ASC,LAST]],1000[INTEGER]]
-     *      \_EsRelation[test][avg_worked_seconds{f}#167, birth_date{f}#168, emp_n..]
+     * TopN[[Order[first_name{r}#5575,ASC,LAST]],1000[INTEGER]]
+     * \_MvExpand[first_name{f}#5565,first_name{r}#5575,null]
+     *   \_EsRelation[test][_meta_field{f}#5570, emp_no{f}#5564, first_name{f}#..]
      */
     public void testDontCombineOrderByThroughMvExpand() {
         LogicalPlan plan = optimizedPlan("""
@@ -1854,9 +1854,7 @@ public class LogicalPlanOptimizerTests extends ESTestCase {
         var topN = as(plan, TopN.class);
         assertThat(orderNames(topN), contains("first_name"));
         var mvExpand = as(topN.child(), MvExpand.class);
-        topN = as(mvExpand.child(), TopN.class);
-        assertThat(orderNames(topN), contains("emp_no"));
-        as(topN.child(), EsRelation.class);
+        as(mvExpand.child(), EsRelation.class);
     }
 
     /**
@@ -2065,12 +2063,10 @@ public class LogicalPlanOptimizerTests extends ESTestCase {
     }
 
     /**
-     * Expected
-     * EsqlProject[[emp_no{f}#350, first_name{f}#351, salary{f}#352]]
-     *  \_TopN[[Order[salary{f}#352,ASC,LAST], Order[first_name{f}#351,ASC,LAST]],5[INTEGER]]
-     *    \_MvExpand[first_name{f}#351]
-     *      \_TopN[[Order[emp_no{f}#350,ASC,LAST]],10000[INTEGER]]
-     *        \_EsRelation[employees][emp_no{f}#350, first_name{f}#351, salary{f}#352]
+     * EsqlProject[[emp_no{f}#10, first_name{r}#21, salary{f}#15]]
+     * \_TopN[[Order[salary{f}#15,ASC,LAST], Order[first_name{r}#21,ASC,LAST]],5[INTEGER]]
+     *   \_MvExpand[first_name{f}#11,first_name{r}#21,null]
+     *     \_EsRelation[test][_meta_field{f}#16, emp_no{f}#10, first_name{f}#11, ..]
      */
     public void testPushDownLimitThroughMultipleSort_AfterMvExpand() {
         LogicalPlan plan = optimizedPlan("""
@@ -2086,20 +2082,16 @@ public class LogicalPlanOptimizerTests extends ESTestCase {
         assertThat(topN.limit().fold(FoldContext.small()), equalTo(5));
         assertThat(orderNames(topN), contains("salary", "first_name"));
         var mvExp = as(topN.child(), MvExpand.class);
-        topN = as(mvExp.child(), TopN.class);
-        assertThat(topN.limit().fold(FoldContext.small()), equalTo(10000));
-        assertThat(orderNames(topN), contains("emp_no"));
-        as(topN.child(), EsRelation.class);
+        as(mvExp.child(), EsRelation.class);
     }
 
     /**
      * Expected
-     * EsqlProject[[emp_no{f}#361, first_name{f}#362, salary{f}#363]]
-     *  \_TopN[[Order[first_name{f}#362,ASC,LAST]],5[INTEGER]]
-     *    \_TopN[[Order[salary{f}#363,ASC,LAST]],5[INTEGER]]
-     *      \_MvExpand[first_name{f}#362]
-     *        \_TopN[[Order[emp_no{f}#361,ASC,LAST]],10000[INTEGER]]
-     *          \_EsRelation[employees][emp_no{f}#361, first_name{f}#362, salary{f}#363]
+     * EsqlProject[[emp_no{f}#2560, first_name{r}#2571, salary{f}#2565]]
+     * \_TopN[[Order[first_name{r}#2571,ASC,LAST]],5[INTEGER]]
+     *   \_TopN[[Order[salary{f}#2565,ASC,LAST]],5[INTEGER]]
+     *     \_MvExpand[first_name{f}#2561,first_name{r}#2571,null]
+     *       \_EsRelation[test][_meta_field{f}#2566, emp_no{f}#2560, first_name{f}#..]
      */
     public void testPushDownLimitThroughMultipleSort_AfterMvExpand2() {
         LogicalPlan plan = optimizedPlan("""
@@ -2119,10 +2111,7 @@ public class LogicalPlanOptimizerTests extends ESTestCase {
         assertThat(topN.limit().fold(FoldContext.small()), equalTo(5));
         assertThat(orderNames(topN), contains("salary"));
         var mvExp = as(topN.child(), MvExpand.class);
-        topN = as(mvExp.child(), TopN.class);
-        assertThat(topN.limit().fold(FoldContext.small()), equalTo(10000));
-        assertThat(orderNames(topN), contains("emp_no"));
-        as(topN.child(), EsRelation.class);
+        as(mvExp.child(), EsRelation.class);
     }
 
     /**
@@ -2231,8 +2220,7 @@ public class LogicalPlanOptimizerTests extends ESTestCase {
      *  \_TopN[[Order[salary{f}#12,ASC,LAST]],5[INTEGER]]
      *    \_Eval[[100[INTEGER] AS b]]
      *      \_MvExpand[first_name{f}#11]
-     *        \_TopN[[Order[first_name{f}#11,ASC,LAST]],10000[INTEGER]]
-     *          \_EsRelation[employees][emp_no{f}#10, first_name{f}#11, salary{f}#12]
+     *        \_EsRelation[employees][emp_no{f}#10, first_name{f}#11, salary{f}#12]
      */
     public void testPushDownLimit_PastEvalAndMvExpand() {
         LogicalPlan plan = optimizedPlan("""
@@ -2250,22 +2238,18 @@ public class LogicalPlanOptimizerTests extends ESTestCase {
         assertThat(orderNames(topN), contains("salary"));
         var eval = as(topN.child(), Eval.class);
         var mvExp = as(eval.child(), MvExpand.class);
-        topN = as(mvExp.child(), TopN.class);
-        assertThat(topN.limit().fold(FoldContext.small()), equalTo(10000));
-        assertThat(orderNames(topN), contains("first_name"));
-        as(topN.child(), EsRelation.class);
+        as(mvExp.child(), EsRelation.class);
     }
 
     /**
      * Expected
-     * EsqlProject[[emp_no{f}#12, first_name{r}#22, salary{f}#17]]
-     * \_TopN[[Order[salary{f}#17,ASC,LAST], Order[first_name{r}#22,ASC,LAST]],1000[INTEGER]]
-     *   \_Filter[gender{f}#14 == [46][KEYWORD] AND WILDCARDLIKE(first_name{r}#22)]
-     *     \_MvExpand[first_name{f}#13,first_name{r}#22,null]
-     *       \_TopN[[Order[emp_no{f}#12,ASC,LAST]],10000[INTEGER]]
-     *         \_EsRelation[test][_meta_field{f}#18, emp_no{f}#12, first_name{f}#13, ..]
-     */
-    public void testAddDefaultLimit_BeforeMvExpand_WithFilterOnExpandedField_ResultTruncationDefaultSize() {
+     * EsqlProject[[emp_no{f}#5885, first_name{r}#5896, salary{f}#5890]]
+     * \_TopN[[Order[salary{f}#5890,ASC,LAST], Order[first_name{r}#5896,ASC,LAST]],1000[INTEGER]]
+     *   \_Filter[gender{f}#5887 == [46][KEYWORD] AND WILDCARDLIKE(first_name{r}#5896)]
+     *     \_MvExpand[first_name{f}#5886,first_name{r}#5896,null]
+     *       \_EsRelation[test][_meta_field{f}#5891, emp_no{f}#5885, first_name{f}#..]
+     */
+    public void testRedundantSort_BeforeMvExpand_WithFilterOnExpandedField_ResultTruncationDefaultSize() {
         LogicalPlan plan = optimizedPlan("""
             from test
             | sort emp_no
@@ -2282,9 +2266,7 @@ public class LogicalPlanOptimizerTests extends ESTestCase {
         var filter = as(topN.child(), Filter.class);
         assertThat(filter.condition(), instanceOf(And.class));
         var mvExp = as(filter.child(), MvExpand.class);
-        topN = as(mvExp.child(), TopN.class); // TODO is it correct? Double-check AddDefaultTopN rule
-        assertThat(orderNames(topN), contains("emp_no"));
-        as(topN.child(), EsRelation.class);
+        as(mvExp.child(), EsRelation.class);
     }
 
     /**
@@ -2367,8 +2349,7 @@ public class LogicalPlanOptimizerTests extends ESTestCase {
         var mvExpand = as(topN.child(), MvExpand.class);
         var filter = as(mvExpand.child(), Filter.class);
         mvExpand = as(filter.child(), MvExpand.class);
-        var topN2 = as(mvExpand.child(), TopN.class);  // TODO is it correct? Double-check AddDefaultTopN rule
-        as(topN2.child(), EsRelation.class);
+        as(mvExpand.child(), EsRelation.class);
     }
 
     /**
@@ -2463,20 +2444,18 @@ public class LogicalPlanOptimizerTests extends ESTestCase {
         assertThat(orderNames(topN), contains("first_name"));
         assertThat(topN.limit().fold(FoldContext.small()), equalTo(10000));
         var mvExpand = as(topN.child(), MvExpand.class);
-        var topN2 = as(mvExpand.child(), TopN.class); // TODO is it correct? Double-check AddDefaultTopN rule
-        as(topN2.child(), EsRelation.class);
+        as(mvExpand.child(), EsRelation.class);
     }
 
     /**
      * Expected
-     * EsqlProject[[emp_no{f}#104, first_name{f}#105, salary{f}#106]]
-     *  \_TopN[[Order[salary{f}#106,ASC,LAST], Order[first_name{f}#105,ASC,LAST]],15[INTEGER]]
-     *    \_Filter[gender{f}#215 == [46][KEYWORD] AND WILDCARDLIKE(first_name{f}#105)]
-     *      \_MvExpand[first_name{f}#105]
-     *        \_TopN[[Order[emp_no{f}#104,ASC,LAST]],10000[INTEGER]]
-     *          \_EsRelation[employees][emp_no{f}#104, first_name{f}#105, salary{f}#106]
-     */
-    public void testAddDefaultLimit_BeforeMvExpand_WithFilterOnExpandedField() {
+     * EsqlProject[[emp_no{f}#3517, first_name{r}#3528, salary{f}#3522]]
+     * \_TopN[[Order[salary{f}#3522,ASC,LAST], Order[first_name{r}#3528,ASC,LAST]],15[INTEGER]]
+     *   \_Filter[gender{f}#3519 == [46][KEYWORD] AND WILDCARDLIKE(first_name{r}#3528)]
+     *     \_MvExpand[first_name{f}#3518,first_name{r}#3528,null]
+     *       \_EsRelation[test][_meta_field{f}#3523, emp_no{f}#3517, first_name{f}#..]
+     */
+    public void testRedundantSort_BeforeMvExpand_WithFilterOnExpandedField() {
         LogicalPlan plan = optimizedPlan("""
             from test
             | sort emp_no
@@ -2494,24 +2473,18 @@ public class LogicalPlanOptimizerTests extends ESTestCase {
         var filter = as(topN.child(), Filter.class);
         assertThat(filter.condition(), instanceOf(And.class));
         var mvExp = as(filter.child(), MvExpand.class);
-        topN = as(mvExp.child(), TopN.class);
-        // the filter acts on first_name (the one used in mv_expand), so the limit 15 is not pushed down past mv_expand
-        // instead the default limit is added
-        assertThat(topN.limit().fold(FoldContext.small()), equalTo(10000));
-        assertThat(orderNames(topN), contains("emp_no"));
-        as(topN.child(), EsRelation.class);
+        as(mvExp.child(), EsRelation.class);
     }
 
     /**
      * Expected
-     * EsqlProject[[emp_no{f}#104, first_name{f}#105, salary{f}#106]]
-     *  \_TopN[[Order[salary{f}#106,ASC,LAST], Order[first_name{f}#105,ASC,LAST]],15[INTEGER]]
-     *    \_Filter[gender{f}#215 == [46][KEYWORD] AND salary{f}#106 > 60000[INTEGER]]
-     *      \_MvExpand[first_name{f}#105]
-     *        \_TopN[[Order[emp_no{f}#104,ASC,LAST]],10000[INTEGER]]
-     *          \_EsRelation[employees][emp_no{f}#104, first_name{f}#105, salary{f}#106]
-     */
-    public void testAddDefaultLimit_BeforeMvExpand_WithFilter_NOT_OnExpandedField() {
+     * EsqlProject[[emp_no{f}#3421, first_name{r}#3432, salary{f}#3426]]
+     * \_TopN[[Order[salary{f}#3426,ASC,LAST], Order[first_name{r}#3432,ASC,LAST]],15[INTEGER]]
+     *   \_Filter[gender{f}#3423 == [46][KEYWORD] AND salary{f}#3426 > 60000[INTEGER]]
+     *     \_MvExpand[first_name{f}#3422,first_name{r}#3432,null]
+     *       \_EsRelation[test][_meta_field{f}#3427, emp_no{f}#3421, first_name{f}#..]
+     */
+    public void testRedundantSort_BeforeMvExpand_WithFilter_NOT_OnExpandedField() {
         LogicalPlan plan = optimizedPlan("""
             from test
             | sort emp_no
@@ -2529,24 +2502,18 @@ public class LogicalPlanOptimizerTests extends ESTestCase {
         var filter = as(topN.child(), Filter.class);
         assertThat(filter.condition(), instanceOf(And.class));
         var mvExp = as(filter.child(), MvExpand.class);
-        topN = as(mvExp.child(), TopN.class);
-        // the filters after mv_expand do not act on the expanded field values, as such the limit 15 is the one being pushed down
-        // otherwise that limit wouldn't have pushed down and the default limit was instead being added by default before mv_expanded
-        assertThat(topN.limit().fold(FoldContext.small()), equalTo(10000));
-        assertThat(orderNames(topN), contains("emp_no"));
-        as(topN.child(), EsRelation.class);
+        as(mvExp.child(), EsRelation.class);
     }
 
     /**
      * Expected
-     * EsqlProject[[emp_no{f}#116, first_name{f}#117 AS x, salary{f}#119]]
-     *  \_TopN[[Order[salary{f}#119,ASC,LAST], Order[first_name{f}#117,ASC,LAST]],15[INTEGER]]
-     *    \_Filter[gender{f}#118 == [46][KEYWORD] AND WILDCARDLIKE(first_name{f}#117)]
-     *      \_MvExpand[first_name{f}#117]
-     *        \_TopN[[Order[gender{f}#118,ASC,LAST]],10000[INTEGER]]
-     *          \_EsRelation[employees][emp_no{f}#116, first_name{f}#117, gender{f}#118, sa..]
-     */
-    public void testAddDefaultLimit_BeforeMvExpand_WithFilterOnExpandedFieldAlias() {
+     * EsqlProject[[emp_no{f}#2085, first_name{r}#2096 AS x, salary{f}#2090]]
+     * \_TopN[[Order[salary{f}#2090,ASC,LAST], Order[first_name{r}#2096,ASC,LAST]],15[INTEGER]]
+     *   \_Filter[gender{f}#2087 == [46][KEYWORD] AND WILDCARDLIKE(first_name{r}#2096)]
+     *     \_MvExpand[first_name{f}#2086,first_name{r}#2096,null]
+     *       \_EsRelation[test][_meta_field{f}#2091, emp_no{f}#2085, first_name{f}#..]
+     */
+    public void testRedundantSort_BeforeMvExpand_WithFilterOnExpandedFieldAlias() {
         LogicalPlan plan = optimizedPlan("""
             from test
             | sort gender
@@ -2565,11 +2532,7 @@ public class LogicalPlanOptimizerTests extends ESTestCase {
         var filter = as(topN.child(), Filter.class);
         assertThat(filter.condition(), instanceOf(And.class));
         var mvExp = as(filter.child(), MvExpand.class);
-        topN = as(mvExp.child(), TopN.class);
-        // the filter uses an alias ("x") to the expanded field ("first_name"), so the default limit is used and not the one provided
-        assertThat(topN.limit().fold(FoldContext.small()), equalTo(10000));
-        assertThat(orderNames(topN), contains("gender"));
-        as(topN.child(), EsRelation.class);
+        as(mvExp.child(), EsRelation.class);
     }
 
     /**
@@ -7270,4 +7233,349 @@ public class LogicalPlanOptimizerTests extends ESTestCase {
         assertEquals(new Literal(EMPTY, 2.0, DataType.DOUBLE), ee.value());
         assertEquals(DataType.DOUBLE, ee.dataType());
     }
+
+    /**
+     * TopN[[Order[emp_no{f}#11,ASC,LAST]],1000[INTEGER]]
+     * \_Join[LEFT,[language_code{r}#5],[language_code{r}#5],[language_code{f}#22]]
+     *   |_EsqlProject[[_meta_field{f}#17, emp_no{f}#11, first_name{f}#12, gender{f}#13, hire_date{f}#18, job{f}#19, job.raw{f}#20, l
+     * anguages{f}#14 AS language_code, last_name{f}#15, long_noidx{f}#21, salary{f}#16, foo{r}#7]]
+     *   | \_Eval[[[62 61 72][KEYWORD] AS foo]]
+     *   |   \_Filter[languages{f}#14 > 1[INTEGER]]
+     *   |     \_EsRelation[test][_meta_field{f}#17, emp_no{f}#11, first_name{f}#12, ..]
+     *   \_EsRelation[languages_lookup][LOOKUP][language_code{f}#22, language_name{f}#23]
+     */
+    public void testRedundantSortOnJoin() {
+        assumeTrue("Requires LOOKUP JOIN", EsqlCapabilities.Cap.JOIN_LOOKUP_V12.isEnabled());
+
+        var plan = optimizedPlan("""
+              FROM test
+            | SORT languages
+            | RENAME languages AS language_code
+            | EVAL foo = "bar"
+            | LOOKUP JOIN languages_lookup ON language_code
+            | WHERE language_code > 1
+            | SORT emp_no
+            """);
+
+        var topN = as(plan, TopN.class);
+        var join = as(topN.child(), Join.class);
+        var project = as(join.left(), EsqlProject.class);
+        var eval = as(project.child(), Eval.class);
+        var filter = as(eval.child(), Filter.class);
+        as(filter.child(), EsRelation.class);
+    }
+
+    /**
+     * TopN[[Order[emp_no{f}#9,ASC,LAST]],1000[INTEGER]]
+     * \_Filter[emp_no{f}#9 > 1[INTEGER]]
+     *   \_MvExpand[languages{f}#12,languages{r}#20,null]
+     *     \_Eval[[[62 61 72][KEYWORD] AS foo]]
+     *       \_EsRelation[test][_meta_field{f}#15, emp_no{f}#9, first_name{f}#10, g..]
+     */
+    public void testRedundantSortOnMvExpand() {
+        var plan = optimizedPlan("""
+              FROM test
+            | SORT languages
+            | EVAL foo = "bar"
+            | MV_EXPAND languages
+            | WHERE emp_no > 1
+            | SORT emp_no
+            """);
+
+        var topN = as(plan, TopN.class);
+        var filter = as(topN.child(), Filter.class);
+        var mvExpand = as(filter.child(), MvExpand.class);
+        var eval = as(mvExpand.child(), Eval.class);
+        as(eval.child(), EsRelation.class);
+    }
+
+    /**
+     * TopN[[Order[emp_no{f}#11,ASC,LAST]],1000[INTEGER]]
+     * \_Join[LEFT,[language_code{r}#5],[language_code{r}#5],[language_code{f}#22]]
+     *   |_Filter[emp_no{f}#11 > 1[INTEGER]]
+     *   | \_MvExpand[languages{f}#14,languages{r}#24,null]
+     *   |   \_Eval[[languages{f}#14 AS language_code]]
+     *   |     \_EsRelation[test][_meta_field{f}#17, emp_no{f}#11, first_name{f}#12, ..]
+     *   \_EsRelation[languages_lookup][LOOKUP][language_code{f}#22, language_name{f}#23]
+     */
+    public void testRedundantSortOnMvExpandAndJoin() {
+        var plan = optimizedPlan("""
+              FROM test
+            | SORT languages
+            | EVAL language_code = languages
+            | MV_EXPAND languages
+            | WHERE emp_no > 1
+            | LOOKUP JOIN languages_lookup ON language_code
+            | SORT emp_no
+            """);
+
+        var topN = as(plan, TopN.class);
+        var join = as(topN.child(), Join.class);
+        var filter = as(join.left(), Filter.class);
+        var mvExpand = as(filter.child(), MvExpand.class);
+        var eval = as(mvExpand.child(), Eval.class);
+        as(eval.child(), EsRelation.class);
+    }
+
+    /**
+     * TopN[[Order[emp_no{f}#12,ASC,LAST]],1000[INTEGER]]
+     * \_Join[LEFT,[language_code{r}#5],[language_code{r}#5],[language_code{f}#23]]
+     *   |_Filter[emp_no{f}#12 > 1[INTEGER]]
+     *   | \_MvExpand[languages{f}#15,languages{r}#25,null]
+     *   |   \_Eval[[languages{f}#15 AS language_code]]
+     *   |     \_EsRelation[test][_meta_field{f}#18, emp_no{f}#12, first_name{f}#13, ..]
+     *   \_EsRelation[languages_lookup][LOOKUP][language_code{f}#23, language_name{f}#24]
+     */
+    public void testMultlipleRedundantSortOnMvExpandAndJoin() {
+        var plan = optimizedPlan("""
+              FROM test
+            | SORT first_name
+            | EVAL language_code = languages
+            | MV_EXPAND languages
+            | sort last_name
+            | WHERE emp_no > 1
+            | LOOKUP JOIN languages_lookup ON language_code
+            | SORT emp_no
+            """);
+
+        var topN = as(plan, TopN.class);
+        var join = as(topN.child(), Join.class);
+        var filter = as(join.left(), Filter.class);
+        var mvExpand = as(filter.child(), MvExpand.class);
+        var eval = as(mvExpand.child(), Eval.class);
+        as(eval.child(), EsRelation.class);
+    }
+
+    /**
+     * TopN[[Order[emp_no{f}#16,ASC,LAST]],1000[INTEGER]]
+     * \_Filter[emp_no{f}#16 > 1[INTEGER]]
+     *   \_MvExpand[languages{f}#19,languages{r}#31]
+     *     \_Dissect[foo{r}#5,Parser[pattern=%{z}, appendSeparator=, parser=org.elasticsearch.dissect.DissectParser@26f2cab],[z{r}#10
+     * ]]
+     *       \_Grok[foo{r}#5,Parser[pattern=%{WORD:y}, grok=org.elasticsearch.grok.Grok@6ea44ccd],[y{r}#9]]
+     *         \_Enrich[ANY,[6c 61 6e 67 75 61 67 65 73 5f 69 64 78][KEYWORD],foo{r}#5,{"match":{"indices":[],"match_field":"id","enrich_
+     * fields":["language_code","language_name"]}},{=languages_idx},[language_code{r}#29, language_name{r}#30]]
+     *           \_Eval[[TOSTRING(languages{f}#19) AS foo]]
+     *             \_EsRelation[test][_meta_field{f}#22, emp_no{f}#16, first_name{f}#17, ..]
+     */
+    public void testRedundantSortOnMvExpandEnrichGrokDissect() {
+        var plan = optimizedPlan("""
+              FROM test
+            | SORT languages
+            | EVAL foo = to_string(languages)
+            | ENRICH languages_idx on foo
+            | GROK foo "%{WORD:y}"
+            | DISSECT foo "%{z}"
+            | MV_EXPAND languages
+            | WHERE emp_no > 1
+            | SORT emp_no
+            """);
+
+        var topN = as(plan, TopN.class);
+        var filter = as(topN.child(), Filter.class);
+        var mvExpand = as(filter.child(), MvExpand.class);
+        var dissect = as(mvExpand.child(), Dissect.class);
+        var grok = as(dissect.child(), Grok.class);
+        var enrich = as(grok.child(), Enrich.class);
+        var eval = as(enrich.child(), Eval.class);
+        as(eval.child(), EsRelation.class);
+    }
+
+    /**
+     * TopN[[Order[emp_no{f}#20,ASC,LAST]],1000[INTEGER]]
+     * \_Filter[emp_no{f}#20 > 1[INTEGER]]
+     *   \_MvExpand[languages{f}#23,languages{r}#37]
+     *     \_Dissect[foo{r}#5,Parser[pattern=%{z}, appendSeparator=, parser=org.elasticsearch.dissect.DissectParser@3e922db0],[z{r}#1
+     * 4]]
+     *       \_Grok[foo{r}#5,Parser[pattern=%{WORD:y}, grok=org.elasticsearch.grok.Grok@4d6ad024],[y{r}#13]]
+     *         \_Enrich[ANY,[6c 61 6e 67 75 61 67 65 73 5f 69 64 78][KEYWORD],foo{r}#5,{"match":{"indices":[],"match_field":"id","enrich_
+     * fields":["language_code","language_name"]}},{=languages_idx},[language_code{r}#35, language_name{r}#36]]
+     *           \_Join[LEFT,[language_code{r}#8],[language_code{r}#8],[language_code{f}#31]]
+     *             |_Eval[[TOSTRING(languages{f}#23) AS foo, languages{f}#23 AS language_code]]
+     *             | \_EsRelation[test][_meta_field{f}#26, emp_no{f}#20, first_name{f}#21, ..]
+     *             \_EsRelation[languages_lookup][LOOKUP][language_code{f}#31]
+     */
+    public void testRedundantSortOnMvExpandJoinEnrichGrokDissect() {
+        var plan = optimizedPlan("""
+              FROM test
+            | SORT languages
+            | EVAL foo = to_string(languages), language_code = languages
+            | LOOKUP JOIN languages_lookup ON language_code
+            | ENRICH languages_idx on foo
+            | GROK foo "%{WORD:y}"
+            | DISSECT foo "%{z}"
+            | MV_EXPAND languages
+            | WHERE emp_no > 1
+            | SORT emp_no
+            """);
+
+        var topN = as(plan, TopN.class);
+        var filter = as(topN.child(), Filter.class);
+        var mvExpand = as(filter.child(), MvExpand.class);
+        var dissect = as(mvExpand.child(), Dissect.class);
+        var grok = as(dissect.child(), Grok.class);
+        var enrich = as(grok.child(), Enrich.class);
+        var join = as(enrich.child(), Join.class);
+        var eval = as(join.left(), Eval.class);
+        as(eval.child(), EsRelation.class);
+    }
+
+    /**
+     * TopN[[Order[emp_no{f}#23,ASC,LAST]],1000[INTEGER]]
+     * \_Filter[emp_no{f}#23 > 1[INTEGER]]
+     *   \_MvExpand[languages{f}#26,languages{r}#36]
+     *     \_EsqlProject[[language_name{f}#35, foo{r}#5 AS bar, languages{f}#26, emp_no{f}#23]]
+     *       \_Join[LEFT,[language_code{r}#8],[language_code{r}#8],[language_code{f}#34]]
+     *         |_Project[[_meta_field{f}#29, emp_no{f}#23, first_name{f}#24, gender{f}#25, hire_date{f}#30, job{f}#31, job.raw{f}#32, l
+     * anguages{f}#26, last_name{f}#27, long_noidx{f}#33, salary{f}#28, foo{r}#5, languages{f}#26 AS language_code]]
+     *         | \_Eval[[TOSTRING(languages{f}#26) AS foo]]
+     *         |   \_EsRelation[test][_meta_field{f}#29, emp_no{f}#23, first_name{f}#24, ..]
+     *         \_EsRelation[languages_lookup][LOOKUP][language_code{f}#34, language_name{f}#35]
+     */
+    public void testRedundantSortOnMvExpandJoinKeepDropRename() {
+        var plan = optimizedPlan("""
+              FROM test
+            | SORT languages
+            | EVAL foo = to_string(languages), language_code = languages
+            | LOOKUP JOIN languages_lookup ON language_code
+            | KEEP language_name, language_code, foo, languages, emp_no
+            | DROP language_code
+            | RENAME foo AS bar
+            | MV_EXPAND languages
+            | WHERE emp_no > 1
+            | SORT emp_no
+            """);
+
+        var topN = as(plan, TopN.class);
+        var filter = as(topN.child(), Filter.class);
+        var mvExpand = as(filter.child(), MvExpand.class);
+        var project = as(mvExpand.child(), Project.class);
+        var join = as(project.child(), Join.class);
+        var project2 = as(join.left(), Project.class);
+        var eval = as(project2.child(), Eval.class);
+        as(eval.child(), EsRelation.class);
+    }
+
+    /**
+     * TopN[[Order[emp_no{f}#15,ASC,LAST]],1000[INTEGER]]
+     * \_Filter[emp_no{f}#15 > 1[INTEGER]]
+     *   \_MvExpand[foo{r}#10,foo{r}#29]
+     *     \_Eval[[CONCAT(language_name{r}#28,[66 6f 6f][KEYWORD]) AS foo]]
+     *       \_MvExpand[language_name{f}#27,language_name{r}#28]
+     *         \_Join[LEFT,[language_code{r}#3],[language_code{r}#3],[language_code{f}#26]]
+     *           |_Eval[[1[INTEGER] AS language_code]]
+     *           | \_EsRelation[test][_meta_field{f}#21, emp_no{f}#15, first_name{f}#16, ..]
+     *           \_EsRelation[languages_lookup][LOOKUP][language_code{f}#26, language_name{f}#27]
+     */
+    public void testEvalLookupMultipleSorts() {
+        var plan = optimizedPlan("""
+              FROM test
+            | EVAL language_code = 1
+            | LOOKUP JOIN languages_lookup ON language_code
+            | SORT language_name
+            | MV_EXPAND language_name
+            | EVAL foo = concat(language_name, "foo")
+            | MV_EXPAND foo
+            | WHERE emp_no > 1
+            | SORT emp_no
+            """);
+
+        var topN = as(plan, TopN.class);
+        var filter = as(topN.child(), Filter.class);
+        var mvExpand = as(filter.child(), MvExpand.class);
+        var eval = as(mvExpand.child(), Eval.class);
+        mvExpand = as(eval.child(), MvExpand.class);
+        var join = as(mvExpand.child(), Join.class);
+        eval = as(join.left(), Eval.class);
+        as(eval.child(), EsRelation.class);
+
+    }
+
+    public void testUnboundedSortSimple() {
+        var query = """
+              ROW x = [1,2,3], y = 1
+              | SORT y
+              | MV_EXPAND x
+              | WHERE x > 2
+            """;
+
+        VerificationException e = expectThrows(VerificationException.class, () -> plan(query));
+        assertThat(e.getMessage(), containsString("line 2:5: Unbounded sort not supported yet [SORT y] please add a limit"));
+    }
+
+    public void testUnboundedSortJoin() {
+        var query = """
+              ROW x = [1,2,3], y = 2, language_code = 1
+              | SORT y
+              | LOOKUP JOIN languages_lookup ON language_code
+              | WHERE language_name == "foo"
+            """;
+
+        VerificationException e = expectThrows(VerificationException.class, () -> plan(query));
+        assertThat(e.getMessage(), containsString("line 2:5: Unbounded sort not supported yet [SORT y] please add a limit"));
+    }
+
+    public void testUnboundedSortWithMvExpandAndFilter() {
+        var query = """
+              FROM test
+            | EVAL language_code = 1
+            | LOOKUP JOIN languages_lookup ON language_code
+            | SORT language_name
+            | EVAL foo = concat(language_name, "foo")
+            | MV_EXPAND foo
+            | WHERE foo == "foo"
+            """;
+
+        VerificationException e = expectThrows(VerificationException.class, () -> plan(query));
+        assertThat(e.getMessage(), containsString("line 4:3: Unbounded sort not supported yet [SORT language_name] please add a limit"));
+    }
+
+    public void testUnboundedSortWithLookupJoinAndFilter() {
+        var query = """
+              FROM test
+            | EVAL language_code = 1
+            | EVAL foo = concat(language_code::string, "foo")
+            | MV_EXPAND foo
+            | SORT foo
+            | LOOKUP JOIN languages_lookup ON language_code
+            | WHERE language_name == "foo"
+            """;
+
+        VerificationException e = expectThrows(VerificationException.class, () -> plan(query));
+        assertThat(e.getMessage(), containsString("line 5:3: Unbounded sort not supported yet [SORT foo] please add a limit"));
+    }
+
+    public void testUnboundedSortExpandFilter() {
+        var query = """
+              ROW x = [1,2,3], y = 1
+              | SORT x
+              | MV_EXPAND x
+              | WHERE x > 2
+            """;
+
+        VerificationException e = expectThrows(VerificationException.class, () -> plan(query));
+        assertThat(e.getMessage(), containsString("line 2:5: Unbounded sort not supported yet [SORT x] please add a limit"));
+    }
+
+    public void testPruneRedundantOrderBy() {
+        var rule = new PruneRedundantOrderBy();
+
+        var query = """
+            row x = [1,2,3], y = 1
+            | sort x
+            | mv_expand x
+            | sort x
+            | mv_expand x
+            | sort y
+            """;
+        LogicalPlan analyzed = analyzer.analyze(parser.createStatement(query));
+        LogicalPlan optimized = rule.apply(analyzed);
+
+        // check that all the redundant SORTs are removed in a single run
+        var limit = as(optimized, Limit.class);
+        var orderBy = as(limit.child(), OrderBy.class);
+        var mvExpand = as(orderBy.child(), MvExpand.class);
+        var mvExpand2 = as(mvExpand.child(), MvExpand.class);
+        as(mvExpand2.child(), Row.class);
+    }
 }

+ 0 - 46
x-pack/plugin/esql/src/test/java/org/elasticsearch/xpack/esql/plan/physical/OrderExecSerializationTests.java

@@ -1,46 +0,0 @@
-/*
- * Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
- * or more contributor license agreements. Licensed under the Elastic License
- * 2.0; you may not use this file except in compliance with the Elastic License
- * 2.0.
- */
-
-package org.elasticsearch.xpack.esql.plan.physical;
-
-import org.elasticsearch.xpack.esql.core.tree.Source;
-import org.elasticsearch.xpack.esql.expression.Order;
-import org.elasticsearch.xpack.esql.expression.OrderSerializationTests;
-
-import java.io.IOException;
-import java.util.List;
-
-public class OrderExecSerializationTests extends AbstractPhysicalPlanSerializationTests<OrderExec> {
-    public static OrderExec randomOrderExec(int depth) {
-        Source source = randomSource();
-        PhysicalPlan child = randomChild(depth);
-        List<Order> order = randomList(1, 10, OrderSerializationTests::randomOrder);
-        return new OrderExec(source, child, order);
-    }
-
-    @Override
-    protected OrderExec createTestInstance() {
-        return randomOrderExec(0);
-    }
-
-    @Override
-    protected OrderExec mutateInstance(OrderExec instance) throws IOException {
-        PhysicalPlan child = instance.child();
-        List<Order> order = instance.order();
-        if (randomBoolean()) {
-            child = randomValueOtherThan(child, () -> randomChild(0));
-        } else {
-            order = randomValueOtherThan(order, () -> randomList(1, 10, OrderSerializationTests::randomOrder));
-        }
-        return new OrderExec(instance.source(), child, order);
-    }
-
-    @Override
-    protected boolean alwaysEmptySource() {
-        return true;
-    }
-}