Browse Source

ESQL: Revive some more of inlinestats functionality (#123589)

Andrei Stefan 7 months ago
parent
commit
04c8bf4ba8

+ 5 - 0
docs/changelog/123589.yaml

@@ -0,0 +1,5 @@
+pr: 123589
+summary: Revive some more of inlinestats functionality
+area: ES|QL
+type: bug
+issues: []

+ 2 - 2
x-pack/plugin/esql/qa/server/multi-clusters/src/javaRestTest/java/org/elasticsearch/xpack/esql/ccq/MultiClusterSpecIT.java

@@ -48,7 +48,7 @@ import static org.elasticsearch.xpack.esql.CsvTestsDataLoader.ENRICH_SOURCE_INDI
 import static org.elasticsearch.xpack.esql.EsqlTestUtils.classpathResources;
 import static org.elasticsearch.xpack.esql.action.EsqlCapabilities.Cap.INLINESTATS;
 import static org.elasticsearch.xpack.esql.action.EsqlCapabilities.Cap.INLINESTATS_V2;
-import static org.elasticsearch.xpack.esql.action.EsqlCapabilities.Cap.INLINESTATS_V3;
+import static org.elasticsearch.xpack.esql.action.EsqlCapabilities.Cap.INLINESTATS_V4;
 import static org.elasticsearch.xpack.esql.action.EsqlCapabilities.Cap.JOIN_LOOKUP_V12;
 import static org.elasticsearch.xpack.esql.action.EsqlCapabilities.Cap.JOIN_PLANNING_V1;
 import static org.elasticsearch.xpack.esql.action.EsqlCapabilities.Cap.METADATA_FIELDS_REMOTE_TEST;
@@ -126,7 +126,7 @@ public class MultiClusterSpecIT extends EsqlSpecTestCase {
         assumeFalse("INLINESTATS not yet supported in CCS", testCase.requiredCapabilities.contains(INLINESTATS.capabilityName()));
         assumeFalse("INLINESTATS not yet supported in CCS", testCase.requiredCapabilities.contains(INLINESTATS_V2.capabilityName()));
         assumeFalse("INLINESTATS not yet supported in CCS", testCase.requiredCapabilities.contains(JOIN_PLANNING_V1.capabilityName()));
-        assumeFalse("INLINESTATS not yet supported in CCS", testCase.requiredCapabilities.contains(INLINESTATS_V3.capabilityName()));
+        assumeFalse("INLINESTATS not yet supported in CCS", testCase.requiredCapabilities.contains(INLINESTATS_V4.capabilityName()));
         assumeFalse("LOOKUP JOIN not yet supported in CCS", testCase.requiredCapabilities.contains(JOIN_LOOKUP_V12.capabilityName()));
         // Unmapped fields require a correct capability response from every cluster, which isn't currently implemented.
         assumeFalse("UNMAPPED FIELDS not yet supported in CCS", testCase.requiredCapabilities.contains(UNMAPPED_FIELDS.capabilityName()));

+ 115 - 79
x-pack/plugin/esql/qa/testFixtures/src/main/resources/inlinestats.csv-spec

@@ -3,7 +3,7 @@
 //
 
 maxOfInt
-required_capability: inlinestats_v3
+required_capability: inlinestats_v4
 // tag::max-languages[]
 FROM employees
 | KEEP emp_no, languages
@@ -25,7 +25,7 @@ emp_no:integer | languages:integer | max_lang:integer
 ;
 
 maxOfIntByKeyword
-required_capability: inlinestats_v3
+required_capability: inlinestats_v4
 
 FROM employees
 | KEEP emp_no, languages, gender
@@ -34,16 +34,16 @@ FROM employees
 | SORT emp_no ASC
 | LIMIT 5;
 
-emp_no:integer | languages:integer | gender:keyword | max_lang:integer
-         10002 |                 5 | F              | 5
-         10004 |                 5 | M              | 5
-         10011 |                 5 | null           | 5
-         10012 |                 5 | null           | 5
-         10014 |                 5 | null           | 5
+emp_no:integer | languages:integer | max_lang:integer | gender:keyword
+         10002 |                 5 | 5                | F
+         10004 |                 5 | 5                | M
+         10011 |                 5 | 5                | null
+         10012 |                 5 | 5                | null
+         10014 |                 5 | 5                | null
 ;
 
 maxOfLongByKeyword
-required_capability: inlinestats_v3
+required_capability: inlinestats_v4
 
 FROM employees
 | KEEP emp_no, avg_worked_seconds, gender
@@ -51,14 +51,14 @@ FROM employees
 | WHERE max_avg_worked_seconds == avg_worked_seconds
 | SORT emp_no ASC;
 
-emp_no:integer | avg_worked_seconds:long | gender:keyword | max_avg_worked_seconds:long
-         10007 |               393084805 | F              | 393084805
-         10015 |               390266432 | null           | 390266432
-         10030 |               394597613 | M              | 394597613
+emp_no:integer | avg_worked_seconds:long | max_avg_worked_seconds:long | gender:keyword
+         10007 |               393084805 | 393084805                   | F
+         10015 |               390266432 | 390266432                   | null 
+         10030 |               394597613 | 394597613                   | M 
 ;
 
 maxOfLong
-required_capability: inlinestats_v3
+required_capability: inlinestats_v4
 
 FROM employees
 | KEEP emp_no, avg_worked_seconds, gender
@@ -71,7 +71,7 @@ emp_no:integer | avg_worked_seconds:long | gender:keyword | max_avg_worked_secon
 ;
 
 maxOfLongByCalculatedKeyword
-required_capability: inlinestats_v3
+required_capability: inlinestats_v4
 
 // tag::longest-tenured-by-first[]
 FROM employees
@@ -84,17 +84,17 @@ FROM employees
 ;
 
 // tag::longest-tenured-by-first-result[]
-emp_no:integer | avg_worked_seconds:long | last_name:keyword | SUBSTRING(last_name, 0, 1):keyword | max_avg_worked_seconds:long
-         10065 |               372660279 | Awdeh             | A                                  | 372660279
-         10074 |               382397583 | Bernatsky         | B                                  | 382397583
-         10044 |               387408356 | Casley            | C                                  | 387408356
-         10030 |               394597613 | Demeyer           | D                                  | 394597613
-         10087 |               305782871 | Eugenio           | E                                  | 305782871
+emp_no:integer | avg_worked_seconds:long | last_name:keyword | max_avg_worked_seconds:long | SUBSTRING(last_name, 0, 1):keyword
+         10065 |               372660279 | Awdeh             | 372660279                   | A
+         10074 |               382397583 | Bernatsky         | 382397583                   | B
+         10044 |               387408356 | Casley            | 387408356                   | C
+         10030 |               394597613 | Demeyer           | 394597613                   | D
+         10087 |               305782871 | Eugenio           | 305782871                   | E
 // end::longest-tenured-by-first-result[]
 ;
 
 maxOfLongByCalculatedNamedKeyword
-required_capability: inlinestats_v3
+required_capability: inlinestats_v4
 
 FROM employees
 | KEEP emp_no, avg_worked_seconds, last_name
@@ -104,16 +104,16 @@ FROM employees
 | LIMIT 5
 ;
 
-emp_no:integer | avg_worked_seconds:long | last_name:keyword | l:keyword | max_avg_worked_seconds:long
-         10065 |               372660279 | Awdeh             | A         | 372660279
-         10074 |               382397583 | Bernatsky         | B         | 382397583
-         10044 |               387408356 | Casley            | C         | 387408356
-         10030 |               394597613 | Demeyer           | D         | 394597613
-         10087 |               305782871 | Eugenio           | E         | 305782871
+emp_no:integer | avg_worked_seconds:long | last_name:keyword | max_avg_worked_seconds:long | l:keyword
+         10065 |               372660279 | Awdeh             | 372660279                   | A
+         10074 |               382397583 | Bernatsky         | 382397583                   | B
+         10044 |               387408356 | Casley            | 387408356                   | C
+         10030 |               394597613 | Demeyer           | 394597613                   | D
+         10087 |               305782871 | Eugenio           | 305782871                   | E
 ;
 
-maxOfLongByCalculatedDroppedKeyword-Ignore
-required_capability: join_planning_v1
+maxOfLongByCalculatedDroppedKeyword
+required_capability: inlinestats_v4
 
 FROM employees
 | INLINESTATS max_avg_worked_seconds = MAX(avg_worked_seconds) BY l = SUBSTRING(last_name, 0, 1)
@@ -132,7 +132,7 @@ emp_no:integer | avg_worked_seconds:long | last_name:keyword | max_avg_worked_se
 ;
 
 maxOfLongByEvaledKeyword
-required_capability: inlinestats_v3
+required_capability: inlinestats_v4
 
 FROM employees
 | EVAL l = SUBSTRING(last_name, 0, 1)
@@ -143,16 +143,16 @@ FROM employees
 | LIMIT 5
 ;
 
-emp_no:integer | avg_worked_seconds:long | l:keyword | max_avg_worked_seconds:long
-         10065 |               372660279 | A         | 372660279
-         10074 |               382397583 | B         | 382397583
-         10044 |               387408356 | C         | 387408356
-         10030 |               394597613 | D         | 394597613
-         10087 |               305782871 | E         | 305782871
+emp_no:integer | avg_worked_seconds:long | max_avg_worked_seconds:long | l:keyword
+         10065 |               372660279 | 372660279                   | A
+         10074 |               382397583 | 382397583                   | B
+         10044 |               387408356 | 387408356                   | C
+         10030 |               394597613 | 394597613                   | D
+         10087 |               305782871 | 305782871                   | E
 ;
 
 maxOfLongByInt
-required_capability: inlinestats_v3
+required_capability: inlinestats_v4
 
 FROM employees
 | KEEP emp_no, avg_worked_seconds, languages
@@ -160,17 +160,17 @@ FROM employees
 | WHERE max_avg_worked_seconds == avg_worked_seconds
 | SORT languages ASC;
 
-emp_no:integer | avg_worked_seconds:long | languages:integer | max_avg_worked_seconds:long
-         10044 |               387408356 |                 1 | 387408356
-         10099 |               377713748 |                 2 | 377713748
-         10030 |               394597613 |                 3 | 394597613
-         10007 |               393084805 |                 4 | 393084805
-         10015 |               390266432 |                 5 | 390266432
-         10027 |               374037782 |              null | 374037782
+emp_no:integer | avg_worked_seconds:long | max_avg_worked_seconds:long | languages:integer
+         10044 |               387408356 | 387408356                   | 1
+         10099 |               377713748 | 377713748                   | 2
+         10030 |               394597613 | 394597613                   | 3
+         10007 |               393084805 | 393084805                   | 4
+         10015 |               390266432 | 390266432                   | 5
+         10027 |               374037782 | 374037782                   | null
 ;
 
 maxOfLongByIntDouble
-required_capability: inlinestats_v3
+required_capability: inlinestats_v4
 
 FROM employees
 | KEEP emp_no, avg_worked_seconds, languages, height
@@ -180,11 +180,11 @@ FROM employees
 | SORT languages, height ASC
 | LIMIT 4;
 
-emp_no:integer | avg_worked_seconds:long | languages:integer | height:double | max_avg_worked_seconds:long
-         10083 |               331236443 |                 1 |           1.4 | 331236443
-         10084 |               359067056 |                 1 |           1.5 | 359067056
-         10033 |               208374744 |                 1 |           1.6 | 208374744
-         10086 |               328580163 |                 1 |           1.7 | 328580163
+emp_no:integer | avg_worked_seconds:long | max_avg_worked_seconds:long | languages:integer | height:double
+         10083 |               331236443 | 331236443                   |                 1 |           1.4
+         10084 |               359067056 | 359067056                   |                 1 |           1.5
+         10033 |               208374744 | 208374744                   |                 1 |           1.6
+         10086 |               328580163 | 328580163                   |                 1 |           1.7
 ;
 
 
@@ -243,8 +243,8 @@ abbrev:keyword | type:keyword | scalerank:integer | min_scalerank:integer
 // end::mv-expand-result[]
 ;
 
-byMvExpand-Ignore
-required_capability: join_planning_v1
+byMvExpand
+required_capability: inlinestats_v4
 
 // tag::extreme-airports[]
 FROM airports
@@ -307,8 +307,8 @@ count:long | country:keyword | avg:double
         17 |  United Kingdom | 4.455
 ;
 
-afterWhere-Ignore
-required_capability: join_planning_v1
+afterWhere
+required_capability: inlinestats_v4
 
 FROM airports
 | WHERE country != "United States"
@@ -367,7 +367,7 @@ abbrev:keyword | city:keyword |       region:text | "COUNT(*)":long
 ;
 
 beforeStats
-required_capability: inlinestats_v3
+required_capability: inlinestats_v4
 
 FROM airports
 | EVAL lat = ST_Y(location)
@@ -379,8 +379,8 @@ northern:long | southern:long
           520 | 371
 ;
 
-beforeKeepSort-Ignore
-required_capability: join_planning_v1
+beforeKeepSort
+required_capability: inlinestats_v4
 
 FROM employees
 | INLINESTATS max_salary = MAX(salary) by languages
@@ -500,8 +500,8 @@ city:keyword | x:keyword
 Zürich       | Zürich
 ;
 
-byConstant-Ignore
-required_capability: join_planning_v1
+byConstant
+required_capability: inlinestats_v4
 
 FROM employees
 | KEEP emp_no, languages
@@ -520,7 +520,7 @@ emp_no:integer | languages:integer | max_lang:integer | y:integer
 ;
 
 aggConstant
-required_capability: join_planning_v1
+required_capability: inlinestats_v4
 
 FROM employees
 | KEEP emp_no
@@ -529,16 +529,16 @@ FROM employees
 | LIMIT 5
 ;
 
-emp_no:integer | one:integer
-         10001 | 1
-         10002 | 1
-         10003 | 1
-         10004 | 1
-         10005 | 1
+one:integer | emp_no:integer
+          1 | 10001
+          1 | 10002
+          1 | 10003
+          1 | 10004
+          1 | 10005
 ;
 
 percentile
-required_capability: inlinestats_v3
+required_capability: inlinestats_v4
 
 FROM employees
 | KEEP emp_no, salary
@@ -557,7 +557,7 @@ emp_no:integer | salary:integer | ninety_fifth_salary:double
 ;
 
 byTwoCalculated
-required_capability: join_planning_v1
+required_capability: inlinestats_v4
 
 FROM airports
 | WHERE abbrev IS NOT NULL
@@ -569,10 +569,10 @@ FROM airports
 | LIMIT 3
 ;
 
-abbrev:keyword | scalerank:integer |             location:geo_point             | lat_10:double | lon_10:double | min_sl:integer
-           ZRH |                 3 | POINT(8.56221279534765 47.4523895064915)   |            50 |            10 | 2
-           ZNZ |                 4 | POINT (39.2223319841558 -6.21857034620282) |           -10 |            40 | 4
-           ZLO |                 7 | POINT (-104.560095200097 19.1480860285854) |            20 |          -100 | 2
+abbrev:keyword | scalerank:integer |             location:geo_point             | min_sl:integer | lat_10:double | lon_10:double
+           ZRH |                 3 | POINT(8.56221279534765 47.4523895064915)   | 2              |            50 |            10
+           ZNZ |                 4 | POINT (39.2223319841558 -6.21857034620282) | 4              |           -10 |            40
+           ZLO |                 7 | POINT (-104.560095200097 19.1480860285854) | 2              |            20 |          -100
 ;
 
 byTwoCalculatedSecondOverwrites-Ignore
@@ -642,7 +642,7 @@ abbrev:keyword | scalerank:integer |             location:geo_point
 ;
 
 groupShadowsField
-required_capability: join_planning_v1
+required_capability: inlinestats_v4
 
   FROM employees
 | KEEP emp_no, salary, hire_date
@@ -653,9 +653,45 @@ required_capability: join_planning_v1
 | LIMIT 4
 ;
 
-emp_no:integer | salary:integer |  hire_date:datetime  | avg_salary:double
-         10001 |          57305 | 1986-01-01T00:00:00Z | 43869.63636363636
-         10002 |          56371 | 1985-01-01T00:00:00Z | 51831.818181818184
-         10003 |          61805 | 1986-01-01T00:00:00Z | 43869.63636363636
-         10005 |          63528 | 1989-01-01T00:00:00Z | 53487.07692307692
+emp_no:integer | salary:integer | avg_salary:double |  hire_date:datetime
+         10001 |          57305 | 43869.63636363636 | 1986-01-01T00:00:00Z
+         10002 |          56371 | 51831.818181818184| 1985-01-01T00:00:00Z
+         10003 |          61805 | 43869.63636363636 | 1986-01-01T00:00:00Z
+         10005 |          63528 | 53487.07692307692 | 1989-01-01T00:00:00Z
+;
+
+groupByExpression_And_ExistentField
+required_capability: inlinestats_v4
+FROM employees
+| KEEP emp_no, languages, gender
+| EVAL x = "ABC"
+| INLINESTATS max_lang = MAX(languages) BY y = to_lower(x), gender
+| SORT emp_no ASC
+| LIMIT 5
+;
+
+emp_no:integer | languages:integer | x:keyword | max_lang:integer | y:keyword | gender:keyword
+10001          |2                  |ABC        |5                 |abc        |M              
+10002          |5                  |ABC        |5                 |abc        |F              
+10003          |4                  |ABC        |5                 |abc        |M              
+10004          |5                  |ABC        |5                 |abc        |M              
+10005          |1                  |ABC        |5                 |abc        |M
+;
+
+groupByRenamedColumn-Ignore
+required_capability: inlinestats_v4
+FROM employees
+| KEEP emp_no, languages, gender
+| INLINESTATS max_lang = MAX(languages) BY y = gender
+| WHERE max_lang == languages
+| SORT emp_no ASC
+| LIMIT 5
+;
+
+emp_no:integer | languages:integer | gender:keyword | max_lang:integer | y:keyword
+         10002 |                 5 | F              | 5                | F
+         10004 |                 5 | M              | 5                | M
+         10011 |                 5 | null           | 5                | null
+         10012 |                 5 | null           | 5                | null
+         10014 |                 5 | null           | 5                | null
 ;

+ 1 - 1
x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/action/EsqlCapabilities.java

@@ -835,7 +835,7 @@ public class EsqlCapabilities {
          * Fixes a series of issues with inlinestats which had an incomplete implementation after lookup and inlinestats
          * were refactored.
          */
-        INLINESTATS_V3(EsqlPlugin.INLINESTATS_FEATURE_FLAG),
+        INLINESTATS_V4(EsqlPlugin.INLINESTATS_FEATURE_FLAG),
 
         /**
          * Support partial_results

+ 29 - 0
x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/plan/logical/join/InlineJoin.java

@@ -13,6 +13,7 @@ import org.elasticsearch.compute.data.Block;
 import org.elasticsearch.compute.data.BlockUtils;
 import org.elasticsearch.xpack.esql.core.expression.Alias;
 import org.elasticsearch.xpack.esql.core.expression.Attribute;
+import org.elasticsearch.xpack.esql.core.expression.AttributeSet;
 import org.elasticsearch.xpack.esql.core.expression.Literal;
 import org.elasticsearch.xpack.esql.core.tree.NodeInfo;
 import org.elasticsearch.xpack.esql.core.tree.Source;
@@ -26,6 +27,9 @@ import java.io.IOException;
 import java.util.ArrayList;
 import java.util.List;
 
+import static org.elasticsearch.xpack.esql.expression.NamedExpressions.mergeOutputAttributes;
+import static org.elasticsearch.xpack.esql.plan.logical.join.JoinTypes.LEFT;
+
 /**
  * Specialized type of join where the source of the left and right plans are the same. The plans themselves can contain different nodes
  * however at the core, both have the same source.
@@ -129,4 +133,29 @@ public class InlineJoin extends Join {
     public Join replaceChildren(LogicalPlan left, LogicalPlan right) {
         return new InlineJoin(source(), left, right, config());
     }
+
+    /**
+     * Computes the output attributes of this inline join from the given left and right attribute lists.
+     * Attributes of {@code left} that are part of the join configuration's right fields are dropped, and the remaining
+     * ones are merged with {@code right} through {@code mergeOutputAttributes}.
+     *
+     * @param left  attributes representing the input (left hand side) of the join
+     * @param right attributes provided by the right hand side of the join
+     * @return the merged list of output attributes
+     * @throws IllegalArgumentException if the configured join type is not {@code LEFT}
+     */
+    @Override
+    public List<Attribute> computeOutput(List<Attribute> left, List<Attribute> right) {
+        JoinType joinType = config().type();
+        List<Attribute> output;
+        if (LEFT.equals(joinType)) {
+            AttributeSet rightFields = new AttributeSet(config().rightFields());
+            List<Attribute> leftOutputWithoutMatchFields = new ArrayList<>();
+            // at this point "left" part of the join contains all the attributes that represent the input of the join
+            // including any aliasing (evals) of expressions used as grouping attributes (or join "match fields") in the join itself.
+            // The "left" argument is used (not left().output()) so that the result honours whatever the caller passed in.
+            for (Attribute attr : left) {
+                if (rightFields.contains(attr) == false) {
+                    // the aforementioned groupings expressions or aliasing are removed from the left set of attributes
+                    leftOutputWithoutMatchFields.add(attr);
+                }
+            }
+            // the actual output of the join will place the left hand side attributes (excluding any aliasing of the groupings)
+            // as first columns in the output followed by whatever the right hand side of join adds in this order: aggregates first,
+            // followed by groupings (this order should be preserved inside the rightFields() output)
+            output = mergeOutputAttributes(right, leftOutputWithoutMatchFields);
+        } else {
+            throw new IllegalArgumentException(joinType.joinName() + " unsupported");
+        }
+        return output;
+    }
 }

+ 5 - 1
x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/plan/logical/join/Join.java

@@ -98,7 +98,7 @@ public class Join extends BinaryPlan implements PostAnalysisVerificationAware, S
     @Override
     public List<Attribute> output() {
         if (lazyOutput == null) {
-            lazyOutput = computeOutput(left().output(), right().output(), config);
+            lazyOutput = computeOutput(left().output(), right().output());
         }
         return lazyOutput;
     }
@@ -126,6 +126,10 @@ public class Join extends BinaryPlan implements PostAnalysisVerificationAware, S
         return rightOutputFields;
     }
 
+    /**
+     * Computes the output attributes of this join from the given left and right attribute lists.
+     * This implementation delegates to {@code computeOutput(left, right, config)}, i.e. it applies this join's own configuration.
+     * {@code output()} calls it with the outputs of the two children; {@code InlineJoin} overrides it.
+     *
+     * @param left  attributes of the left hand side of the join
+     * @param right attributes of the right hand side of the join
+     * @return the combined output attributes
+     */
+    public List<Attribute> computeOutput(List<Attribute> left, List<Attribute> right) {
+        return computeOutput(left, right, config);
+    }
+
     /**
      * Combine the two lists of attributes into one.
      * In case of (name) conflicts, specify which sides wins, that is overrides the other column - the left or the right.

+ 21 - 13
x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/plan/physical/HashJoinExec.java

@@ -18,6 +18,7 @@ import org.elasticsearch.xpack.esql.core.tree.Source;
 import org.elasticsearch.xpack.esql.io.stream.PlanStreamInput;
 
 import java.io.IOException;
+import java.util.ArrayList;
 import java.util.List;
 import java.util.Objects;
 import java.util.Set;
@@ -32,7 +33,8 @@ public class HashJoinExec extends BinaryExec implements EstimatesRowSize {
     private final List<Attribute> matchFields;
     private final List<Attribute> leftFields;
     private final List<Attribute> rightFields;
-    private final List<Attribute> output;
+    private final List<Attribute> addedFields;
+    private List<Attribute> lazyOutput;
     private AttributeSet lazyAddedFields;
 
     public HashJoinExec(
@@ -42,13 +44,13 @@ public class HashJoinExec extends BinaryExec implements EstimatesRowSize {
         List<Attribute> matchFields,
         List<Attribute> leftFields,
         List<Attribute> rightFields,
-        List<Attribute> output
+        List<Attribute> addedFields
     ) {
         super(source, left, hashData);
         this.matchFields = matchFields;
         this.leftFields = leftFields;
         this.rightFields = rightFields;
-        this.output = output;
+        this.addedFields = addedFields;
     }
 
     private HashJoinExec(StreamInput in) throws IOException {
@@ -56,7 +58,7 @@ public class HashJoinExec extends BinaryExec implements EstimatesRowSize {
         this.matchFields = in.readNamedWriteableCollectionAsList(Attribute.class);
         this.leftFields = in.readNamedWriteableCollectionAsList(Attribute.class);
         this.rightFields = in.readNamedWriteableCollectionAsList(Attribute.class);
-        this.output = in.readNamedWriteableCollectionAsList(Attribute.class);
+        this.addedFields = in.readNamedWriteableCollectionAsList(Attribute.class);
     }
 
     @Override
@@ -65,7 +67,7 @@ public class HashJoinExec extends BinaryExec implements EstimatesRowSize {
         out.writeNamedWriteableCollection(matchFields);
         out.writeNamedWriteableCollection(leftFields);
         out.writeNamedWriteableCollection(rightFields);
-        out.writeNamedWriteableCollection(output);
+        out.writeNamedWriteableCollection(addedFields);
     }
 
     @Override
@@ -91,21 +93,27 @@ public class HashJoinExec extends BinaryExec implements EstimatesRowSize {
 
     public Set<Attribute> addedFields() {
         if (lazyAddedFields == null) {
-            lazyAddedFields = new AttributeSet(output());
-            lazyAddedFields.removeAll(left().output());
+            lazyAddedFields = new AttributeSet(addedFields);
         }
         return lazyAddedFields;
     }
 
     @Override
     public PhysicalPlan estimateRowSize(State state) {
-        state.add(false, output);
+        state.add(false, addedFields);
         return this;
     }
 
     @Override
     public List<Attribute> output() {
-        return output;
+        if (lazyOutput == null) {
+            lazyOutput = new ArrayList<>(left().output());
+            var rightFieldNames = rightFields.stream().map(Attribute::name).toList();
+            lazyOutput.removeIf(a -> rightFieldNames.contains(a.name()));
+            lazyOutput.addAll(addedFields);
+            lazyOutput.addAll(rightFields);
+        }
+        return lazyOutput;
     }
 
     @Override
@@ -131,12 +139,12 @@ public class HashJoinExec extends BinaryExec implements EstimatesRowSize {
 
     @Override
     public HashJoinExec replaceChildren(PhysicalPlan left, PhysicalPlan right) {
-        return new HashJoinExec(source(), left, right, matchFields, leftFields, rightFields, output);
+        return new HashJoinExec(source(), left, right, matchFields, leftFields, rightFields, addedFields);
     }
 
     @Override
     protected NodeInfo<? extends PhysicalPlan> info() {
-        return NodeInfo.create(this, HashJoinExec::new, left(), right(), matchFields, leftFields, rightFields, output);
+        return NodeInfo.create(this, HashJoinExec::new, left(), right(), matchFields, leftFields, rightFields, addedFields);
     }
 
     @Override
@@ -154,11 +162,11 @@ public class HashJoinExec extends BinaryExec implements EstimatesRowSize {
         return matchFields.equals(hash.matchFields)
             && leftFields.equals(hash.leftFields)
             && rightFields.equals(hash.rightFields)
-            && output.equals(hash.output);
+            && addedFields.equals(hash.addedFields);
     }
 
     @Override
     public int hashCode() {
-        return Objects.hash(super.hashCode(), matchFields, leftFields, rightFields, output);
+        return Objects.hash(super.hashCode(), matchFields, leftFields, rightFields, addedFields);
     }
 }

+ 3 - 2
x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/planner/LocalExecutionPlanner.java

@@ -554,10 +554,11 @@ public class LocalExecutionPlanner {
         source = source.with(new RowInTableLookupOperator.Factory(keys, blockMapping), layout);
 
         // Load the "values" from each match
+        var joinDataOutput = join.joinData().output();
         for (Attribute f : join.addedFields()) {
             Block localField = null;
-            for (int l = 0; l < join.joinData().output().size(); l++) {
-                if (join.joinData().output().get(l).name().equals(f.name())) {
+            for (int l = 0; l < joinDataOutput.size(); l++) {
+                if (joinDataOutput.get(l).name().equals(f.name())) {
                     localField = localData[l];
                 }
             }

+ 1 - 1
x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/planner/mapper/LocalMapper.java

@@ -110,7 +110,7 @@ public class LocalMapper {
                     config.matchFields(),
                     config.leftFields(),
                     config.rightFields(),
-                    join.output()
+                    join.rightOutputFields()
                 );
             }
             if (right instanceof EsSourceExec source && source.indexMode() == IndexMode.LOOKUP) {