Browse Source

ESQL: Fix inconsistent column order in MV_EXPAND (#129745)

The new attribute generated by MV_EXPAND should remain in the original position. The projection added by ProjectAwayColumns does not respect the original order of attributes.

Make ProjectAwayColumns respect the order of attributes to fix this.
kanoshiou 3 months ago
parent
commit
ac0c50820a

+ 6 - 0
docs/changelog/129745.yaml

@@ -0,0 +1,6 @@
+pr: 129745
+summary: "ESQL: Fix `mv_expand` inconsistent column order"
+area: ES|QL
+type: bug
+issues:
+  - 129000

+ 4 - 0
x-pack/plugin/esql-core/src/main/java/org/elasticsearch/xpack/esql/core/expression/AttributeSet.java

@@ -261,5 +261,9 @@ public class AttributeSet implements Set<Attribute> {
         public AttributeSet build() {
             return new AttributeSet(mapBuilder.build());
         }
+
+        public void clear() {
+            mapBuilder.keySet().clear();
+        }
     }
 }

+ 0 - 1
x-pack/plugin/esql/qa/server/src/main/java/org/elasticsearch/xpack/esql/qa/rest/generative/GenerativeRestTest.java

@@ -53,7 +53,6 @@ public abstract class GenerativeRestTest extends ESRestTestCase {
         "Data too large", // Circuit breaker exceptions eg. https://github.com/elastic/elasticsearch/issues/130072
 
         // Awaiting fixes for correctness
-        "Expecting the following columns \\[.*\\], got", // https://github.com/elastic/elasticsearch/issues/129000
         "Expecting at most \\[.*\\] columns, got \\[.*\\]" // https://github.com/elastic/elasticsearch/issues/129561
     );
 

+ 62 - 0
x-pack/plugin/esql/qa/testFixtures/src/main/resources/mv_expand.csv-spec

@@ -419,3 +419,65 @@ emp_no:integer | job_positions:keyword
 10001          | Accountant
 10001          | Senior Python Developer          
 ;
+
+testMvExpandInconsistentColumnOrder1
+required_capability: fix_mv_expand_inconsistent_column_order
+from message_types
+| eval foo_1 = 1, foo_2 = 2 
+| sort message
+| mv_expand foo_1
+;
+
+message:keyword         | type:keyword | foo_1:integer | foo_2:integer
+Connected to 10.1.0.1   | Success      | 1             | 2
+Connected to 10.1.0.2   | Success      | 1             | 2
+Connected to 10.1.0.3   | Success      | 1             | 2
+Connection error        | Error        | 1             | 2
+Development environment | Development  | 1             | 2
+Disconnected            | Disconnected | 1             | 2
+Production environment  | Production   | 1             | 2
+;
+
+testMvExpandInconsistentColumnOrder2
+required_capability: fix_mv_expand_inconsistent_column_order
+from message_types
+| eval foo_1 = [1, 3], foo_2 = 2
+| sort message
+| mv_expand foo_1
+;
+
+message:keyword         | type:keyword | foo_1:integer | foo_2:integer
+Connected to 10.1.0.1   | Success      | 1             | 2
+Connected to 10.1.0.1   | Success      | 3             | 2
+Connected to 10.1.0.2   | Success      | 1             | 2
+Connected to 10.1.0.2   | Success      | 3             | 2
+Connected to 10.1.0.3   | Success      | 1             | 2
+Connected to 10.1.0.3   | Success      | 3             | 2
+Connection error        | Error        | 1             | 2
+Connection error        | Error        | 3             | 2
+Development environment | Development  | 1             | 2
+Development environment | Development  | 3             | 2
+Disconnected            | Disconnected | 1             | 2
+Disconnected            | Disconnected | 3             | 2
+Production environment  | Production   | 1             | 2
+Production environment  | Production   | 3             | 2
+;
+
+testMvExpandInconsistentColumnOrder3
+required_capability: fix_mv_expand_inconsistent_column_order
+from message_types
+| sort type
+| eval language_code = 1, `language_name` = false, message = true, foo_3 = 1, foo_2 = null
+| eval foo_3 = "1", `foo_3` = -1, foo_1 = 1, `language_code` = null, `foo_2` = "1"
+| mv_expand foo_1
+| limit 5
+;
+
+type:keyword | language_name:boolean | message:boolean | foo_3:integer | foo_1:integer | language_code:null | foo_2:keyword
+Development  | false                 | true            | -1            | 1             | null               | 1
+Disconnected | false                 | true            | -1            | 1             | null               | 1
+Error        | false                 | true            | -1            | 1             | null               | 1
+Production   | false                 | true            | -1            | 1             | null               | 1
+Success      | false                 | true            | -1            | 1             | null               | 1
+;
+

+ 6 - 0
x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/action/EsqlCapabilities.java

@@ -1235,6 +1235,12 @@ public class EsqlCapabilities {
          */
         NO_PLAIN_STRINGS_IN_LITERALS,
 
+        /**
+         * Support for the mv_expand target attribute should be retained in its original position.
+         * see <a href="https://github.com/elastic/elasticsearch/issues/129000"> ES|QL: inconsistent column order #129000 </a>
+         */
+        FIX_MV_EXPAND_INCONSISTENT_COLUMN_ORDER,
+
         /**
          * (Re)Added EXPLAIN command
          */

+ 13 - 1
x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/optimizer/rules/physical/ProjectAwayColumns.java

@@ -76,7 +76,19 @@ public class ProjectAwayColumns extends Rule<PhysicalPlan, PhysicalPlan> {
 
                     // no need for projection when dealing with aggs
                     if (logicalFragment instanceof Aggregate == false) {
-                        List<Attribute> output = new ArrayList<>(requiredAttrBuilder.build());
+                        // we should respect the order of the attributes
+                        List<Attribute> output = new ArrayList<>();
+                        for (Attribute attribute : logicalFragment.output()) {
+                            if (requiredAttrBuilder.contains(attribute)) {
+                                output.add(attribute);
+                                requiredAttrBuilder.remove(attribute);
+                            }
+                        }
+                        // requiredAttrBuilder should be empty unless the plan is inconsistent due to a bug.
+                        // This can happen in case of remote ENRICH, see https://github.com/elastic/elasticsearch/issues/118531
+                        // TODO: stop adding the remaining required attributes once remote ENRICH is fixed.
+                        output.addAll(requiredAttrBuilder.build());
+
                         // if all the fields are filtered out, it's only the count that matters
                         // however until a proper fix (see https://github.com/elastic/elasticsearch/issues/98703)
                         // add a synthetic field (so it doesn't clash with the user defined one) to return a constant

+ 259 - 241
x-pack/plugin/esql/src/test/java/org/elasticsearch/xpack/esql/optimizer/PhysicalPlanOptimizerTests.java

@@ -128,6 +128,7 @@ import org.elasticsearch.xpack.esql.plan.physical.HashJoinExec;
 import org.elasticsearch.xpack.esql.plan.physical.LimitExec;
 import org.elasticsearch.xpack.esql.plan.physical.LocalSourceExec;
 import org.elasticsearch.xpack.esql.plan.physical.LookupJoinExec;
+import org.elasticsearch.xpack.esql.plan.physical.MvExpandExec;
 import org.elasticsearch.xpack.esql.plan.physical.PhysicalPlan;
 import org.elasticsearch.xpack.esql.plan.physical.ProjectExec;
 import org.elasticsearch.xpack.esql.plan.physical.TopNExec;
@@ -193,6 +194,7 @@ import static org.elasticsearch.xpack.esql.planner.mapper.MapperUtils.hasScoreAt
 import static org.hamcrest.Matchers.closeTo;
 import static org.hamcrest.Matchers.contains;
 import static org.hamcrest.Matchers.containsInAnyOrder;
+import static org.hamcrest.Matchers.containsInRelativeOrder;
 import static org.hamcrest.Matchers.containsString;
 import static org.hamcrest.Matchers.empty;
 import static org.hamcrest.Matchers.equalTo;
@@ -625,16 +627,16 @@ public class PhysicalPlanOptimizerTests extends ESTestCase {
     }
 
     /**
-     * Expected
-     * LimitExec[10000[INTEGER]]
-     * \_AggregateExec[[],[AVG(salary{f}#14) AS x],FINAL]
-     *   \_AggregateExec[[],[AVG(salary{f}#14) AS x],PARTIAL]
-     *     \_FilterExec[ROUND(emp_no{f}#9) > 10[INTEGER]]
-     *       \_TopNExec[[Order[last_name{f}#13,ASC,LAST]],10[INTEGER]]
-     *         \_ExchangeExec[]
-     *           \_ProjectExec[[salary{f}#14, first_name{f}#10, emp_no{f}#9, last_name{f}#13]]     -- project away _doc
-     *             \_FieldExtractExec[salary{f}#14, first_name{f}#10, emp_no{f}#9, last_n..]       -- local field extraction
-     *               \_EsQueryExec[test], query[][_doc{f}#16], limit[10], sort[[last_name]]
+     *LimitExec[10000[INTEGER],8]
+     * \_AggregateExec[[],[SUM(salary{f}#13460,true[BOOLEAN]) AS x#13454],FINAL,[$$x$sum{r}#13466, $$x$seen{r}#13467],8]
+     *   \_AggregateExec[[],[SUM(salary{f}#13460,true[BOOLEAN]) AS x#13454],INITIAL,[$$x$sum{r}#13466, $$x$seen{r}#13467],8]
+     *     \_FilterExec[ROUND(emp_no{f}#13455) > 10[INTEGER]]
+     *       \_TopNExec[[Order[last_name{f}#13459,ASC,LAST]],10[INTEGER],58]
+     *         \_ExchangeExec[[emp_no{f}#13455, last_name{f}#13459, salary{f}#13460],false]
+     *           \_ProjectExec[[emp_no{f}#13455, last_name{f}#13459, salary{f}#13460]]              -- project away _doc
+     *             \_FieldExtractExec[emp_no{f}#13455, last_name{f}#13459, salary{f}#1346..] &lt;[],[]&gt; -- local field extraction
+     *               \_EsQueryExec[test], indexMode[standard], query[][_doc{f}#13482], limit[10],
+     *               sort[[FieldSort[field=last_name{f}#13459, direction=ASC, nulls=LAST]]] estimatedRowSize[74]
      */
     public void testExtractorForField() {
         var plan = physicalPlan("""
@@ -658,7 +660,7 @@ public class PhysicalPlanOptimizerTests extends ESTestCase {
         var exchange = asRemoteExchange(topN.child());
         var project = as(exchange.child(), ProjectExec.class);
         var extract = as(project.child(), FieldExtractExec.class);
-        assertThat(names(extract.attributesToExtract()), contains("salary", "emp_no", "last_name"));
+        assertThat(names(extract.attributesToExtract()), contains("emp_no", "last_name", "salary"));
         var source = source(extract.child());
         assertThat(source.limit(), is(topN.limit()));
         assertThat(source.sorts(), is(fieldSorts(topN.order())));
@@ -2219,7 +2221,7 @@ public class PhysicalPlanOptimizerTests extends ESTestCase {
      *      ages{f}#6, last_name{f}#7, long_noidx{f}#13, salary{f}#8],false]
      *   \_ProjectExec[[_meta_field{f}#9, emp_no{f}#3, first_name{f}#4, gender{f}#5, hire_date{f}#10, job{f}#11, job.raw{f}#12, langu
      *          ages{f}#6, last_name{f}#7, long_noidx{f}#13, salary{f}#8]]
-     *     \_FieldExtractExec[_meta_field{f}#9, emp_no{f}#3, first_name{f}#4, gen..]<[],[]>
+     *     \_FieldExtractExec[_meta_field{f}#9, emp_no{f}#3, first_name{f}#4, gen..]&lt;[],[]&gt;
      *       \_EsQueryExec[test], indexMode[standard], query[{"esql_single_value":{"field":"first_name","next":{"regexp":{"first_name":
      *       {"value":"foo*","flags_value":65791,"case_insensitive":true,"max_determinized_states":10000,"boost":0.0}}},
      *       "source":"TO_LOWER(first_name) RLIKE \"foo*\"@2:9"}}][_doc{f}#25], limit[1000], sort[] estimatedRowSize[332]
@@ -2340,10 +2342,10 @@ public class PhysicalPlanOptimizerTests extends ESTestCase {
      * uages{f}#7, last_name{f}#8, long_noidx{f}#14, salary{f}#9],false]
      *   \_ProjectExec[[_meta_field{f}#10, emp_no{f}#4, first_name{f}#5, gender{f}#6, hire_date{f}#11, job{f}#12, job.raw{f}#13, lang
      * uages{f}#7, last_name{f}#8, long_noidx{f}#14, salary{f}#9]]
-     *     \_FieldExtractExec[_meta_field{f}#10, gender{f}#6, hire_date{f}#11, jo..]<[],[]>
+     *     \_FieldExtractExec[_meta_field{f}#10, gender{f}#6, hire_date{f}#11, jo..]&lt;[],[]&gt;
      *       \_LimitExec[1000[INTEGER]]
      *         \_FilterExec[LIKE(first_name{f}#5, "FOO*", true) OR IN(1[INTEGER],2[INTEGER],3[INTEGER],emp_no{f}#4 + 1[INTEGER])]
-     *           \_FieldExtractExec[first_name{f}#5, emp_no{f}#4]<[],[]>
+     *           \_FieldExtractExec[first_name{f}#5, emp_no{f}#4]&lt;[],[]&gt;
      *             \_EsQueryExec[test], indexMode[standard], query[][_doc{f}#26], limit[], sort[] estimatedRowSize[332]
      */
     public void testChangeCaseAsInsensitiveWildcardLikeNotPushedDown() {
@@ -2458,22 +2460,17 @@ public class PhysicalPlanOptimizerTests extends ESTestCase {
 
     /**
      * <code>
-     * ProjectExec[[last_name{f}#21 AS name, first_name{f}#18 AS last_name, last_name{f}#21 AS first_name]]
-     * \_TopNExec[[Order[last_name{f}#21,ASC,LAST]],10[INTEGER],0]
-     *   \_ExchangeExec[[last_name{f}#21, first_name{f}#18],false]
-     *     \_ProjectExec[[last_name{f}#21, first_name{f}#18]]
-     *       \_FieldExtractExec[last_name{f}#21, first_name{f}#18][]
-     *         \_EsQueryExec[test], indexMode[standard], query[{
-     *           "bool":{"must":[
-     *             {"esql_single_value":{
-     *               "field":"last_name",
-     *               "next":{"range":{"last_name":{"gt":"B","boost":1.0}}},
-     *               "source":"first_name &gt; \"B\"@3:9"
-     *             }},
-     *             {"exists":{"field":"first_name","boost":1.0}}
-     *           ],"boost":1.0}}][_doc{f}#40], limit[10], sort[[
-     *             FieldSort[field=last_name{f}#21, direction=ASC, nulls=LAST]
-     *           ]] estimatedRowSize[116]
+     * ProjectExec[[last_name{f}#13858 AS name#13841, first_name{f}#13855 AS last_name#13844, last_name{f}#13858 AS first_name#13
+     * 847]]
+     * \_TopNExec[[Order[last_name{f}#13858,ASC,LAST]],10[INTEGER],100]
+     *   \_ExchangeExec[[first_name{f}#13855, last_name{f}#13858],false]
+     *     \_ProjectExec[[first_name{f}#13855, last_name{f}#13858]]
+     *       \_FieldExtractExec[first_name{f}#13855, last_name{f}#13858]&lt;[],[]&gt;
+     *         \_EsQueryExec[test], indexMode[standard], query[
+     *         {"bool":{"must":[{"esql_single_value":{"field":"last_name","next":
+     *         {"range":{"last_name":{"gt":"B","boost":0.0}}},"source":"first_name > \"B\"@3:9"}},
+     *         {"exists":{"field":"first_name","boost":0.0}}],"boost":1.0}}
+     *         ][_doc{f}#13879], limit[10], sort[[FieldSort[field=last_name{f}#13858, direction=ASC, nulls=LAST]]] estimatedRowSize[116]
      * </code>
      */
     public void testPushDownEvalSwapFilter() {
@@ -2494,7 +2491,7 @@ public class PhysicalPlanOptimizerTests extends ESTestCase {
         var extract = as(project.child(), FieldExtractExec.class);
         assertThat(
             extract.attributesToExtract().stream().map(Attribute::name).collect(Collectors.toList()),
-            contains("last_name", "first_name")
+            contains("first_name", "last_name")
         );
 
         // Now verify the correct Lucene push-down of both the filter and the sort
@@ -2607,7 +2604,7 @@ public class PhysicalPlanOptimizerTests extends ESTestCase {
      * uages{f}#7, last_name{f}#8, long_noidx{f}#14, salary{f}#9, _index{m}#2],false]
      *   \_ProjectExec[[_meta_field{f}#10, emp_no{f}#4, first_name{f}#5, gender{f}#6, hire_date{f}#11, job{f}#12, job.raw{f}#13, lang
      * uages{f}#7, last_name{f}#8, long_noidx{f}#14, salary{f}#9, _index{m}#2]]
-     *     \_FieldExtractExec[_meta_field{f}#10, emp_no{f}#4, first_name{f}#5, ge..]<[],[]>
+     *     \_FieldExtractExec[_meta_field{f}#10, emp_no{f}#4, first_name{f}#5, ge..]&lt;[],[]&gt;
      *       \_EsQueryExec[test], indexMode[standard], query[{"wildcard":{"_index":{"wildcard":"test*","boost":0.0}}}][_doc{f}#27],
      *          limit[1000], sort[] estimatedRowSize[382]
      *
@@ -3176,6 +3173,56 @@ public class PhysicalPlanOptimizerTests extends ESTestCase {
         assertThat(Expressions.names(esQuery.attrs()), contains("_doc"));
     }
 
+    /**
+     * LimitExec[1000[INTEGER],336]
+     * \_MvExpandExec[foo_1{r}#4236,foo_1{r}#4253]
+     *   \_TopNExec[[Order[emp_no{f}#4242,ASC,LAST]],1000[INTEGER],336]
+     *     \_ExchangeExec[[_meta_field{f}#4248, emp_no{f}#4242, first_name{f}#4243, gender{f}#4244, hire_date{f}#4249, job{f}#4250, job.
+     * raw{f}#4251, languages{f}#4245, last_name{f}#4246, long_noidx{f}#4252, salary{f}#4247, foo_1{r}#4236, foo_2{r}#4238],
+     * false]
+     *       \_ProjectExec[[_meta_field{f}#4248, emp_no{f}#4242, first_name{f}#4243, gender{f}#4244, hire_date{f}#4249, job{f}#4250, job.
+     * raw{f}#4251, languages{f}#4245, last_name{f}#4246, long_noidx{f}#4252, salary{f}#4247, foo_1{r}#4236, foo_2{r}#4238]]
+     *         \_FieldExtractExec[_meta_field{f}#4248, emp_no{f}#4242, first_name{f}#..]&lt;[],[]&gt;
+     *           \_EvalExec[[1[INTEGER] AS foo_1#4236, 1[INTEGER] AS foo_2#4238]]
+     *             \_EsQueryExec[test], indexMode[standard], query[][_doc{f}#4268], limit[1000], sort[[FieldSort[field=emp_no{f}#4242,
+     *             direction=ASC, nulls=LAST]]] estimatedRowSize[352]
+     */
+    public void testProjectAwayMvExpandColumnOrder() {
+        var plan = optimizedPlan(physicalPlan("""
+            from test
+            | eval foo_1 = 1, foo_2 = 1
+            | sort emp_no
+            | mv_expand foo_1
+            """));
+        var limit = as(plan, LimitExec.class);
+        var mvExpand = as(limit.child(), MvExpandExec.class);
+        var topN = as(mvExpand.child(), TopNExec.class);
+        var exchange = as(topN.child(), ExchangeExec.class);
+        var project = as(exchange.child(), ProjectExec.class);
+
+        assertThat(
+            Expressions.names(project.projections()),
+            containsInRelativeOrder(
+                "_meta_field",
+                "emp_no",
+                "first_name",
+                "gender",
+                "hire_date",
+                "job",
+                "job.raw",
+                "languages",
+                "last_name",
+                "long_noidx",
+                "salary",
+                "foo_1",
+                "foo_2"
+            )
+        );
+        var fieldExtract = as(project.child(), FieldExtractExec.class);
+        var eval = as(fieldExtract.child(), EvalExec.class);
+        EsQueryExec esQuery = as(eval.child(), EsQueryExec.class);
+    }
+
     /**
      * ProjectExec[[a{r}#5]]
      * \_EvalExec[[__a_SUM@81823521{r}#15 / __a_COUNT@31645621{r}#16 AS a]]
@@ -5674,16 +5721,15 @@ public class PhysicalPlanOptimizerTests extends ESTestCase {
     }
 
     /**
-     * ProjectExec[[abbrev{f}#12321, name{f}#12322, location{f}#12325, country{f}#12326, city{f}#12327]]
-     * \_TopNExec[[Order[abbrev{f}#12321,ASC,LAST]],5[INTEGER],0]
-     *   \_ExchangeExec[[abbrev{f}#12321, name{f}#12322, location{f}#12325, country{f}#12326, city{f}#12327],false]
-     *     \_ProjectExec[[abbrev{f}#12321, name{f}#12322, location{f}#12325, country{f}#12326, city{f}#12327]]
-     *       \_FieldExtractExec[abbrev{f}#12321, name{f}#12322, location{f}#12325, ..][]
+     * ProjectExec[[abbrev{f}#4474, name{f}#4475, location{f}#4478, country{f}#4479, city{f}#4480]]
+     * \_TopNExec[[Order[abbrev{f}#4474,ASC,LAST]],5[INTEGER],221]
+     *   \_ExchangeExec[[abbrev{f}#4474, city{f}#4480, country{f}#4479, location{f}#4478, name{f}#4475],false]
+     *     \_ProjectExec[[abbrev{f}#4474, city{f}#4480, country{f}#4479, location{f}#4478, name{f}#4475]]
+     *       \_FieldExtractExec[abbrev{f}#4474, city{f}#4480, country{f}#4479, loca..]&lt;[],[]&gt;
      *         \_EsQueryExec[airports],
-     *           indexMode[standard],
-     *           query[][_doc{f}#12337],
-     *           limit[5],
-     *           sort[[FieldSort[field=abbrev{f}#12321, direction=ASC, nulls=LAST]]] estimatedRowSize[237]
+     *         indexMode[standard],
+     *         query[][_doc{f}#4490],
+     *         limit[5], sort[[FieldSort[field=abbrev{f}#4474, direction=ASC, nulls=LAST]]] estimatedRowSize[237]
      */
     public void testPushTopNKeywordToSource() {
         var optimized = optimizedPlan(physicalPlan("""
@@ -5698,9 +5744,9 @@ public class PhysicalPlanOptimizerTests extends ESTestCase {
         var exchange = asRemoteExchange(topN.child());
 
         project = as(exchange.child(), ProjectExec.class);
-        assertThat(names(project.projections()), contains("abbrev", "name", "location", "country", "city"));
+        assertThat(names(project.projections()), contains("abbrev", "city", "country", "location", "name"));
         var extract = as(project.child(), FieldExtractExec.class);
-        assertThat(names(extract.attributesToExtract()), contains("abbrev", "name", "location", "country", "city"));
+        assertThat(names(extract.attributesToExtract()), contains("abbrev", "city", "country", "location", "name"));
         var source = source(extract.child());
         assertThat(source.limit(), is(topN.limit()));
         assertThat(source.sorts(), is(fieldSorts(topN.order())));
@@ -5716,13 +5762,13 @@ public class PhysicalPlanOptimizerTests extends ESTestCase {
 
     /**
      * <code>
-     * ProjectExec[[abbrev{f}#12, name{f}#13, location{f}#16, country{f}#17, city{f}#18, abbrev{f}#12 AS code]]
-     * \_TopNExec[[Order[abbrev{f}#12,ASC,LAST]],5[INTEGER],0]
-     *   \_ExchangeExec[[abbrev{f}#12, name{f}#13, location{f}#16, country{f}#17, city{f}#18],false]
-     *     \_ProjectExec[[abbrev{f}#12, name{f}#13, location{f}#16, country{f}#17, city{f}#18]]
-     *       \_FieldExtractExec[abbrev{f}#12, name{f}#13, location{f}#16, country{f..][]
-     *         \_EsQueryExec[airports], indexMode[standard], query[][_doc{f}#29], limit[5],
-     *             sort[[FieldSort[field=abbrev{f}#12, direction=ASC, nulls=LAST]]] estimatedRowSize[237]
+     * ProjectExec[[abbrev{f}#7828, name{f}#7829, location{f}#7832, country{f}#7833, city{f}#7834, abbrev{f}#7828 AS code#7820]]
+     * \_TopNExec[[Order[abbrev{f}#7828,ASC,LAST]],5[INTEGER],221]
+     *   \_ExchangeExec[[abbrev{f}#7828, city{f}#7834, country{f}#7833, location{f}#7832, name{f}#7829],false]
+     *     \_ProjectExec[[abbrev{f}#7828, city{f}#7834, country{f}#7833, location{f}#7832, name{f}#7829]]
+     *       \_FieldExtractExec[abbrev{f}#7828, city{f}#7834, country{f}#7833, loca..]&lt;[],[]&gt;
+     *         \_EsQueryExec[airports], indexMode[standard], query[][_doc{f}#7845], limit[5],
+     *         sort[[FieldSort[field=abbrev{f}#7828, direction=ASC, nulls=LAST]]] estimatedRowSize[237]
      * </code>
      */
     public void testPushTopNAliasedKeywordToSource() {
@@ -5740,9 +5786,9 @@ public class PhysicalPlanOptimizerTests extends ESTestCase {
         var exchange = asRemoteExchange(topN.child());
 
         project = as(exchange.child(), ProjectExec.class);
-        assertThat(names(project.projections()), contains("abbrev", "name", "location", "country", "city"));
+        assertThat(names(project.projections()), contains("abbrev", "city", "country", "location", "name"));
         var extract = as(project.child(), FieldExtractExec.class);
-        assertThat(names(extract.attributesToExtract()), contains("abbrev", "name", "location", "country", "city"));
+        assertThat(names(extract.attributesToExtract()), contains("abbrev", "city", "country", "location", "name"));
         var source = source(extract.child());
         assertThat(source.limit(), is(topN.limit()));
         assertThat(source.sorts(), is(fieldSorts(topN.order())));
@@ -5757,19 +5803,19 @@ public class PhysicalPlanOptimizerTests extends ESTestCase {
     }
 
     /**
-     * ProjectExec[[abbrev{f}#11, name{f}#12, location{f}#15, country{f}#16, city{f}#17]]
-     * \_TopNExec[[Order[distance{r}#4,ASC,LAST]],5[INTEGER],0]
-     *   \_ExchangeExec[[abbrev{f}#11, name{f}#12, location{f}#15, country{f}#16, city{f}#17, distance{r}#4],false]
-     *     \_ProjectExec[[abbrev{f}#11, name{f}#12, location{f}#15, country{f}#16, city{f}#17, distance{r}#4]]
-     *       \_FieldExtractExec[abbrev{f}#11, name{f}#12, country{f}#16, city{f}#17][]
-     *         \_EvalExec[[STDISTANCE(location{f}#15,[1 1 0 0 0 e1 7a 14 ae 47 21 29 40 a0 1a 2f dd 24 d6 4b 40][GEO_POINT])
-     *             AS distance]]
-     *           \_FieldExtractExec[location{f}#15][]
+     * ProjectExec[[abbrev{f}#7283, name{f}#7284, location{f}#7287, country{f}#7288, city{f}#7289]]
+     * \_TopNExec[[Order[distance{r}#7276,ASC,LAST]],5[INTEGER],229]
+     *   \_ExchangeExec[[abbrev{f}#7283, city{f}#7289, country{f}#7288, location{f}#7287, name{f}#7284, distance{r}#7276],false]
+     *     \_ProjectExec[[abbrev{f}#7283, city{f}#7289, country{f}#7288, location{f}#7287, name{f}#7284, distance{r}#7276]]
+     *       \_FieldExtractExec[abbrev{f}#7283, city{f}#7289, country{f}#7288, name..]&lt;[],[]&gt;
+     *         \_EvalExec[[STDISTANCE(location{f}#7287,[1 1 0 0 0 e1 7a 14 ae 47 21 29 40 a0 1a 2f dd 24 d6 4b 40][GEO_POINT]) AS distan
+     * ce#7276]]
+     *           \_FieldExtractExec[location{f}#7287]&lt;[],[]&gt;
      *             \_EsQueryExec[airports],
-     *               indexMode[standard],
-     *               query[][_doc{f}#28],
-     *               limit[5],
-     *               sort[[GeoDistanceSort[field=location{f}#15, direction=ASC, lat=55.673, lon=12.565]]] estimatedRowSize[245]
+     *             indexMode[standard],
+     *             query[][_doc{f}#7300],
+     *             limit[5],
+     *             sort[[GeoDistanceSort[field=location{f}#7287, direction=ASC, lat=55.673, lon=12.565]]] estimatedRowSize[245]
      */
     public void testPushTopNDistanceToSource() {
         var optimized = optimizedPlan(physicalPlan("""
@@ -5785,9 +5831,9 @@ public class PhysicalPlanOptimizerTests extends ESTestCase {
         var exchange = asRemoteExchange(topN.child());
 
         project = as(exchange.child(), ProjectExec.class);
-        assertThat(names(project.projections()), contains("abbrev", "name", "location", "country", "city", "distance"));
+        assertThat(names(project.projections()), contains("abbrev", "city", "country", "location", "name", "distance"));
         var extract = as(project.child(), FieldExtractExec.class);
-        assertThat(names(extract.attributesToExtract()), contains("abbrev", "name", "country", "city"));
+        assertThat(names(extract.attributesToExtract()), contains("abbrev", "city", "country", "name"));
         var evalExec = as(extract.child(), EvalExec.class);
         var alias = as(evalExec.fields().get(0), Alias.class);
         assertThat(alias.name(), is("distance"));
@@ -5814,20 +5860,19 @@ public class PhysicalPlanOptimizerTests extends ESTestCase {
     }
 
     /**
-     * ProjectExec[[abbrev{f}#8, name{f}#9, location{f}#12, country{f}#13, city{f}#14]]
-     * \_TopNExec[[Order[$$order_by$0$0{r}#16,ASC,LAST]],5[INTEGER],0]
-     *   \_ExchangeExec[[abbrev{f}#8, name{f}#9, location{f}#12, country{f}#13, city{f}#14, $$order_by$0$0{r}#16],false]
-     *     \_ProjectExec[[abbrev{f}#8, name{f}#9, location{f}#12, country{f}#13, city{f}#14, $$order_by$0$0{r}#16]]
-     *       \_FieldExtractExec[abbrev{f}#8, name{f}#9, country{f}#13, city{f}#14][]
-     *         \_EvalExec[[
-     *             STDISTANCE(location{f}#12,[1 1 0 0 0 e1 7a 14 ae 47 21 29 40 a0 1a 2f dd 24 d6 4b 40][GEO_POINT]) AS $$order_by$0$0
-     *           ]]
-     *           \_FieldExtractExec[location{f}#12][]
+     *ProjectExec[[abbrev{f}#5258, name{f}#5259, location{f}#5262, country{f}#5263, city{f}#5264]]
+     * \_TopNExec[[Order[$$order_by$0$0{r}#5266,ASC,LAST]],5[INTEGER],229]
+     *   \_ExchangeExec[[abbrev{f}#5258, city{f}#5264, country{f}#5263, location{f}#5262, name{f}#5259, $$order_by$0$0{r}#5266],false]
+     *     \_ProjectExec[[abbrev{f}#5258, city{f}#5264, country{f}#5263, location{f}#5262, name{f}#5259, $$order_by$0$0{r}#5266]]
+     *       \_FieldExtractExec[abbrev{f}#5258, city{f}#5264, country{f}#5263, name..]&lt;[],[]&gt;
+     *         \_EvalExec[[STDISTANCE(location{f}#5262,[1 1 0 0 0 e1 7a 14 ae 47 21 29 40 a0 1a 2f dd 24 d6 4b 40][GEO_POINT]) AS $$orde
+     * r_by$0$0#5266]]
+     *           \_FieldExtractExec[location{f}#5262]&lt;[],[]&gt;
      *             \_EsQueryExec[airports],
-     *               indexMode[standard],
-     *               query[][_doc{f}#26],
-     *               limit[5],
-     *               sort[[GeoDistanceSort[field=location{f}#12, direction=ASC, lat=55.673, lon=12.565]]] estimatedRowSize[245]
+     *             indexMode[standard],
+     *             query[][_doc{f}#5276],
+     *             limit[5],
+     *             sort[[GeoDistanceSort[field=location{f}#5262, direction=ASC, lat=55.673, lon=12.565]]] estimatedRowSize[245]
      */
     public void testPushTopNInlineDistanceToSource() {
         var optimized = optimizedPlan(physicalPlan("""
@@ -5847,15 +5892,15 @@ public class PhysicalPlanOptimizerTests extends ESTestCase {
             names(project.projections()),
             contains(
                 equalTo("abbrev"),
-                equalTo("name"),
-                equalTo("location"),
-                equalTo("country"),
                 equalTo("city"),
+                equalTo("country"),
+                equalTo("location"),
+                equalTo("name"),
                 startsWith("$$order_by$0$")
             )
         );
         var extract = as(project.child(), FieldExtractExec.class);
-        assertThat(names(extract.attributesToExtract()), contains("abbrev", "name", "country", "city"));
+        assertThat(names(extract.attributesToExtract()), contains("abbrev", "city", "country", "name"));
         var evalExec = as(extract.child(), EvalExec.class);
         var alias = as(evalExec.fields().get(0), Alias.class);
         assertThat(alias.name(), startsWith("$$order_by$0$"));
@@ -5884,14 +5929,14 @@ public class PhysicalPlanOptimizerTests extends ESTestCase {
 
     /**
      * <code>
-     * ProjectExec[[abbrev{f}#12, name{f}#13, location{f}#16, country{f}#17, city{f}#18]]
-     * \_TopNExec[[Order[distance{r}#4,ASC,LAST]],5[INTEGER],0]
-     *   \_ExchangeExec[[abbrev{f}#12, name{f}#13, location{f}#16, country{f}#17, city{f}#18, distance{r}#4],false]
-     *     \_ProjectExec[[abbrev{f}#12, name{f}#13, location{f}#16, country{f}#17, city{f}#18, distance{r}#4]]
-     *       \_FieldExtractExec[abbrev{f}#12, name{f}#13, country{f}#17, city{f}#18][]
-     *         \_EvalExec[[STDISTANCE(location{f}#16,[1 1 0 0 0 e1 7a 14 ae 47 21 29 40 a0 1a 2f dd 24 d6 4b 40][GEO_POINT]) AS distance
-     * ]]
-     *           \_FieldExtractExec[location{f}#16][]
+     * ProjectExec[[abbrev{f}#361, name{f}#362, location{f}#365, country{f}#366, city{f}#367]]
+     * \_TopNExec[[Order[distance{r}#353,ASC,LAST]],5[INTEGER],229]
+     *   \_ExchangeExec[[abbrev{f}#361, city{f}#367, country{f}#366, location{f}#365, name{f}#362, distance{r}#353],false]
+     *     \_ProjectExec[[abbrev{f}#361, city{f}#367, country{f}#366, location{f}#365, name{f}#362, distance{r}#353]]
+     *       \_FieldExtractExec[abbrev{f}#361, city{f}#367, country{f}#366, name{f}..]&lt;[],[]&gt;
+     *         \_EvalExec[[STDISTANCE(location{f}#365,[1 1 0 0 0 e1 7a 14 ae 47 21 29 40 a0 1a 2f dd 24 d6 4b 40][GEO_POINT]) AS distanc
+     * e#353]]
+     *           \_FieldExtractExec[location{f}#365]&lt;[],[]&gt;
      *             \_EsQueryExec[airports], indexMode[standard], query[
      * {
      *   "geo_shape":{
@@ -5904,7 +5949,7 @@ public class PhysicalPlanOptimizerTests extends ESTestCase {
      *       }
      *     }
      *   }
-     * }][_doc{f}#29], limit[5], sort[[GeoDistanceSort[field=location{f}#16, direction=ASC, lat=55.673, lon=12.565]]] estimatedRowSize[245]
+     * ][_doc{f}#378], limit[5], sort[[GeoDistanceSort[field=location{f}#365, direction=ASC, lat=55.673, lon=12.565]]] estimatedRowSize[245]
      * </code>
      */
     public void testPushTopNDistanceWithFilterToSource() {
@@ -5922,9 +5967,9 @@ public class PhysicalPlanOptimizerTests extends ESTestCase {
         var exchange = asRemoteExchange(topN.child());
 
         project = as(exchange.child(), ProjectExec.class);
-        assertThat(names(project.projections()), contains("abbrev", "name", "location", "country", "city", "distance"));
+        assertThat(names(project.projections()), contains("abbrev", "city", "country", "location", "name", "distance"));
         var extract = as(project.child(), FieldExtractExec.class);
-        assertThat(names(extract.attributesToExtract()), contains("abbrev", "name", "country", "city"));
+        assertThat(names(extract.attributesToExtract()), contains("abbrev", "city", "country", "name"));
         var evalExec = as(extract.child(), EvalExec.class);
         var alias = as(evalExec.fields().get(0), Alias.class);
         assertThat(alias.name(), is("distance"));
@@ -5960,48 +6005,25 @@ public class PhysicalPlanOptimizerTests extends ESTestCase {
 
     /**
      * <code>
-     * ProjectExec[[abbrev{f}#14, name{f}#15, location{f}#18, country{f}#19, city{f}#20]]
-     * \_TopNExec[[Order[distance{r}#4,ASC,LAST]],5[INTEGER],0]
-     *   \_ExchangeExec[[abbrev{f}#14, name{f}#15, location{f}#18, country{f}#19, city{f}#20, distance{r}#4],false]
-     *     \_ProjectExec[[abbrev{f}#14, name{f}#15, location{f}#18, country{f}#19, city{f}#20, distance{r}#4]]
-     *       \_FieldExtractExec[abbrev{f}#14, name{f}#15, country{f}#19, city{f}#20][]
-     *         \_EvalExec[[STDISTANCE(location{f}#18,[1 1 0 0 0 e1 7a 14 ae 47 21 29 40 a0 1a 2f dd 24 d6 4b 40][GEO_POINT])
-     *           AS distance]]
-     *           \_FieldExtractExec[location{f}#18][]
-     *             \_EsQueryExec[airports], indexMode[standard], query[{
-     *               "bool":{
-     *                 "filter":[
-     *                   {
-     *                     "esql_single_value":{
-     *                       "field":"scalerank",
-     *                       "next":{"range":{"scalerank":{"lt":6,"boost":1.0}}},
-     *                       "source":"scalerank lt 6@3:31"
-     *                     }
-     *                   },
-     *                   {
-     *                     "bool":{
-     *                       "must":[
-     *                         {"geo_shape":{
-     *                           "location":{
-     *                             "relation":"INTERSECTS",
-     *                             "shape":{"type":"Circle","radius":"499999.99999999994m","coordinates":[12.565,55.673]}
-     *                           }
-     *                         }},
-     *                         {"geo_shape":{
-     *                           "location":{
-     *                             "relation":"DISJOINT",
-     *                             "shape":{"type":"Circle","radius":"10000.000000000002m","coordinates":[12.565,55.673]}
-     *                           }
-     *                         }}
-     *                       ],
-     *                       "boost":1.0
-     *                     }
-     *                   }
-     *                 ],
-     *                 "boost":1.0
-     *               }}][_doc{f}#31], limit[5], sort[[
-     *                 GeoDistanceSort[field=location{f}#18, direction=ASC, lat=55.673, lon=12.565]
-     *               ]] estimatedRowSize[245]
+     * ProjectExec[[abbrev{f}#6367, name{f}#6368, location{f}#6371, country{f}#6372, city{f}#6373]]
+     * \_TopNExec[[Order[distance{r}#6357,ASC,LAST]],5[INTEGER],229]
+     *   \_ExchangeExec[[abbrev{f}#6367, city{f}#6373, country{f}#6372, location{f}#6371, name{f}#6368, distance{r}#6357],false]
+     *     \_ProjectExec[[abbrev{f}#6367, city{f}#6373, country{f}#6372, location{f}#6371, name{f}#6368, distance{r}#6357]]
+     *       \_FieldExtractExec[abbrev{f}#6367, city{f}#6373, country{f}#6372, name..]&lt;[],[]&gt;
+     *         \_EvalExec[[STDISTANCE(location{f}#6371,[1 1 0 0 0 e1 7a 14 ae 47 21 29 40 a0 1a 2f dd 24 d6 4b 40][GEO_POINT]) AS distan
+     * ce#6357]]
+     *           \_FieldExtractExec[location{f}#6371]&lt;[],[]&gt;
+     *             \_EsQueryExec[airports], indexMode[standard], query[
+     *             {"bool":{"filter":[{"esql_single_value":{"field":"scalerank","next":{"range":
+     *             {"scalerank":{"lt":6,"boost":0.0}}},"source":"scalerank &lt; 6@3:31"}},
+     *             {"bool":{"must":[{"geo_shape":
+     *             {"location":{"relation":"INTERSECTS","shape":
+     *             {"type":"Circle","radius":"499999.99999999994m","coordinates":[12.565,55.673]}}}},
+     *             {"geo_shape":{"location":{"relation":"DISJOINT","shape":
+     *             {"type":"Circle","radius":"10000.000000000002m","coordinates":[12.565,55.673]}}}}]
+     *             ,"boost":1.0}}],"boost":1.0}}
+     *             ][_doc{f}#6384], limit[5], sort[
+     *             [GeoDistanceSort[field=location{f}#6371, direction=ASC, lat=55.673, lon=12.565]]] estimatedRowSize[245]
      * </code>
      */
     public void testPushTopNDistanceWithCompoundFilterToSource() {
@@ -6019,9 +6041,9 @@ public class PhysicalPlanOptimizerTests extends ESTestCase {
         var exchange = asRemoteExchange(topN.child());
 
         project = as(exchange.child(), ProjectExec.class);
-        assertThat(names(project.projections()), contains("abbrev", "name", "location", "country", "city", "distance"));
+        assertThat(names(project.projections()), contains("abbrev", "city", "country", "location", "name", "distance"));
         var extract = as(project.child(), FieldExtractExec.class);
-        assertThat(names(extract.attributesToExtract()), contains("abbrev", "name", "country", "city"));
+        assertThat(names(extract.attributesToExtract()), contains("abbrev", "city", "country", "name"));
         var evalExec = as(extract.child(), EvalExec.class);
         var alias = as(evalExec.fields().get(0), Alias.class);
         assertThat(alias.name(), is("distance"));
@@ -6059,35 +6081,28 @@ public class PhysicalPlanOptimizerTests extends ESTestCase {
     /**
      * Tests that multiple sorts, including distance and a field, are pushed down to the source.
      * <code>
-     * ProjectExec[[abbrev{f}#25, name{f}#26, location{f}#29, country{f}#30, city{f}#31, scalerank{f}#27, scale{r}#7]]
-     * \_TopNExec[[
-     *     Order[distance{r}#4,ASC,LAST],
-     *     Order[scalerank{f}#27,ASC,LAST],
-     *     Order[scale{r}#7,DESC,FIRST],
-     *     Order[loc{r}#10,DESC,FIRST]
-     *   ],5[INTEGER],0]
-     *   \_ExchangeExec[[abbrev{f}#25, name{f}#26, location{f}#29, country{f}#30, city{f}#31, scalerank{f}#27, scale{r}#7,
-     *       distance{r}#4, loc{r}#10],false]
-     *     \_ProjectExec[[abbrev{f}#25, name{f}#26, location{f}#29, country{f}#30, city{f}#31, scalerank{f}#27, scale{r}#7,
-     *         distance{r}#4, loc{r}#10]]
-     *       \_FieldExtractExec[abbrev{f}#25, name{f}#26, country{f}#30, city{f}#31][]
-     *         \_EvalExec[[
-     *             STDISTANCE(location{f}#29,[1 1 0 0 0 e1 7a 14 ae 47 21 29 40 a0 1a 2f dd 24 d6 4b 40][GEO_POINT]) AS distance,
-     *             10[INTEGER] - scalerank{f}#27 AS scale, TOSTRING(location{f}#29) AS loc
-     *           ]]
-     *           \_FieldExtractExec[location{f}#29, scalerank{f}#27][]
-     *             \_EsQueryExec[airports], indexMode[standard], query[{
-     *               "bool":{
-     *                 "filter":[
-     *                   {"esql_single_value":{"field":"scalerank","next":{...},"source":"scalerank &lt; 6@3:31"}},
-     *                   {"bool":{
-     *                     "must":[
-     *                       {"geo_shape":{"location":{"relation":"INTERSECTS","shape":{...}}}},
-     *                       {"geo_shape":{"location":{"relation":"DISJOINT","shape":{...}}}}
-     *                     ],"boost":1.0}}],"boost":1.0}}][_doc{f}#44], limit[5], sort[[
-     *                       GeoDistanceSort[field=location{f}#29, direction=ASC, lat=55.673, lon=12.565],
-     *                       FieldSort[field=scalerank{f}#27, direction=ASC, nulls=LAST]
-     *                     ]] estimatedRowSize[303]
+     * ProjectExec[[abbrev{f}#7429, name{f}#7430, location{f}#7433, country{f}#7434, city{f}#7435, scalerank{f}#7431, scale{r}#74
+     * 11]]
+     * \_TopNExec[[Order[distance{r}#7408,ASC,LAST], Order[scalerank{f}#7431,ASC,LAST], Order[scale{r}#7411,DESC,FIRST], Order[l
+     * oc{r}#7414,DESC,FIRST]],5[INTEGER],287]
+     *   \_ExchangeExec[[abbrev{f}#7429, city{f}#7435, country{f}#7434, location{f}#7433, name{f}#7430, scalerank{f}#7431, distance{r}
+     * #7408, scale{r}#7411, loc{r}#7414],false]
+     *     \_ProjectExec[[abbrev{f}#7429, city{f}#7435, country{f}#7434, location{f}#7433, name{f}#7430, scalerank{f}#7431, distance{r}
+     * #7408, scale{r}#7411, loc{r}#7414]]
+     *       \_FieldExtractExec[abbrev{f}#7429, city{f}#7435, country{f}#7434, name..]&lt;[],[]&gt;
+     *         \_EvalExec[[STDISTANCE(location{f}#7433,[1 1 0 0 0 e1 7a 14 ae 47 21 29 40 a0 1a 2f dd 24 d6 4b 40][GEO_POINT]) AS distan
+     * ce#7408, 10[INTEGER] - scalerank{f}#7431 AS scale#7411, TOSTRING(location{f}#7433) AS loc#7414]]
+     *           \_FieldExtractExec[location{f}#7433, scalerank{f}#7431]&lt;[],[]&gt;
+     *             \_EsQueryExec[airports], indexMode[standard], query[
+     *             {"bool":{"filter":[{"esql_single_value":{"field":"scalerank","next":
+     *             {"range":{"scalerank":{"lt":6,"boost":0.0}}},"source":"scalerank &lt; 6@3:31"}},
+     *             {"bool":{"must":[{"geo_shape":{"location":{"relation":"INTERSECTS","shape":
+     *             {"type":"Circle","radius":"499999.99999999994m","coordinates":[12.565,55.673]}}}},
+     *             {"geo_shape":{"location":{"relation":"DISJOINT","shape":
+     *             {"type":"Circle","radius":"10000.000000000002m","coordinates":[12.565,55.673]}}}}],
+     *             "boost":1.0}}],"boost":1.0}}][_doc{f}#7448], limit[5], sort[
+     *             [GeoDistanceSort[field=location{f}#7433, direction=ASC, lat=55.673, lon=12.565],
+     *             FieldSort[field=scalerank{f}#7431, direction=ASC, nulls=LAST]]] estimatedRowSize[303]
      * </code>
      */
     public void testPushTopNDistanceAndPushableFieldWithCompoundFilterToSource() {
@@ -6108,10 +6123,10 @@ public class PhysicalPlanOptimizerTests extends ESTestCase {
         project = as(exchange.child(), ProjectExec.class);
         assertThat(
             names(project.projections()),
-            contains("abbrev", "name", "location", "country", "city", "scalerank", "scale", "distance", "loc")
+            contains("abbrev", "city", "country", "location", "name", "scalerank", "distance", "scale", "loc")
         );
         var extract = as(project.child(), FieldExtractExec.class);
-        assertThat(names(extract.attributesToExtract()), contains("abbrev", "name", "country", "city"));
+        assertThat(names(extract.attributesToExtract()), contains("abbrev", "city", "country", "name"));
         var evalExec = as(extract.child(), EvalExec.class);
         var alias = as(evalExec.fields().get(0), Alias.class);
         assertThat(alias.name(), is("distance"));
@@ -6153,26 +6168,30 @@ public class PhysicalPlanOptimizerTests extends ESTestCase {
     /**
      * This test shows that if the filter contains a predicate on the same field that is sorted, we cannot push down the sort.
      * <code>
-     * ProjectExec[[abbrev{f}#23, name{f}#24, location{f}#27, country{f}#28, city{f}#29, scalerank{f}#25 AS scale]]
-     * \_TopNExec[[Order[distance{r}#4,ASC,LAST], Order[scalerank{f}#25,ASC,LAST]],5[INTEGER],0]
-     *   \_ExchangeExec[[abbrev{f}#23, name{f}#24, location{f}#27, country{f}#28, city{f}#29, scalerank{f}#25, distance{r}#4],false]
-     *     \_ProjectExec[[abbrev{f}#23, name{f}#24, location{f}#27, country{f}#28, city{f}#29, scalerank{f}#25, distance{r}#4]]
-     *       \_FieldExtractExec[abbrev{f}#23, name{f}#24, country{f}#28, city{f}#29][]
-     *         \_TopNExec[[Order[distance{r}#4,ASC,LAST], Order[scalerank{f}#25,ASC,LAST]],5[INTEGER],208]
-     *           \_FieldExtractExec[scalerank{f}#25][]
-     *             \_FilterExec[SUBSTRING(position{r}#7,1[INTEGER],5[INTEGER]) == [50 4f 49 4e 54][KEYWORD]]
-     *               \_EvalExec[[
-     *                   STDISTANCE(location{f}#27,[1 1 0 0 0 e1 7a 14 ae 47 21 29 40 a0 1a 2f dd 24 d6 4b 40][GEO_POINT]) AS distance,
-     *                   TOSTRING(location{f}#27) AS position
-     *                 ]]
-     *                 \_FieldExtractExec[location{f}#27][]
-     *                   \_EsQueryExec[airports], indexMode[standard], query[{
-     *                     "bool":{"filter":[
-     *                       {"esql_single_value":{"field":"scalerank","next":{"range":{"scalerank":{"lt":6,"boost":1.0}}},"source":...}},
-     *                       {"bool":{"must":[
-     *                         {"geo_shape":{"location":{"relation":"INTERSECTS","shape":{...}}}},
-     *                         {"geo_shape":{"location":{"relation":"DISJOINT","shape":{...}}}}
-     *                       ],"boost":1.0}}],"boost":1.0}}][_doc{f}#42], limit[], sort[] estimatedRowSize[87]
+     * ProjectExec[[abbrev{f}#4856, name{f}#4857, location{f}#4860, country{f}#4861, city{f}#4862, scalerank{f}#4858 AS scale#484
+     * 3]]
+     * \_TopNExec[[Order[distance{r}#4837,ASC,LAST], Order[scalerank{f}#4858,ASC,LAST]],5[INTEGER],233]
+     *   \_ExchangeExec[[abbrev{f}#4856, city{f}#4862, country{f}#4861, location{f}#4860, name{f}#4857, scalerank{f}#4858, distance{r}
+     * #4837],false]
+     *     \_ProjectExec[[abbrev{f}#4856, city{f}#4862, country{f}#4861, location{f}#4860, name{f}#4857, scalerank{f}#4858, distance{r}
+     * #4837]]
+     *       \_FieldExtractExec[abbrev{f}#4856, city{f}#4862, country{f}#4861, name..]&lt;[],[]&gt;
+     *         \_TopNExec[[Order[distance{r}#4837,ASC,LAST], Order[scalerank{f}#4858,ASC,LAST]],5[INTEGER],303]
+     *           \_FieldExtractExec[scalerank{f}#4858]&lt;[],[]&gt;
+     *             \_FilterExec[SUBSTRING(position{r}#4840,1[INTEGER],5[INTEGER]) == POINT[KEYWORD]]
+     *               \_EvalExec[[STDISTANCE(location{f}#4860,[1 1 0 0 0 e1 7a 14 ae 47 21 29 40 a0 1a 2f dd 24 d6 4b 40][GEO_POINT]) AS
+     * distance#4837, TOSTRING(location{f}#4860) AS position#4840]]
+     *                 \_FieldExtractExec[location{f}#4860]&lt;[],[]&gt;
+     *                   \_EsQueryExec[airports], indexMode[standard], query[
+     *                   {"bool":{"filter":[
+     *                   {"esql_single_value":
+     *                   {"field":"scalerank","next":{"range":{"scalerank":{"lt":6,"boost":0.0}}},"source":"scale &lt; 6@3:93"}},
+     *                   {"bool":{"must":[
+     *                   {"geo_shape":{"location":{"relation":"INTERSECTS","shape":
+     *                   {"type":"Circle","radius":"499999.99999999994m","coordinates":[12.565,55.673]}}}},
+     *                   {"geo_shape":{"location":{"relation":"DISJOINT","shape":
+     *                   {"type":"Circle","radius":"10000.000000000002m","coordinates":[12.565,55.673]}}}}
+     *                   ],"boost":1.0}}],"boost":1.0}}][_doc{f}#4875], limit[], sort[] estimatedRowSize[87]
      * </code>
      */
     public void testPushTopNDistanceAndNonPushableEvalWithCompoundFilterToSource() {
@@ -6191,9 +6210,9 @@ public class PhysicalPlanOptimizerTests extends ESTestCase {
         var exchange = asRemoteExchange(topN.child());
 
         project = as(exchange.child(), ProjectExec.class);
-        assertThat(names(project.projections()), contains("abbrev", "name", "location", "country", "city", "scalerank", "distance"));
+        assertThat(names(project.projections()), contains("abbrev", "city", "country", "location", "name", "scalerank", "distance"));
         var extract = as(project.child(), FieldExtractExec.class);
-        assertThat(names(extract.attributesToExtract()), contains("abbrev", "name", "country", "city"));
+        assertThat(names(extract.attributesToExtract()), contains("abbrev", "city", "country", "name"));
         var topNChild = as(extract.child(), TopNExec.class);
         extract = as(topNChild.child(), FieldExtractExec.class);
         assertThat(names(extract.attributesToExtract()), contains("scalerank"));
@@ -6228,27 +6247,25 @@ public class PhysicalPlanOptimizerTests extends ESTestCase {
     /**
      * This test shows that if the filter contains a predicate on the same field that is sorted, we cannot push down the sort.
      * <code>
-     * ProjectExec[[abbrev{f}#23, name{f}#24, location{f}#27, country{f}#28, city{f}#29, scale{r}#10]]
-     * \_TopNExec[[Order[distance{r}#4,ASC,LAST], Order[scale{r}#10,ASC,LAST]],5[INTEGER],0]
-     *   \_ExchangeExec[[abbrev{f}#23, name{f}#24, location{f}#27, country{f}#28, city{f}#29, scale{r}#10, distance{r}#4],false]
-     *     \_ProjectExec[[abbrev{f}#23, name{f}#24, location{f}#27, country{f}#28, city{f}#29, scale{r}#10, distance{r}#4]]
-     *       \_FieldExtractExec[abbrev{f}#23, name{f}#24, country{f}#28, city{f}#29][]
-     *         \_TopNExec[[Order[distance{r}#4,ASC,LAST], Order[scale{r}#10,ASC,LAST]],5[INTEGER],208]
-     *           \_FilterExec[
-     *               SUBSTRING(position{r}#7,1[INTEGER],5[INTEGER]) == [50 4f 49 4e 54][KEYWORD]
-     *               AND scale{r}#10 &gt; 3[INTEGER]
-     *             ]
-     *             \_EvalExec[[
-     *                 STDISTANCE(location{f}#27,[1 1 0 0 0 e1 7a 14 ae 47 21 29 40 a0 1a 2f dd 24 d6 4b 40][GEO_POINT]) AS distance,
-     *                 TOSTRING(location{f}#27) AS position,
-     *                 10[INTEGER] - scalerank{f}#25 AS scale
-     *               ]]
-     *               \_FieldExtractExec[location{f}#27, scalerank{f}#25][]
-     *                 \_EsQueryExec[airports], indexMode[standard], query[{
-     *                   "bool":{"must":[
-     *                     {"geo_shape":{"location":{"relation":"INTERSECTS","shape":{...}}}},
-     *                     {"geo_shape":{"location":{"relation":"DISJOINT","shape":{...}}}}
-     *                   ],"boost":1.0}}][_doc{f}#42], limit[], sort[] estimatedRowSize[91]
+     *ProjectExec[[abbrev{f}#1447, name{f}#1448, location{f}#1451, country{f}#1452, city{f}#1453, scalerank{r}#1434]]
+     * \_TopNExec[[Order[distance{r}#1428,ASC,LAST], Order[scalerank{r}#1434,ASC,LAST]],5[INTEGER],233]
+     *   \_ExchangeExec[[abbrev{f}#1447, city{f}#1453, country{f}#1452, location{f}#1451, name{f}#1448, distance{r}#1428, scalerank{r}
+     * #1434],false]
+     *     \_ProjectExec[[abbrev{f}#1447, city{f}#1453, country{f}#1452, location{f}#1451, name{f}#1448, distance{r}#1428, scalerank{r}
+     * #1434]]
+     *       \_FieldExtractExec[abbrev{f}#1447, city{f}#1453, country{f}#1452, name..]&lt;[],[]&gt;
+     *         \_TopNExec[[Order[distance{r}#1428,ASC,LAST], Order[scalerank{r}#1434,ASC,LAST]],5[INTEGER],303]
+     *           \_FilterExec[SUBSTRING(position{r}#1431,1[INTEGER],5[INTEGER]) == POINT[KEYWORD] AND scalerank{r}#1434 > 3[INTEGER]]
+     *             \_EvalExec[[STDISTANCE(location{f}#1451,[1 1 0 0 0 e1 7a 14 ae 47 21 29 40 a0 1a 2f dd 24 d6 4b 40][GEO_POINT]) AS distan
+     * ce#1428, TOSTRING(location{f}#1451) AS position#1431, 10[INTEGER] - scalerank{f}#1449 AS scalerank#1434]]
+     *               \_FieldExtractExec[location{f}#1451, scalerank{f}#1449]&lt;[],[]&gt;
+     *                 \_EsQueryExec[airports], indexMode[standard], query[
+     *                 {"bool":{"must":[
+     *                 {"geo_shape":{"location":{"relation":"INTERSECTS","shape":
+     *                 {"type":"Circle","radius":"499999.99999999994m","coordinates":[12.565,55.673]}}}},
+     *                 {"geo_shape":{"location":{"relation":"DISJOINT","shape":
+     *                 {"type":"Circle","radius":"10000.000000000002m","coordinates":[12.565,55.673]}}}}
+     *                 ],"boost":1.0}}][_doc{f}#1466], limit[], sort[] estimatedRowSize[91]
      * </code>
      */
     public void testPushTopNDistanceAndNonPushableEvalsWithCompoundFilterToSource() {
@@ -6267,9 +6284,9 @@ public class PhysicalPlanOptimizerTests extends ESTestCase {
         var exchange = asRemoteExchange(topN.child());
 
         project = as(exchange.child(), ProjectExec.class);
-        assertThat(names(project.projections()), contains("abbrev", "name", "location", "country", "city", "scalerank", "distance"));
+        assertThat(names(project.projections()), contains("abbrev", "city", "country", "location", "name", "distance", "scalerank"));
         var extract = as(project.child(), FieldExtractExec.class);
-        assertThat(names(extract.attributesToExtract()), contains("abbrev", "name", "country", "city"));
+        assertThat(names(extract.attributesToExtract()), contains("abbrev", "city", "country", "name"));
         var topNChild = as(extract.child(), TopNExec.class);
         var filter = as(topNChild.child(), FilterExec.class);
         assertThat(filter.condition(), isA(And.class));
@@ -6344,9 +6361,9 @@ public class PhysicalPlanOptimizerTests extends ESTestCase {
         var exchange = asRemoteExchange(topN.child());
 
         project = as(exchange.child(), ProjectExec.class);
-        assertThat(names(project.projections()), contains("abbrev", "name", "location", "country", "city", "scalerank", "distance"));
+        assertThat(names(project.projections()), contains("abbrev", "city", "country", "location", "name", "scalerank", "distance"));
         var extract = as(project.child(), FieldExtractExec.class);
-        assertThat(names(extract.attributesToExtract()), contains("abbrev", "name", "country", "city"));
+        assertThat(names(extract.attributesToExtract()), contains("abbrev", "city", "country", "name"));
         var topNChild = as(extract.child(), TopNExec.class);
         var filter = as(topNChild.child(), FilterExec.class);
         assertThat(filter.condition(), isA(Or.class));
@@ -6373,28 +6390,29 @@ public class PhysicalPlanOptimizerTests extends ESTestCase {
 
     /**
      * <code>
-     * ProjectExec[[abbrev{f}#15, name{f}#16, location{f}#19, country{f}#20, city{f}#21]]
-     * \_TopNExec[[Order[scalerank{f}#17,ASC,LAST], Order[distance{r}#4,ASC,LAST]],15[INTEGER],0]
-     *   \_ExchangeExec[[abbrev{f}#15, name{f}#16, location{f}#19, country{f}#20, city{f}#21, scalerank{f}#17, distance{r}#4],false]
-     *     \_ProjectExec[[abbrev{f}#15, name{f}#16, location{f}#19, country{f}#20, city{f}#21, scalerank{f}#17, distance{r}#4]]
-     *       \_FieldExtractExec[abbrev{f}#15, name{f}#16, country{f}#20, city{f}#21, ..][]
-     *         \_EvalExec[[STDISTANCE(location{f}#19,[1 1 0 0 0 e1 7a 14 ae 47 21 29 40 a0 1a 2f dd 24 d6 4b 40][GEO_POINT])
-     *           AS distance]]
-     *           \_FieldExtractExec[location{f}#19][]
-     *             \_EsQueryExec[airports], indexMode[standard], query[{
-     *               "bool":{
-     *                 "filter":[
-     *                   {"esql_single_value":{"field":"scalerank",...,"source":"scalerank lt 6@3:31"}},
-     *                   {"bool":{"must":[
-     *                     {"geo_shape":{"location":{"relation":"INTERSECTS","shape":{...}}}},
-     *                     {"geo_shape":{"location":{"relation":"DISJOINT","shape":{...}}}}
-     *                   ],"boost":1.0}}
-     *                 ],"boost":1.0
-     *               }
-     *             }][_doc{f}#32], limit[], sort[[
-     *               FieldSort[field=scalerank{f}#17, direction=ASC, nulls=LAST],
-     *               GeoDistanceSort[field=location{f}#19, direction=ASC, lat=55.673, lon=12.565]
-     *             ]] estimatedRowSize[37]
+     * ProjectExec[[abbrev{f}#6090, name{f}#6091, location{f}#6094, country{f}#6095, city{f}#6096]]
+     * \_TopNExec[[Order[scalerank{f}#6092,ASC,LAST], Order[distance{r}#6079,ASC,LAST]],15[INTEGER],233]
+     *   \_ExchangeExec[[abbrev{f}#6090, city{f}#6096, country{f}#6095, location{f}#6094, name{f}#6091, scalerank{f}#6092, distance{r}
+     * #6079],false]
+     *     \_ProjectExec[[abbrev{f}#6090, city{f}#6096, country{f}#6095, location{f}#6094, name{f}#6091, scalerank{f}#6092, distance{r}
+     * #6079]]
+     *       \_FieldExtractExec[abbrev{f}#6090, city{f}#6096, country{f}#6095, name..]&lt;[],[]&gt;
+     *         \_EvalExec[[STDISTANCE(location{f}#6094,[1 1 0 0 0 e1 7a 14 ae 47 21 29 40 a0 1a 2f dd 24 d6 4b 40][GEO_POINT]) AS distan
+     * ce#6079]]
+     *           \_FieldExtractExec[location{f}#6094]&lt;[],[]&gt;
+     *             \_EsQueryExec[airports], indexMode[standard], query[
+     *             {"bool":{"filter":[
+     *             {"esql_single_value":{"field":"scalerank","next":{"range":
+     *             {"scalerank":{"lt":6,"boost":0.0}}},"source":"scalerank &lt; 6@3:31"}},
+     *             {"bool":{"must":[
+     *             {"geo_shape": {"location":{"relation":"INTERSECTS","shape":
+     *             {"type":"Circle","radius":"499999.99999999994m","coordinates":[12.565,55.673]}}}},
+     *             {"geo_shape":{"location":{"relation":"DISJOINT","shape":
+     *             {"type":"Circle","radius":"10000.000000000002m","coordinates":[12.565,55.673]}}}}
+     *             ],"boost":1.0}}],"boost":1.0}}
+     *             ][_doc{f}#6107], limit[15], sort[
+     *             [FieldSort[field=scalerank{f}#6092, direction=ASC, nulls=LAST],
+     *             GeoDistanceSort[field=location{f}#6094, direction=ASC, lat=55.673, lon=12.565]]] estimatedRowSize[249]
      * </code>
      */
     public void testPushCompoundTopNDistanceWithCompoundFilterToSource() {
@@ -6413,9 +6431,9 @@ public class PhysicalPlanOptimizerTests extends ESTestCase {
         var exchange = asRemoteExchange(topN.child());
 
         project = as(exchange.child(), ProjectExec.class);
-        assertThat(names(project.projections()), contains("abbrev", "name", "location", "country", "city", "scalerank", "distance"));
+        assertThat(names(project.projections()), contains("abbrev", "city", "country", "location", "name", "scalerank", "distance"));
         var extract = as(project.child(), FieldExtractExec.class);
-        assertThat(names(extract.attributesToExtract()), contains("abbrev", "name", "country", "city", "scalerank"));
+        assertThat(names(extract.attributesToExtract()), contains("abbrev", "city", "country", "name", "scalerank"));
         var evalExec = as(extract.child(), EvalExec.class);
         var alias = as(evalExec.fields().get(0), Alias.class);
         assertThat(alias.name(), is("distance"));
@@ -8053,7 +8071,7 @@ public class PhysicalPlanOptimizerTests extends ESTestCase {
      *              ges{f}#5, last_name{f}#6, long_noidx{f}#12, salary{f}#7],false]
      *      \_ProjectExec[[_meta_field{f}#8, emp_no{f}#2, first_name{f}#3, gender{f}#4, hire_date{f}#9, job{f}#10, job.raw{f}#11, langua
      *              ges{f}#5, last_name{f}#6, long_noidx{f}#12, salary{f}#7]]
-     *        \_FieldExtractExec[_meta_field{f}#8, emp_no{f}#2, first_name{f}#3, gen..]<[],[]>
+     *        \_FieldExtractExec[_meta_field{f}#8, emp_no{f}#2, first_name{f}#3, gen..]&lt;[],[]&gt;
      *          \_EsQueryExec[test], indexMode[standard],
      *                  query[{"bool":{"filter":[{"sampling":{"probability":0.1,"seed":234,"hash":0}}],"boost":1.0}}]
      *                  [_doc{f}#24], limit[1000], sort[] estimatedRowSize[332]