1
0
Эх сурвалжийг харах

ESQL: Allow pruning columns added by InlineJoin (#131204)

This updates `PruneColumns` rule to allow optimising away the columns added by `InlineJoin`, dropping unnecessary agg'ing.
Bogdan Pintea 2 сар өмнө
parent
commit
d3a07f7360

+ 5 - 0
docs/changelog/131204.yaml

@@ -0,0 +1,5 @@
+pr: 131204
+summary: Allow pruning columns added by `InlineJoin`
+area: ES|QL
+type: enhancement
+issues: []

+ 2 - 4
x-pack/plugin/esql/qa/server/multi-clusters/src/javaRestTest/java/org/elasticsearch/xpack/esql/ccq/MultiClusterSpecIT.java

@@ -53,8 +53,7 @@ import static org.elasticsearch.xpack.esql.EsqlTestUtils.classpathResources;
 import static org.elasticsearch.xpack.esql.action.EsqlCapabilities.Cap.ENABLE_LOOKUP_JOIN_ON_REMOTE;
 import static org.elasticsearch.xpack.esql.action.EsqlCapabilities.Cap.FORK_V9;
 import static org.elasticsearch.xpack.esql.action.EsqlCapabilities.Cap.INLINESTATS;
-import static org.elasticsearch.xpack.esql.action.EsqlCapabilities.Cap.INLINESTATS_V2;
-import static org.elasticsearch.xpack.esql.action.EsqlCapabilities.Cap.INLINESTATS_V8;
+import static org.elasticsearch.xpack.esql.action.EsqlCapabilities.Cap.INLINESTATS_V9;
 import static org.elasticsearch.xpack.esql.action.EsqlCapabilities.Cap.JOIN_LOOKUP_V12;
 import static org.elasticsearch.xpack.esql.action.EsqlCapabilities.Cap.JOIN_PLANNING_V1;
 import static org.elasticsearch.xpack.esql.action.EsqlCapabilities.Cap.METADATA_FIELDS_REMOTE_TEST;
@@ -150,9 +149,8 @@ public class MultiClusterSpecIT extends EsqlSpecTestCase {
         Version oldVersion = Version.min(Clusters.localClusterVersion(), Clusters.remoteClusterVersion());
         assumeTrue("Test " + testName + " is skipped on " + oldVersion, isEnabled(testName, instructions, oldVersion));
         assumeFalse("INLINESTATS not yet supported in CCS", testCase.requiredCapabilities.contains(INLINESTATS.capabilityName()));
-        assumeFalse("INLINESTATS not yet supported in CCS", testCase.requiredCapabilities.contains(INLINESTATS_V2.capabilityName()));
         assumeFalse("INLINESTATS not yet supported in CCS", testCase.requiredCapabilities.contains(JOIN_PLANNING_V1.capabilityName()));
-        assumeFalse("INLINESTATS not yet supported in CCS", testCase.requiredCapabilities.contains(INLINESTATS_V8.capabilityName()));
+        assumeFalse("INLINESTATS not yet supported in CCS", testCase.requiredCapabilities.contains(INLINESTATS_V9.capabilityName()));
         if (testCase.requiredCapabilities.contains(JOIN_LOOKUP_V12.capabilityName())) {
             assumeTrue("LOOKUP JOIN not yet supported in CCS", hasCapabilities(List.of(ENABLE_LOOKUP_JOIN_ON_REMOTE.capabilityName())));
         }

+ 179 - 73
x-pack/plugin/esql/qa/testFixtures/src/main/resources/inlinestats.csv-spec

@@ -3,7 +3,7 @@
 //
 
 allFieldsReturned
-required_capability: inlinestats_v8
+required_capability: inlinestats_v9
 
 FROM hosts METADATA _index
 | INLINESTATS c = COUNT(*) BY host_group
@@ -16,7 +16,7 @@ eth0           |epsilon gw instance|epsilon        |[fe80::cae2:65ff:fece:feb9,
 ;
 
 maxOfInt
-required_capability: inlinestats_v8
+required_capability: inlinestats_v9
 // tag::max-languages[]
 FROM employees
 | KEEP emp_no, languages
@@ -38,7 +38,7 @@ emp_no:integer | languages:integer | max_lang:integer
 ;
 
 maxOfIntByKeyword
-required_capability: inlinestats_v8
+required_capability: inlinestats_v9
 
 FROM employees
 | KEEP emp_no, languages, gender
@@ -56,7 +56,7 @@ emp_no:integer | languages:integer | max_lang:integer | gender:keyword
 ;
 
 maxOfLongByKeyword
-required_capability: inlinestats_v8
+required_capability: inlinestats_v9
 
 FROM employees
 | KEEP emp_no, avg_worked_seconds, gender
@@ -71,7 +71,7 @@ emp_no:integer | avg_worked_seconds:long | max_avg_worked_seconds:long | gender:
 ;
 
 maxOfLong
-required_capability: inlinestats_v8
+required_capability: inlinestats_v9
 
 FROM employees
 | KEEP emp_no, avg_worked_seconds, gender
@@ -84,7 +84,7 @@ emp_no:integer | avg_worked_seconds:long | gender:keyword | max_avg_worked_secon
 ;
 
 maxOfLongByCalculatedKeyword
-required_capability: inlinestats_v8
+required_capability: inlinestats_v9
 
 // tag::longest-tenured-by-first[]
 FROM employees
@@ -107,7 +107,7 @@ emp_no:integer | avg_worked_seconds:long | last_name:keyword | max_avg_worked_se
 ;
 
 maxOfLongByCalculatedNamedKeyword
-required_capability: inlinestats_v8
+required_capability: inlinestats_v9
 
 FROM employees
 | KEEP emp_no, avg_worked_seconds, last_name
@@ -126,7 +126,7 @@ emp_no:integer | avg_worked_seconds:long | last_name:keyword | max_avg_worked_se
 ;
 
 maxOfLongByCalculatedDroppedKeyword
-required_capability: inlinestats_v8
+required_capability: inlinestats_v9
 
 FROM employees
 | INLINESTATS max_avg_worked_seconds = MAX(avg_worked_seconds) BY l = SUBSTRING(last_name, 0, 1)
@@ -145,7 +145,7 @@ emp_no:integer | avg_worked_seconds:long | last_name:keyword | max_avg_worked_se
 ;
 
 maxOfLongByEvaledKeyword
-required_capability: inlinestats_v8
+required_capability: inlinestats_v9
 
 FROM employees
 | EVAL l = SUBSTRING(last_name, 0, 1)
@@ -165,7 +165,7 @@ emp_no:integer | avg_worked_seconds:long | max_avg_worked_seconds:long | l:keywo
 ;
 
 maxOfLongByInt
-required_capability: inlinestats_v8
+required_capability: inlinestats_v9
 
 FROM employees
 | KEEP emp_no, avg_worked_seconds, languages
@@ -183,7 +183,7 @@ emp_no:integer | avg_worked_seconds:long | max_avg_worked_seconds:long | languag
 ;
 
 maxOfLongByIntDouble
-required_capability: inlinestats_v8
+required_capability: inlinestats_v9
 
 FROM employees
 | KEEP emp_no, avg_worked_seconds, languages, height
@@ -201,7 +201,7 @@ emp_no:integer | avg_worked_seconds:long | max_avg_worked_seconds:long | languag
 ;
 
 two
-required_capability: inlinestats_v8
+required_capability: inlinestats_v9
 
 FROM employees
 | KEEP emp_no, languages, avg_worked_seconds, gender
@@ -225,7 +225,7 @@ emp_no:integer |avg_worked_seconds:long|avg_avg_worked_seconds:double|languages:
 ;
 
 three
-required_capability: inlinestats_v8
+required_capability: inlinestats_v9
 // used to fail with AssertionError at org.elasticsearch.xpack.esql.plan.logical.Limit.writeTo(Limit.java:70)
 
 FROM employees
@@ -253,7 +253,7 @@ emp_no:integer |avg_worked_seconds:long|avg_avg_worked_seconds:double|languages:
 
 // TODO: INLINESTATS unit test needed for this one
 pushDownSort_To_LeftSideOnly
-required_capability: inlinestats_v8
+required_capability: inlinestats_v9
 
 from employees
 | sort emp_no
@@ -271,7 +271,7 @@ from employees
 ;
 
 byMultivaluedSimple
-required_capability: inlinestats_v8
+required_capability: inlinestats_v9
 
 // tag::mv-group[]
 FROM airports
@@ -289,7 +289,7 @@ abbrev:keyword |  type:keyword   | scalerank:integer | min_scalerank:integer
 ;
 
 byMultivaluedMvExpand
-required_capability: inlinestats_v8
+required_capability: inlinestats_v9
 
 // tag::mv-expand[]
 FROM airports
@@ -309,7 +309,7 @@ GWL            |9                  |4                      |military
 ;
 
 byMvExpand
-required_capability: inlinestats_v8
+required_capability: inlinestats_v9
 
 // tag::extreme-airports[]
 FROM airports
@@ -338,7 +338,7 @@ FROM airports
 ;
 
 mvMinMvExpand
-required_capability: inlinestats_v8
+required_capability: inlinestats_v9
 
 FROM airports
 | EVAL original_type = type
@@ -361,7 +361,7 @@ ZAR            |Zaria          |POINT (7.7 11.0667)     |Nigeria        |POINT (
 ;
 
 afterStats
-required_capability: inlinestats_v8
+required_capability: inlinestats_v9
 
 FROM airports
 | STATS count=COUNT(*) BY country
@@ -384,7 +384,7 @@ count:long | country:keyword | avg:double
 ;
 
 afterWhere
-required_capability: inlinestats_v8
+required_capability: inlinestats_v9
 
 FROM airports
 | WHERE country != "United States"
@@ -402,7 +402,7 @@ abbrev:keyword | country:keyword | count:long
 ;
 
 afterLookup
-required_capability: inlinestats_v8
+required_capability: inlinestats_v9
 required_capability: join_lookup_v12
 
 FROM airports
@@ -426,7 +426,7 @@ ZNZ            |4                       |German
 ;
 
 afterEnrich
-required_capability: inlinestats_v8
+required_capability: inlinestats_v9
 required_capability: enrich_load
 
 FROM airports
@@ -447,7 +447,7 @@ abbrev:keyword | city:keyword | "COUNT(*)":long |       region:text
 ;
 
 beforeStats
-required_capability: inlinestats_v8
+required_capability: inlinestats_v9
 
 FROM airports
 | EVAL lat = ST_Y(location)
@@ -460,7 +460,7 @@ northern:long | southern:long
 ;
 
 beforeKeepSort
-required_capability: inlinestats_v8
+required_capability: inlinestats_v9
 
 FROM employees
 | INLINESTATS max_salary = MAX(salary) by languages
@@ -475,7 +475,7 @@ emp_no:integer | languages:integer | max_salary:integer
 ;
 
 beforeKeepWhere
-required_capability: inlinestats_v8
+required_capability: inlinestats_v9
 
 FROM employees
 | INLINESTATS max_salary = MAX(salary) by languages
@@ -488,7 +488,7 @@ emp_no:integer | languages:integer | max_salary:integer
 ;
 
 beforeEnrich
-required_capability: inlinestats_v8
+required_capability: inlinestats_v9
 required_capability: enrich_load
 
 FROM airports
@@ -507,7 +507,7 @@ ACA            |Acapulco de Juárez|385            |major          |Acapulco de
 ;
 
 beforeAndAfterEnrich
-required_capability: inlinestats_v8
+required_capability: inlinestats_v9
 required_capability: enrich_load
 
 FROM airports
@@ -530,7 +530,7 @@ ALL            |Albenga        |499            |mid            |1
 ;
 
 shadowing
-required_capability: inlinestats_v8
+required_capability: inlinestats_v9
 
 ROW left = "left", client_ip = "172.21.0.5", env = "env", right = "right"
 | INLINESTATS env = VALUES(right) BY client_ip
@@ -541,7 +541,7 @@ left         | right          | right       | 172.21.0.5
 ;
 
 shadowingMulti
-required_capability: inlinestats_v8
+required_capability: inlinestats_v9
 
 ROW left = "left", airport = "Zurich Airport ZRH", city = "Zürich", middle = "middle", region = "North-East Switzerland", right = "right"
 | INLINESTATS airport=VALUES(left), region=VALUES(left), city_boundary=VALUES(left) BY city
@@ -552,7 +552,7 @@ left         | middle         | right         |            left |           left
 ;
 
 shadowingSelf
-required_capability: inlinestats_v8
+required_capability: inlinestats_v9
 
 ROW city = "Raleigh"
 | INLINESTATS city = COUNT(city)
@@ -563,7 +563,7 @@ city:long
 ;
 
 shadowingSelfBySelf
-required_capability: inlinestats_v8
+required_capability: inlinestats_v9
 
 ROW city = "Raleigh"
 | INLINESTATS city = COUNT(city) BY city
@@ -575,7 +575,7 @@ Raleigh
 ;
 
 shadowingInternal
-required_capability: inlinestats_v8
+required_capability: inlinestats_v9
 
 ROW city = "Zürich"
 | INLINESTATS x = VALUES(city), x = VALUES(city)
@@ -587,7 +587,7 @@ Zürich       | Zürich
 ;
 
 multiInlinestatsWithRow
-required_capability: inlinestats_v8
+required_capability: inlinestats_v9
 
 row x = 1
 | inlinestats x = max(x) + min(x)
@@ -601,7 +601,7 @@ row x = 1
 ;
 
 ignoreUnusedEvaledValue_AndInlineStats
-required_capability: inlinestats_v8
+required_capability: inlinestats_v9
 
 ROW x = 1
 | INLINESTATS max(x)
@@ -614,7 +614,7 @@ x:integer
 ;
 
 ignoreUnusedEvaledValue_AndInlineStats2
-required_capability: inlinestats_v8
+required_capability: inlinestats_v9
 
 ROW x = 1, z = 2
 | INLINESTATS max(x)
@@ -627,7 +627,7 @@ x:integer | z:integer
 ;
 
 ignoreUnusedEvaledValue_AndInlineStats3
-required_capability: inlinestats_v8
+required_capability: inlinestats_v9
 
 from employees
 | inlinestats max(salary)
@@ -642,7 +642,7 @@ from employees
 ;
 
 ignoreUnusedEvaledValue_AndInlineStats4
-required_capability: inlinestats_v8
+required_capability: inlinestats_v9
 
 from employees
 | inlinestats max(salary), m = min(salary) by gender
@@ -657,7 +657,7 @@ emp_no:integer
 ;
 
 ignoreUnusedEvaledValue_AndInlineStats5
-required_capability: inlinestats_v8
+required_capability: inlinestats_v9
 
 from employees
 | inlinestats max(salary), m = min(salary) by gender
@@ -671,8 +671,23 @@ emp_no:integer
 10100     
 ;
 
+shadowEntireInlinestats
+required_capability: inlinestats_v9
+
+FROM employees
+| INLINESTATS x = avg(salary), y = min(salary) BY emp_no
+| EVAL x = emp_no, y = x
+| SORT x
+| KEEP x, y, emp_no
+| LIMIT 1
+;
+
+x:integer |y:integer |emp_no:integer
+10001     |10001     |10001
+;
+
 byConstant
-required_capability: inlinestats_v8
+required_capability: inlinestats_v9
 
 FROM employees
 | KEEP emp_no, languages
@@ -691,7 +706,7 @@ emp_no:integer | languages:integer | max_lang:integer | y:integer
 ;
 
 aggConstant
-required_capability: inlinestats_v8
+required_capability: inlinestats_v9
 
 FROM employees
 | KEEP emp_no
@@ -709,7 +724,7 @@ one:integer | emp_no:integer
 ;
 
 percentile
-required_capability: inlinestats_v8
+required_capability: inlinestats_v9
 
 FROM employees
 | KEEP emp_no, salary
@@ -728,7 +743,7 @@ emp_no:integer | salary:integer | ninety_fifth_salary:double
 ;
 
 byTwoCalculated
-required_capability: inlinestats_v8
+required_capability: inlinestats_v9
 
 FROM airports
 | WHERE abbrev IS NOT NULL
@@ -748,7 +763,7 @@ abbrev:keyword | scalerank:integer |             location:geo_point
 
 byTwoCalculatedSecondOverwrites
 required_capability: stats_alias_collision_warnings
-required_capability: inlinestats_v8
+required_capability: inlinestats_v9
 
 FROM airports
 | WHERE abbrev IS NOT NULL
@@ -769,7 +784,7 @@ abbrev:keyword | scalerank:integer |             location:geo_point
 
 byTwoCalculatedSecondOverwritesReferencingFirst
 required_capability: stats_alias_collision_warnings
-required_capability: inlinestats_v8
+required_capability: inlinestats_v9
 
 FROM airports
 | WHERE abbrev IS NOT NULL
@@ -792,7 +807,7 @@ abbrev:keyword | scalerank:integer |             location:geo_point
 
 groupShadowsAgg
 required_capability: stats_alias_collision_warnings
-required_capability: inlinestats_v8
+required_capability: inlinestats_v9
 
 FROM airports
 | WHERE abbrev IS NOT NULL
@@ -812,7 +827,7 @@ abbrev:keyword | scalerank:integer |             location:geo_point
 ;
 
 groupShadowsField
-required_capability: inlinestats_v8
+required_capability: inlinestats_v9
 
   FROM employees
 | KEEP emp_no, salary, hire_date
@@ -831,7 +846,7 @@ emp_no:integer | salary:integer | avg_salary:double |  hire_date:datetime
 ;
 
 groupByExpression_And_ExistentField
-required_capability: inlinestats_v8
+required_capability: inlinestats_v9
 FROM employees
 | KEEP emp_no, languages, gender
 | EVAL x = "ABC"
@@ -849,7 +864,7 @@ emp_no:integer | languages:integer | x:keyword | max_lang:integer | y:keyword |
 ;
 
 groupByRenamedColumn
-required_capability: inlinestats_v8
+required_capability: inlinestats_v9
 FROM employees
 | KEEP emp_no, languages, gender
 | INLINESTATS max_lang = MAX(languages) BY y = gender
@@ -868,7 +883,7 @@ emp_no:integer | languages:integer | gender:keyword | max_lang:integer | y:keywo
 
 // fails with AssertionError at org.elasticsearch.xpack.esql.plan.logical.Limit.writeTo(Limit.java:70)
 groupByMultipleRenamedColumns_AndOneExpression_Last
-required_capability: inlinestats_v8
+required_capability: inlinestats_v9
 
 FROM employees
 | KEEP emp_no, languages, gender, first_name
@@ -892,7 +907,7 @@ emp_no:integer | languages:integer | gender:keyword|first_name:keyword|max_lang:
 
 // fails with AssertionError at org.elasticsearch.xpack.esql.plan.logical.Limit.writeTo(Limit.java:70)
 groupByMultipleRenamedColumns_AndTwoExpressions
-required_capability: inlinestats_v8
+required_capability: inlinestats_v9
 
 FROM employees
 | KEEP emp_no, languages, gender, first_name
@@ -916,7 +931,7 @@ emp_no:integer | languages:integer | gender:keyword|first_name:keyword|max_lang:
 
 // fails with AssertionError at org.elasticsearch.xpack.esql.plan.logical.Limit.writeTo(Limit.java:70)
 groupByMultipleRenamedColumns_AndMultipleRenames
-required_capability: inlinestats_v8
+required_capability: inlinestats_v9
 
 FROM employees
 | KEEP emp_no, languages, gender, first_name
@@ -941,7 +956,7 @@ emp_no:integer | languages:integer | gender:keyword|    f:keyword     |max_lang:
 
 // fails with AssertionError at org.elasticsearch.xpack.esql.plan.logical.Limit.writeTo(Limit.java:70)
 groupByMultipleRenamedColumns_AndSameNameExpressionGroupingOverride
-required_capability: inlinestats_v8
+required_capability: inlinestats_v9
 
 FROM employees
 | KEEP emp_no, languages, gender, first_name
@@ -965,7 +980,7 @@ emp_no:integer | languages:integer | gender:keyword|max_lang:integer|  y:keyword
 ;
 
 twoAggregatesGroupedBy_AField_And_AnExpression
-required_capability: inlinestats_v8
+required_capability: inlinestats_v9
 
 FROM employees
 | KEEP emp_no, languages, gender, last_name
@@ -987,7 +1002,7 @@ emp_no:integer |languages:integer|last_name:keyword|max_lang:integer|min_lang:in
 ;
 
 groupByMultipleRenamedColumns_InversedOrder
-required_capability: inlinestats_v8
+required_capability: inlinestats_v9
 
 FROM employees
 | KEEP emp_no, languages, still_hired, gender
@@ -1005,7 +1020,7 @@ emp_no:integer |languages:integer|still_hired:boolean| gender:keyword|max_lang:i
 ;
 
 groupByMultipleRenamedColumns_InversedOrder_ComplexEval
-required_capability: inlinestats_v8
+required_capability: inlinestats_v9
 
 FROM employees
 | KEEP emp_no, languages, still_hired, gender
@@ -1024,7 +1039,7 @@ emp_no:integer |languages:integer|still_hired:boolean| gender:keyword|multilingu
 ;
 
 groupByMultipleRenamedColumns_AndComplexEval
-required_capability: inlinestats_v8
+required_capability: inlinestats_v9
 
 FROM employees
 | KEEP emp_no, languages, still_hired, gender
@@ -1044,7 +1059,7 @@ emp_no:integer |languages:integer|still_hired:boolean| gender:keyword|multilingu
 
 // fails with AssertionError at org.elasticsearch.xpack.esql.plan.logical.Limit.writeTo(Limit.java:70)
 groupByMultipleRenamedColumns_AndConstantValue
-required_capability: inlinestats_v8
+required_capability: inlinestats_v9
 
 FROM employees
 | KEEP emp_no, languages, gender, first_name
@@ -1068,7 +1083,7 @@ emp_no:integer |languages:integer|gender:keyword |first_name:keyword  |   x:keyw
 ;
 
 groupByRenamedExpression
-required_capability: inlinestats_v8
+required_capability: inlinestats_v9
 
 FROM employees
 | KEEP emp_no, languages, gender, last_name
@@ -1090,7 +1105,7 @@ emp_no:integer |languages:integer|last_name:keyword|max_lang:integer|min_lang:in
 ;
 
 doubleFilterOnLeftAndRight_InlineStats_Sides
-required_capability: inlinestats_v8
+required_capability: inlinestats_v9
   
 FROM employees
 | INLINESTATS max_salary = MAX(salary), min_salary = MIN(salary) by languages
@@ -1111,7 +1126,7 @@ emp_no:integer |languages:integer|salary:integer |max_salary:integer|min_salary:
 ;
 
 filterOnInlineStatsAggs
-required_capability: inlinestats_v8
+required_capability: inlinestats_v9
 
 FROM employees
 | INLINESTATS max_salary = MAX(salary), min_salary = MIN(salary) by languages
@@ -1130,7 +1145,7 @@ emp_no:integer |languages:integer|salary:integer |max_salary:integer|min_salary:
 ;
 
 filterOnInlineStatsAggsValues_And_Groupings
-required_capability: inlinestats_v8
+required_capability: inlinestats_v9
 
 FROM employees
 | INLINESTATS max_salary = MAX(salary), min_salary = MIN(salary) by languages
@@ -1149,7 +1164,7 @@ emp_no:integer |languages:integer|salary:integer |max_salary:integer|min_salary:
 ;
 
 inlineStatsOverrideEVALed_FieldWithSameName
-required_capability: inlinestats_v8
+required_capability: inlinestats_v9
 
 FROM hosts METADATA _index
 | EVAL x = ip1
@@ -1163,7 +1178,7 @@ beta k8s server |beta           |127.0.0.1      |hosts           |127.0.0.2|2
 ;
 
 doubleShadowing
-required_capability: inlinestats_v8
+required_capability: inlinestats_v9
 
 FROM employees
 | INLINESTATS salary = min(salary) BY gender
@@ -1182,7 +1197,7 @@ salary:integer |gender:keyword
 ;
 
 doubleShadowing_WithIntertwinedFilters
-required_capability: inlinestats_v8
+required_capability: inlinestats_v9
 
 FROM employees
 | WHERE salary > 30000
@@ -1207,7 +1222,7 @@ salary:integer |gender:keyword
 ;
 
 shadowingAggregateByNextGrouping
-required_capability: inlinestats_v8
+required_capability: inlinestats_v9
 
 FROM employees
 | KEEP gender, languages, emp_no, salary
@@ -1224,7 +1239,7 @@ emp_no:integer |salary:integer |languages:integer|avg(salary):double|gender:long
 ;
 
 doubleShadowingWithEval
-required_capability: inlinestats_v8
+required_capability: inlinestats_v9
 
 from employees
 | eval salary = salary/100
@@ -1244,7 +1259,7 @@ salary:integer|gender:keyword
 ;
 
 doubleShadowingWithDoubleStats
-required_capability: inlinestats_v8
+required_capability: inlinestats_v9
 
 from employees
 | stats salary=min(salary) by gender
@@ -1261,7 +1276,7 @@ M              |25324
 ;
 
 renamingGroupingWithItself
-required_capability: inlinestats_v8
+required_capability: inlinestats_v9
 
 FROM employees
 | EVAL x = gender
@@ -1280,7 +1295,7 @@ salary:integer |x:keyword|gender:keyword |min_sl:integer |emp_no:integer
 ;
 
 overridingGroupings
-required_capability: inlinestats_v8
+required_capability: inlinestats_v9
 
 FROM employees
 | INLINESTATS min_sl = MIN(salary) BY x = gender, x = languages
@@ -1299,7 +1314,7 @@ salary:integer |x:integer      |gender:keyword |min_sl:integer |emp_no:integer
 ;
 
 overridingExpressionGroupings
-required_capability: inlinestats_v8
+required_capability: inlinestats_v9
 
 FROM employees
 | INLINESTATS min_sl = MIN(salary) BY x = TO_LOWER(gender), x = CONCAT(gender, gender)
@@ -1318,7 +1333,7 @@ salary:integer |x:keyword      |gender:keyword |min_sl:integer |emp_no:integer
 ;
 
 reusingEvalExpressions_UsedInGroupings
-required_capability: inlinestats_v8
+required_capability: inlinestats_v9
 
 FROM employees
 | KEEP salary, gender, emp_no
@@ -1337,7 +1352,7 @@ salary:integer |gender:keyword |emp_no:integer |min_sl:integer |       x:keyword
 ;
 
 statsBeforeInlinestatsWithTopAndBucket1
-required_capability: inlinestats_v8
+required_capability: inlinestats_v9
 
 FROM books
 | STATS avg_rating = AVG(ratings) BY decade = BUCKET(year, 10)
@@ -1357,7 +1372,7 @@ avg_rating:double | decade:double |    decades:double
 ;
 
 statsBeforeInlinestatsWithTopAndBucket2
-required_capability: inlinestats_v8
+required_capability: inlinestats_v9
 
 FROM sample_data
 | STATS total_duration = SUM(event_duration) BY day = BUCKET(@timestamp, 1 HOUR)
@@ -1371,3 +1386,94 @@ total_duration:long |       day:date         |                     days:date
 17016205            |2023-10-23T13:00:00.000Z|[2023-10-23T12:00:00.000Z, 2023-10-23T13:00:00.000Z]
 ;
 
+
+evalBeforeInlinestatsAndKeepAfter1
+required_capability: inlinestats_v9
+
+FROM employees
+| WHERE still_hired == false
+| EVAL sal = salary/1000
+| INLINESTATS totalK = SUM(sal), count=COUNT(*) BY gender
+| KEEP emp_no, still_hired, totalK, count
+| SORT emp_no
+| LIMIT 5
+;
+
+emp_no:integer |still_hired:boolean|totalK:long|count:long
+10003          |false              |1567       |32
+10006          |false              |810        |16
+10009          |false              |810        |16
+10010          |false              |378        |7
+10012          |false              |378        |7
+;
+
+evalBeforeInlinestatsAndKeepAfter2
+required_capability: inlinestats_v9
+
+FROM employees
+| EVAL salaryK = salary/1000
+| INLINESTATS total = SUM(salaryK), count=COUNT(*) BY gender
+| KEEP emp_no, still_hired, total, count
+| WHERE still_hired == false
+| SORT emp_no
+| LIMIT 5
+;
+
+emp_no:integer |still_hired:boolean|total:long|count:long
+10003          |false              |2644      |57
+10006          |false              |1648      |33
+10009          |false              |1648      |33
+10010          |false              |482       |10
+10012          |false              |482       |10
+;
+
+evalBeforeInlinestatsAndKeepAfter3
+required_capability: inlinestats_v9
+
+FROM employees
+| EVAL salaryK = salary/1000
+| INLINESTATS total = SUM(salaryK) BY gender
+| KEEP emp_no, still_hired, total
+| SORT emp_no
+| LIMIT 5
+;
+
+emp_no:integer |still_hired:boolean|total:long
+10001          |true               |2644
+10002          |true               |1648
+10003          |false              |2644
+10004          |true               |2644
+10005          |true               |2644
+;
+
+evalBeforeInlinestatsAndKeepAfter4
+required_capability: inlinestats_v9
+
+FROM employees
+| EVAL salaryK = salary/1000
+| INLINESTATS count = COUNT(*) BY salaryK
+| KEEP emp_no, still_hired, count
+| SORT emp_no
+| LIMIT 5
+;
+
+emp_no:integer |still_hired:boolean|count:long
+10001          |true               |1
+10002          |true               |3
+10003          |false              |2
+10004          |true               |2
+10005          |true               |1
+;
+
+evalBeforeInlinestatsAndKeepAfter5
+required_capability: inlinestats_v9
+
+ROW salary = 12300, emp_no = 5, gender = "F"
+| EVAL salaryK = salary/1000
+| INLINESTATS sum = SUM(salaryK) BY gender
+| KEEP emp_no
+;
+
+emp_no:integer
+5
+;

+ 1 - 1
x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/action/EsqlCapabilities.java

@@ -952,7 +952,7 @@ public class EsqlCapabilities {
          * Fixes a series of issues with inlinestats which had an incomplete implementation after lookup and inlinestats
          * were refactored.
          */
-        INLINESTATS_V8(EsqlPlugin.INLINESTATS_FEATURE_FLAG),
+        INLINESTATS_V9(EsqlPlugin.INLINESTATS_FEATURE_FLAG),
 
         /**
          * Support partial_results

+ 157 - 86
x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/optimizer/rules/logical/PruneColumns.java

@@ -14,6 +14,7 @@ import org.elasticsearch.xpack.esql.core.expression.Alias;
 import org.elasticsearch.xpack.esql.core.expression.Attribute;
 import org.elasticsearch.xpack.esql.core.expression.AttributeSet;
 import org.elasticsearch.xpack.esql.core.expression.Expressions;
+import org.elasticsearch.xpack.esql.core.expression.FieldAttribute;
 import org.elasticsearch.xpack.esql.core.expression.NamedExpression;
 import org.elasticsearch.xpack.esql.core.util.Holder;
 import org.elasticsearch.xpack.esql.plan.logical.Aggregate;
@@ -22,7 +23,11 @@ import org.elasticsearch.xpack.esql.plan.logical.Eval;
 import org.elasticsearch.xpack.esql.plan.logical.Fork;
 import org.elasticsearch.xpack.esql.plan.logical.Limit;
 import org.elasticsearch.xpack.esql.plan.logical.LogicalPlan;
+import org.elasticsearch.xpack.esql.plan.logical.Project;
+import org.elasticsearch.xpack.esql.plan.logical.Sample;
 import org.elasticsearch.xpack.esql.plan.logical.join.InlineJoin;
+import org.elasticsearch.xpack.esql.plan.logical.join.StubRelation;
+import org.elasticsearch.xpack.esql.plan.logical.local.EmptyLocalSupplier;
 import org.elasticsearch.xpack.esql.plan.logical.local.LocalRelation;
 import org.elasticsearch.xpack.esql.plan.logical.local.LocalSupplier;
 import org.elasticsearch.xpack.esql.planner.PlannerUtils;
@@ -38,15 +43,14 @@ public final class PruneColumns extends Rule<LogicalPlan, LogicalPlan> {
 
     @Override
     public LogicalPlan apply(LogicalPlan plan) {
-        // track used references
-        var used = plan.outputSet().asBuilder();
-        // track inlinestats' own aggregation output (right-hand side of the join) so that any other plan on the left-hand side of the
-        // inline join won't have its columns pruned due to the lack of "visibility" into the right hand side output/Attributes
-        var inlineJoinRightOutput = new ArrayList<Attribute>();
+        return pruneColumns(plan, plan.outputSet().asBuilder(), false);
+    }
+
+    private static LogicalPlan pruneColumns(LogicalPlan plan, AttributeSet.Builder used, boolean inlineJoin) {
         Holder<Boolean> forkPresent = new Holder<>(false);
 
         // while going top-to-bottom (upstream)
-        var pl = plan.transformDown(p -> {
+        return plan.transformDown(p -> {
             // Note: It is NOT required to do anything special for binary plans like JOINs, except INLINESTATS. It is perfectly fine that
             // transformDown descends first into the left side, adding all kinds of attributes to the `used` set, and then descends into
             // the right side - even though the `used` set will contain stuff only used in the left hand side. That's because any attribute
@@ -54,8 +58,9 @@ public final class PruneColumns extends Rule<LogicalPlan, LogicalPlan> {
             // same index fields will have different name ids in the left and right hand sides - as in the extreme example
             // `FROM lookup_idx | LOOKUP JOIN lookup_idx ON key_field`.
 
-            // skip nodes that simply pass the input through
-            if (p instanceof Limit) {
+            // TODO: revisit with every new command
+            // skip nodes that simply pass the input through and use no references
+            if (p instanceof Limit || p instanceof Sample) {
                 return p;
             }
 
@@ -67,105 +72,171 @@ public final class PruneColumns extends Rule<LogicalPlan, LogicalPlan> {
                 return p;
             }
 
-            // TODO: INLINESTATS unit testing for tracking this set
-            if (p instanceof InlineJoin ij) {
-                inlineJoinRightOutput.addAll(ij.right().outputSet());
-            }
-
-            // remember used
-            boolean recheck;
+            var recheck = new Holder<Boolean>();
             // analyze the unused items against dedicated 'producer' nodes such as Eval and Aggregate
             // perform a loop to retry checking if the current node is completely eliminated
             do {
-                recheck = false;
-                if (p instanceof Aggregate aggregate) {
-                    // TODO: INLINESTATS https://github.com/elastic/elasticsearch/pull/128917#discussion_r2175162099
-                    var remaining = removeUnused(aggregate.aggregates(), used, inlineJoinRightOutput);
-
-                    if (remaining != null) {
-                        if (remaining.isEmpty()) {
-                            // We still need to have a plan that produces 1 row per group.
-                            if (aggregate.groupings().isEmpty()) {
-                                p = new LocalRelation(
-                                    aggregate.source(),
-                                    List.of(Expressions.attribute(aggregate.aggregates().getFirst())),
-                                    LocalSupplier.of(
-                                        new Block[] { BlockUtils.constantBlock(PlannerUtils.NON_BREAKING_BLOCK_FACTORY, null, 1) }
-                                    )
-                                );
-                            } else {
-                                // Aggs cannot produce pages with 0 columns, so retain one grouping.
-                                Attribute attribute = Expressions.attribute(aggregate.groupings().getFirst());
-                                NamedExpression firstAggregate = aggregate.aggregates().getFirst();
-                                remaining = List.of(
-                                    new Alias(firstAggregate.source(), firstAggregate.name(), attribute, firstAggregate.id())
-                                );
-                                p = aggregate.with(aggregate.groupings(), remaining);
-                            }
-                        } else {
-                            p = aggregate.with(aggregate.groupings(), remaining);
-                        }
-                    }
-                } else if (p instanceof InlineJoin ij) {// TODO: InlineStats - add unit tests for this IJ removal
-                    var remaining = removeUnused(ij.right().output(), used, inlineJoinRightOutput);
-                    if (remaining != null) {
-                        if (remaining.isEmpty()) {
-                            // remove the InlineJoin altogether
-                            p = ij.left();
-                            recheck = true;
-                        }
-                        // TODO: InlineStats - prune ONLY the unused output columns from it? In other words, don't perform more aggs
-                        // if they will not be used anyway
-                    }
-                } else if (p instanceof Eval eval) {
-                    var remaining = removeUnused(eval.fields(), used, inlineJoinRightOutput);
-                    // no fields, no eval
-                    if (remaining != null) {
-                        if (remaining.isEmpty()) {
-                            p = eval.child();
-                            recheck = true;
-                        } else {
-                            p = new Eval(eval.source(), eval.child(), remaining);
-                        }
-                    }
-                } else if (p instanceof EsRelation esr && esr.indexMode() == IndexMode.LOOKUP) {
-                    // Normally, pruning EsRelation has no effect because InsertFieldExtraction only extracts the required fields, anyway.
-                    // However, InsertFieldExtraction can't be currently used in LOOKUP JOIN right index,
-                    // it works differently as we extract all fields (other than the join key) that the EsRelation has.
-                    var remaining = removeUnused(esr.output(), used, inlineJoinRightOutput);
-                    if (remaining != null) {
-                        p = new EsRelation(esr.source(), esr.indexPattern(), esr.indexMode(), esr.indexNameWithModes(), remaining);
-                    }
-                }
-            } while (recheck);
+                recheck.set(false);
+                p = switch (p) {
+                    case Aggregate agg -> pruneColumnsInAggregate(agg, used, inlineJoin);
+                    case InlineJoin inj -> pruneColumnsInInlineJoin(inj, used, recheck);
+                    case Eval eval -> pruneColumnsInEval(eval, used, recheck);
+                    case Project project -> inlineJoin ? pruneColumnsInProject(project, used) : p;
+                    case EsRelation esr -> pruneColumnsInEsRelation(esr, used);
+                    default -> p;
+                };
+            } while (recheck.get());
 
             used.addAll(p.references());
 
             // preserve the state before going to the next node
             return p;
         });
+    }
 
-        return pl;
+    private static LogicalPlan pruneColumnsInAggregate(Aggregate aggregate, AttributeSet.Builder used, boolean inlineJoin) {
+        LogicalPlan p = aggregate;
+
+        var remaining = pruneUnusedAndAddReferences(aggregate.aggregates(), used);
+
+        if (remaining == null) {
+            return p;
+        }
+
+        if (remaining.isEmpty()) {
+            if (inlineJoin) {
+                p = emptyLocalRelation(aggregate);
+            } else if (aggregate.groupings().isEmpty()) {
+                // We still need to have a plan that produces 1 row per group.
+                p = new LocalRelation(
+                    aggregate.source(),
+                    List.of(Expressions.attribute(aggregate.aggregates().getFirst())),
+                    LocalSupplier.of(new Block[] { BlockUtils.constantBlock(PlannerUtils.NON_BREAKING_BLOCK_FACTORY, null, 1) })
+                );
+            } else {
+                // Aggs cannot produce pages with 0 columns, so retain one grouping.
+                Attribute attribute = Expressions.attribute(aggregate.groupings().getFirst());
+                NamedExpression firstAggregate = aggregate.aggregates().getFirst();
+                remaining = List.of(new Alias(firstAggregate.source(), firstAggregate.name(), attribute, firstAggregate.id()));
+                p = aggregate.with(aggregate.groupings(), remaining);
+            }
+        } else {
+            // not expecting high groups cardinality, nested loops in lists should be fine, no need for a HashSet
+            if (inlineJoin && aggregate.groupings().containsAll(remaining)) {
+                // It's an INLINEJOIN and all remaining attributes are groupings, which are already part of the IJ output (from the
+                // left-hand side).
+                // TODO: INLINESTATS: revisit condition when adding support for INLINESTATS filters
+                if (aggregate.child() instanceof StubRelation stub) {
+                    var message = "Aggregate groups references ["
+                        + remaining
+                        + "] not in child's (StubRelation) output: ["
+                        + stub.outputSet()
+                        + "]";
+                    assert stub.outputSet().containsAll(Expressions.asAttributes(remaining)) : message;
+
+                    p = emptyLocalRelation(aggregate);
+                } else {
+                    // There are no aggregates to compute, just output the groupings; these are already in the IJ output, so only
+                    // restrict the output to what remained.
+                    p = new Project(aggregate.source(), aggregate.child(), remaining);
+                }
+            } else { // not an INLINEJOIN or there are actually aggregates to compute
+                p = aggregate.with(aggregate.groupings(), remaining);
+            }
+        }
+
+        return p;
+    }
+
+    private static LogicalPlan pruneColumnsInInlineJoin(InlineJoin ij, AttributeSet.Builder used, Holder<Boolean> recheck) {
+        LogicalPlan p = ij;
+
+        used.addAll(ij.references());
+        var right = pruneColumns(ij.right(), used, true);
+        if (right.output().isEmpty()) {
+            p = ij.left();
+            recheck.set(true);
+        } else if (right != ij.right()) {
+            // if the right side has been updated, replace it
+            p = ij.replaceRight(right);
+        }
+
+        return p;
+    }
+
+    private static LogicalPlan pruneColumnsInEval(Eval eval, AttributeSet.Builder used, Holder<Boolean> recheck) {
+        LogicalPlan p = eval;
+
+        var remaining = pruneUnusedAndAddReferences(eval.fields(), used);
+        // no fields, no eval
+        if (remaining != null) {
+            if (remaining.isEmpty()) {
+                p = eval.child();
+                recheck.set(true);
+            } else {
+                p = new Eval(eval.source(), eval.child(), remaining);
+            }
+        }
+
+        return p;
+    }
+
+    private static LogicalPlan pruneColumnsInProject(Project project, AttributeSet.Builder used) {
+        LogicalPlan p = project;
+
+        var remaining = pruneUnusedAndAddReferences(project.projections(), used);
+        if (remaining != null) {
+            p = remaining.isEmpty() || remaining.stream().allMatch(FieldAttribute.class::isInstance)
+                ? emptyLocalRelation(project)
+                : new Project(project.source(), project.child(), remaining);
+        } else if (project.output().stream().allMatch(FieldAttribute.class::isInstance)) {
+            // Use empty relation as a marker for a subsequent pass, in case the project is only outputting field attributes (which are
+            // already part of the INLINEJOIN left-hand side output).
+            p = emptyLocalRelation(project);
+        }
+
+        return p;
+    }
+
+    private static LogicalPlan pruneColumnsInEsRelation(EsRelation esr, AttributeSet.Builder used) {
+        LogicalPlan p = esr;
+
+        if (esr.indexMode() == IndexMode.LOOKUP) {
+            // Normally, pruning EsRelation has no effect because InsertFieldExtraction only extracts the required fields, anyway.
+            // However, InsertFieldExtraction can't be currently used in LOOKUP JOIN right index,
+            // it works differently as we extract all fields (other than the join key) that the EsRelation has.
+            var remaining = pruneUnusedAndAddReferences(esr.output(), used);
+            if (remaining != null) {
+                p = new EsRelation(esr.source(), esr.indexPattern(), esr.indexMode(), esr.indexNameWithModes(), remaining);
+            }
+        }
+
+        return p;
+    }
+
+    private static LogicalPlan emptyLocalRelation(LogicalPlan plan) {
+        // create an empty local relation with no attributes
+        return new LocalRelation(plan.source(), List.of(), EmptyLocalSupplier.EMPTY);
     }
 
     /**
-     * Prunes attributes from the list not found in the given set.
-     * Returns null if no changed occurred.
+     * Prunes attributes from the `named` list that are not found in the given set (builder).
+     * Returns null if no pruning occurred.
+     * As a side effect, the references of the kept attributes are added to the input set (builder) -- irrespective of the return value.
      */
-    private static <N extends NamedExpression> List<N> removeUnused(List<N> named, AttributeSet.Builder used, List<Attribute> exceptions) {
+    private static <N extends NamedExpression> List<N> pruneUnusedAndAddReferences(List<N> named, AttributeSet.Builder used) {
         var clone = new ArrayList<>(named);
-        var it = clone.listIterator(clone.size());
 
-        // due to Eval, go in reverse
-        while (it.hasPrevious()) {
+        for (var it = clone.listIterator(clone.size()); it.hasPrevious();) {
             N prev = it.previous();
             var attr = prev.toAttribute();
-            if (used.contains(attr) == false && exceptions.contains(attr) == false) {
-                it.remove();
-            } else {
+            if (used.contains(attr)) {
                 used.addAll(prev.references());
+            } else {
+                it.remove();
             }
         }
+
         return clone.size() != named.size() ? clone : null;
     }
 }

+ 1 - 1
x-pack/plugin/esql/src/test/java/org/elasticsearch/xpack/esql/analysis/AnalyzerTests.java

@@ -4125,7 +4125,7 @@ public class AnalyzerTests extends ESTestCase {
     }
 
     public void testGroupingOverridesInInlinestats() {
-        assumeTrue("INLINESTATS required", EsqlCapabilities.Cap.INLINESTATS_V8.isEnabled());
+        assumeTrue("INLINESTATS required", EsqlCapabilities.Cap.INLINESTATS_V9.isEnabled());
         verifyUnsupported("""
             from test
             | inlinestats MIN(salary) BY x = languages, x = x + 1

+ 1 - 0
x-pack/plugin/esql/src/test/java/org/elasticsearch/xpack/esql/optimizer/AbstractLogicalPlanOptimizerTests.java

@@ -100,6 +100,7 @@ public abstract class AbstractLogicalPlanOptimizerTests extends ESTestCase {
                 EsqlTestUtils.TEST_CFG,
                 new EsqlFunctionRegistry(),
                 getIndexResultAirports,
+                defaultLookupResolution(),
                 enrichResolution,
                 emptyInferenceResolution()
             ),

+ 266 - 20
x-pack/plugin/esql/src/test/java/org/elasticsearch/xpack/esql/optimizer/LogicalPlanOptimizerTests.java

@@ -129,6 +129,7 @@ import org.elasticsearch.xpack.esql.plan.logical.join.Join;
 import org.elasticsearch.xpack.esql.plan.logical.join.JoinConfig;
 import org.elasticsearch.xpack.esql.plan.logical.join.JoinTypes;
 import org.elasticsearch.xpack.esql.plan.logical.join.LookupJoin;
+import org.elasticsearch.xpack.esql.plan.logical.join.StubRelation;
 import org.elasticsearch.xpack.esql.plan.logical.local.EmptyLocalSupplier;
 import org.elasticsearch.xpack.esql.plan.logical.local.EsqlProject;
 import org.elasticsearch.xpack.esql.plan.logical.local.LocalRelation;
@@ -140,6 +141,7 @@ import java.util.Arrays;
 import java.util.List;
 import java.util.Locale;
 import java.util.Map;
+import java.util.Set;
 import java.util.function.BiFunction;
 import java.util.function.Function;
 
@@ -2443,7 +2445,7 @@ public class LogicalPlanOptimizerTests extends AbstractLogicalPlanOptimizerTests
             | sort salary, x
             | limit 15""");
 
-        var keep = as(plan, EsqlProject.class);
+        var keep = as(plan, Project.class);
         var topN = as(keep.child(), TopN.class);
         assertThat(topN.limit().fold(FoldContext.small()), equalTo(15));
         assertThat(orderNames(topN), contains("salary", "first_name"));
@@ -5732,39 +5734,283 @@ public class LogicalPlanOptimizerTests extends AbstractLogicalPlanOptimizerTests
         as(aggregate.child(), EsRelation.class);
     }
 
-    /**
-     * Expects
-     * Limit[1000[INTEGER]]
-     * \_InlineJoin[LEFT OUTER,[emp_no % 2{r}#1793],[emp_no % 2{r}#1793],[emp_no % 2{r}#1793]]
-     *   |_Eval[[emp_no{f}#1794 % 2[INTEGER] AS emp_no % 2]]
-     *   | \_EsRelation[test][_meta_field{f}#1800, emp_no{f}#1794, first_name{f}#..]
-     *   \_Aggregate[STANDARD,[emp_no % 2{r}#1793],[COUNT(salary{f}#1799,true[BOOLEAN]) AS c, emp_no % 2{r}#1793]]
-     *     \_StubRelation[[_meta_field{f}#1800, emp_no{f}#1794, first_name{f}#1795, gender{f}#1796, job{f}#1801, job.raw{f}#1802, langua
-     * ges{f}#1797, last_name{f}#1798, long_noidx{f}#1803, salary{f}#1799, emp_no % 2{r}#1793]]
-     */
-    @AwaitsFix(bugUrl = "Needs updating to join plan per above")
+    /*
+     * Limit[1000[INTEGER],true]
+     * \_InlineJoin[LEFT,[emp_no % 2{r}#6],[emp_no % 2{r}#6],[emp_no % 2{r}#6]]
+     *   |_Eval[[emp_no{f}#7 % 2[INTEGER] AS emp_no % 2#6]]
+     *   | \_Limit[1000[INTEGER],false]  <-- TODO: this needs to go
+     *   |   \_EsRelation[test][_meta_field{f}#13, emp_no{f}#7, first_name{f}#8, ge..]
+     *   \_Aggregate[[emp_no % 2{r}#6],[COUNT(salary{f}#12,true[BOOLEAN]) AS c#4, emp_no % 2{r}#6]]
+     *     \_StubRelation[[_meta_field{f}#13, emp_no{f}#7, first_name{f}#8, gender{f}#9, hire_date{f}#14, job{f}#15, job.raw{f}#16, lang
+     *          uages{f}#10, last_name{f}#11, long_noidx{f}#17, salary{f}#12, emp_no % 2{r}#6]]
+     */
     public void testInlinestatsNestedExpressionsInGroups() {
         var query = """
             FROM test
             | INLINESTATS c = COUNT(salary) by emp_no % 2
             """;
-        if (Build.current().isSnapshot() == false) {
-            var e = expectThrows(ParsingException.class, () -> analyze(query));
-            assertThat(e.getMessage(), containsString("line 2:3: mismatched input 'INLINESTATS' expecting {"));
+        if (releaseBuildForInlinestats(query)) {
             return;
         }
         var plan = optimizedPlan(query);
-        var limit = as(plan, Limit.class);
+        var limit = as(plan, Limit.class); // TODO: this needs to go
         var inline = as(limit.child(), InlineJoin.class);
-        var agg = as(inline.left(), Aggregate.class);
+        var eval = as(inline.left(), Eval.class);
+        assertThat(Expressions.names(eval.fields()), is(List.of("emp_no % 2")));
+        limit = asLimit(eval.child(), 1000, false);
+        var agg = as(inline.right(), Aggregate.class);
         var groupings = agg.groupings();
-        var aggs = agg.aggregates();
         var ref = as(groupings.get(0), ReferenceAttribute.class);
+        var aggs = agg.aggregates();
         assertThat(aggs.get(1), is(ref));
-        var eval = as(agg.child(), Eval.class);
-        assertThat(eval.fields(), hasSize(1));
         assertThat(eval.fields().get(0).toAttribute(), is(ref));
         assertThat(eval.fields().get(0).name(), is("emp_no % 2"));
+        var stub = as(agg.child(), StubRelation.class);
+    }
+
+    private static boolean releaseBuildForInlinestats(String query) {
+        if (Build.current().isSnapshot() == false) {
+            var e = expectThrows(ParsingException.class, () -> analyze(query));
+            assertThat(e.getMessage(), containsString("mismatched input 'INLINESTATS' expecting"));
+            return true;
+        }
+        return false;
+    }
+
+    /*
+     * Project[[emp_no{f}#12 AS x#8, emp_no{f}#12]]
+     * \_TopN[[Order[emp_no{f}#12,ASC,LAST]],1[INTEGER]]
+     *   \_EsRelation[test][_meta_field{f}#18, emp_no{f}#12, first_name{f}#13, ..]
+     */
+    public void testInlinestatsGetsPrunedEntirely() {
+        var query = """
+            FROM employees
+            | INLINESTATS x = avg(salary) BY emp_no
+            | EVAL x = emp_no
+            | SORT x
+            | KEEP x, emp_no
+            | LIMIT 1
+            """;
+        if (releaseBuildForInlinestats(query)) {
+            return;
+        }
+        var plan = optimizedPlan(query);
+
+        var project = as(plan, Project.class);
+        assertThat(Expressions.names(project.projections()), is(List.of("x", "emp_no")));
+        var topN = as(project.child(), TopN.class);
+        assertThat(topN.order().size(), is(1));
+        var relation = as(topN.child(), EsRelation.class);
+    }
+
+    // same as above
+    public void testDoubleInlinestatsGetsPrunedEntirely() {
+        var query = """
+            FROM employees
+            | INLINESTATS x = avg(salary) BY emp_no
+            | INLINESTATS y = avg(salary) BY languages
+            | EVAL y = emp_no
+            | EVAL x = y
+            | SORT x
+            | KEEP x, emp_no
+            | LIMIT 1
+            """;
+        if (releaseBuildForInlinestats(query)) {
+            return;
+        }
+        var plan = optimizedPlan(query);
+
+        var project = as(plan, Project.class);
+        assertThat(Expressions.names(project.projections()), is(List.of("x", "emp_no")));
+        var topN = as(project.child(), TopN.class);
+        assertThat(topN.order().size(), is(1));
+        var relation = as(topN.child(), EsRelation.class);
+    }
+
+    /*
+     * Project[[emp_no{f}#15 AS x#11, a{r}#7, emp_no{f}#15]]
+     * \_Limit[1[INTEGER],true]
+     *   \_InlineJoin[LEFT,[emp_no{f}#15],[emp_no{f}#15],[emp_no{r}#15]]
+     *     |_Limit[1[INTEGER],false]  <-- TODO: this needs to go
+     *     | \_EsRelation[test][_meta_field{f}#21, emp_no{f}#15, first_name{f}#16, ..]
+     *     \_Aggregate[[emp_no{f}#15],[COUNTDISTINCT(languages{f}#18,true[BOOLEAN]) AS a#7, emp_no{f}#15]]
+     *       \_StubRelation[[_meta_field{f}#21, emp_no{f}#15, first_name{f}#16, gender{f}#17, hire_date{f}#22, job{f}#23, job.raw{f}#24, l
+     *          anguages{f}#18, last_name{f}#19, long_noidx{f}#25, salary{f}#20]]
+     */
+    public void testInlinestatsGetsPrunedPartially() {
+        var query = """
+            FROM employees
+            | INLINESTATS x = AVG(salary), a = COUNT_DISTINCT(languages) BY emp_no
+            | EVAL x = emp_no
+            | KEEP x, a, emp_no
+            | LIMIT 1
+            """;
+        if (releaseBuildForInlinestats(query)) {
+            return;
+        }
+        var plan = optimizedPlan(query);
+
+        var project = as(plan, Project.class);
+        assertThat(Expressions.names(project.projections()), is(List.of("x", "a", "emp_no")));
+        var upperLimit = asLimit(project.child(), 1, true);
+        var inlineJoin = as(upperLimit.child(), InlineJoin.class);
+        assertThat(Expressions.names(inlineJoin.config().matchFields()), is(List.of("emp_no")));
+        // Left
+        var limit = as(inlineJoin.left(), Limit.class); // TODO: this needs to go
+        assertThat(limit.limit().fold(FoldContext.small()), equalTo(1));
+        var relation = as(limit.child(), EsRelation.class);
+        // Right
+        var agg = as(inlineJoin.right(), Aggregate.class);
+        assertMap(Expressions.names(agg.output()), is(List.of("a", "emp_no")));
+        var stub = as(agg.child(), StubRelation.class);
+    }
+
+    // same as above
+    public void testTrippleInlinestatsGetsPrunedPartially() {
+        var query = """
+            FROM employees
+            | INLINESTATS x = AVG(salary), a = COUNT_DISTINCT(languages) BY emp_no
+            | INLINESTATS y = AVG(salary), b = COUNT_DISTINCT(languages) BY emp_no
+            | EVAL x = emp_no
+            | INLINESTATS z = AVG(salary), c = COUNT_DISTINCT(languages), d = AVG(languages) BY last_name
+            | KEEP x, a, emp_no
+            | LIMIT 1
+            """;
+        if (releaseBuildForInlinestats(query)) {
+            return;
+        }
+        var plan = optimizedPlan(query);
+
+        var project = as(plan, Project.class);
+        assertThat(Expressions.names(project.projections()), is(List.of("x", "a", "emp_no")));
+        var upperLimit = asLimit(project.child(), 1, true);
+        var inlineJoin = as(upperLimit.child(), InlineJoin.class);
+        assertThat(Expressions.names(inlineJoin.config().matchFields()), is(List.of("emp_no")));
+        // Left
+        var limit = as(inlineJoin.left(), Limit.class);
+        assertThat(limit.limit().fold(FoldContext.small()), equalTo(1));
+        var relation = as(limit.child(), EsRelation.class);
+        // Right
+        var agg = as(inlineJoin.right(), Aggregate.class);
+        assertMap(Expressions.names(agg.output()), is(List.of("a", "emp_no")));
+        var stub = as(agg.child(), StubRelation.class);
+    }
+
+    /*
+     * Project[[abbrev{f}#19, scalerank{f}#21 AS backup_scalerank#4, language_name{f}#28 AS scalerank#11]]
+     * \_TopN[[Order[abbrev{f}#19,DESC,FIRST]],5[INTEGER]]
+     *   \_Join[LEFT,[scalerank{f}#21],[scalerank{f}#21],[language_code{f}#27]]
+     *     |_EsRelation[airports][abbrev{f}#19, city{f}#25, city_location{f}#26, coun..]
+     *     \_EsRelation[languages_lookup][LOOKUP][language_code{f}#27, language_name{f}#28]
+     */
+    public void testInlinestatsWithLookupJoin() {
+        var query = """
+            FROM airports
+            | EVAL backup_scalerank = scalerank
+            | RENAME scalerank AS language_code
+            | LOOKUP JOIN languages_lookup ON language_code
+            | RENAME language_name as scalerank
+            | DROP language_code
+            | INLINESTATS count=COUNT(*) BY scalerank
+            | SORT abbrev DESC
+            | KEEP abbrev, *scalerank
+            | LIMIT 5
+            """;
+        assumeTrue("Requires LOOKUP JOIN", EsqlCapabilities.Cap.JOIN_LOOKUP_V12.isEnabled());
+        if (releaseBuildForInlinestats(query)) {
+            return;
+        }
+
+        var plan = planAirports(query);
+        var project = as(plan, Project.class);
+        assertThat(Expressions.names(project.projections()), is(List.of("abbrev", "backup_scalerank", "scalerank")));
+        var topN = as(project.child(), TopN.class);
+        assertThat(topN.order().size(), is(1));
+        var order = as(topN.order().get(0), Order.class);
+        assertThat(order.direction(), equalTo(Order.OrderDirection.DESC));
+        assertThat(order.nullsPosition(), equalTo(Order.NullsPosition.FIRST));
+        assertThat(Expressions.name(order.child()), equalTo("abbrev"));
+        var join = as(topN.child(), Join.class);
+        assertThat(Expressions.names(join.config().matchFields()), is(List.of("scalerank")));
+        var left = as(join.left(), EsRelation.class);
+        assertThat(left.concreteIndices(), is(Set.of("airports")));
+        var right = as(join.right(), EsRelation.class);
+        assertThat(right.concreteIndices(), is(Set.of("languages_lookup")));
+    }
+
+    /*
+     * EsqlProject[[avg{r}#4, emp_no{f}#9, first_name{f}#10]]
+     * \_Limit[10[INTEGER],true]
+     *   \_InlineJoin[LEFT,[emp_no{f}#9],[emp_no{f}#9],[emp_no{r}#9]]
+     *     |_Limit[10[INTEGER],false]  <-- TODO: this needs to go
+     *     | \_EsRelation[test][_meta_field{f}#15, emp_no{f}#9, first_name{f}#10, g..]
+     *     \_Project[[avg{r}#4, emp_no{f}#9]]
+     *       \_Eval[[$$SUM$avg$0{r$}#20 / $$COUNT$avg$1{r$}#21 AS avg#4]]
+     *         \_Aggregate[[emp_no{f}#9],[SUM(salary{f}#14,true[BOOLEAN]) AS $$SUM$avg$0#20, COUNT(salary{f}#14,true[BOOLEAN]) AS $$COUNT$
+     *              avg$1#21, emp_no{f}#9]]
+     *           \_StubRelation[[_meta_field{f}#15, emp_no{f}#9, first_name{f}#10, gender{f}#11, hire_date{f}#16, job{f}#17, job.raw{f}#18,
+     *              languages{f}#12, last_name{f}#13, long_noidx{f}#19, salary{f}#14]]
+     */
+    public void testInlinestatsWithAvg() {
+        var query = """
+            FROM employees
+            | INLINESTATS avg = AVG(salary) BY emp_no
+            | KEEP avg, emp_no, first_name
+            | LIMIT 10
+            """;
+        if (releaseBuildForInlinestats(query)) {
+            return;
+        }
+        var plan = optimizedPlan(query);
+
+        var esqlProject = as(plan, EsqlProject.class);
+        assertThat(Expressions.names(esqlProject.projections()), is(List.of("avg", "emp_no", "first_name")));
+        var upperLimit = asLimit(esqlProject.child(), 10, true);
+        var inlineJoin = as(upperLimit.child(), InlineJoin.class);
+        assertThat(Expressions.names(inlineJoin.config().matchFields()), is(List.of("emp_no")));
+        // Left
+        var limit = asLimit(inlineJoin.left(), 10, false); // TODO: this needs to go
+        var relation = as(limit.child(), EsRelation.class);
+        // Right
+        var project = as(inlineJoin.right(), Project.class);
+        assertThat(Expressions.names(project.projections()), contains("avg", "emp_no"));
+        var eval = as(project.child(), Eval.class);
+        assertThat(Expressions.names(eval.fields()), is(List.of("avg")));
+        var agg = as(eval.child(), Aggregate.class);
+        assertMap(Expressions.names(agg.output()), is(List.of("$$SUM$avg$0", "$$COUNT$avg$1", "emp_no")));
+        var stub = as(agg.child(), StubRelation.class);
+    }
+
+    /*
+     * EsqlProject[[emp_no{r}#5]]
+     * \_Limit[1000[INTEGER],false]
+     *   \_LocalRelation[[salary{r}#3, emp_no{r}#5, gender{r}#7],
+     *      org.elasticsearch.xpack.esql.plan.logical.local.CopyingLocalSupplier@9d5b596d]
+     */
+    public void testInlinestatsWithRow() {
+        var query = """
+            ROW salary = 12300, emp_no = 5, gender = "F"
+            | EVAL salaryK = salary/1000
+            | INLINESTATS sum = SUM(salaryK) BY gender
+            | KEEP emp_no
+            """;
+        if (releaseBuildForInlinestats(query)) {
+            return;
+        }
+        var plan = optimizedPlan(query);
+
+        var esqlProject = as(plan, EsqlProject.class);
+        assertThat(Expressions.names(esqlProject.projections()), is(List.of("emp_no")));
+        var limit = asLimit(esqlProject.child(), 1000, false);
+        var localRelation = as(limit.child(), LocalRelation.class);
+        assertThat(
+            localRelation.output(),
+            contains(
+                new ReferenceAttribute(EMPTY, "salary", INTEGER),
+                new ReferenceAttribute(EMPTY, "emp_no", INTEGER),
+                new ReferenceAttribute(EMPTY, "gender", KEYWORD)
+            )
+        );
     }
 
     /**

+ 2 - 2
x-pack/plugin/esql/src/test/java/org/elasticsearch/xpack/esql/optimizer/rules/logical/PropagateInlineEvalsTests.java

@@ -83,7 +83,7 @@ public class PropagateInlineEvalsTests extends ESTestCase {
      *     \_StubRelation[[emp_no{f}#11, languages{f}#14, gender{f}#13, y{r}#10]]
      */
     public void testGroupingAliasingMoved_To_LeftSideOfJoin() {
-        assumeTrue("Requires INLINESTATS", EsqlCapabilities.Cap.INLINESTATS_V8.isEnabled());
+        assumeTrue("Requires INLINESTATS", EsqlCapabilities.Cap.INLINESTATS_V9.isEnabled());
         var plan = plan("""
             from test
             | keep emp_no, languages, gender
@@ -126,7 +126,7 @@ public class PropagateInlineEvalsTests extends ESTestCase {
      * {r}#21]]
      */
     public void testGroupingAliasingMoved_To_LeftSideOfJoin_WithExpression() {
-        assumeTrue("Requires INLINESTATS", EsqlCapabilities.Cap.INLINESTATS_V8.isEnabled());
+        assumeTrue("Requires INLINESTATS", EsqlCapabilities.Cap.INLINESTATS_V9.isEnabled());
         var plan = plan("""
             from test
             | keep emp_no, languages, gender, last_name, first_name

+ 15 - 15
x-pack/plugin/esql/src/test/java/org/elasticsearch/xpack/esql/session/FieldNameUtilsTests.java

@@ -35,7 +35,7 @@ public class FieldNameUtilsTests extends ESTestCase {
     }
 
     public void testBasicFromCommandWithInlinestats() {
-        assumeTrue("INLINESTATS required", EsqlCapabilities.Cap.INLINESTATS_V8.isEnabled());
+        assumeTrue("INLINESTATS required", EsqlCapabilities.Cap.INLINESTATS_V9.isEnabled());
         assertFieldNames("from test | inlinestats max(salary) by gender", ALL_FIELDS);
     }
 
@@ -44,7 +44,7 @@ public class FieldNameUtilsTests extends ESTestCase {
     }
 
     public void testBasicFromCommandWithMetadata_AndInlinestats() {
-        assumeTrue("INLINESTATS required", EsqlCapabilities.Cap.INLINESTATS_V8.isEnabled());
+        assumeTrue("INLINESTATS required", EsqlCapabilities.Cap.INLINESTATS_V9.isEnabled());
         assertFieldNames("from test metadata _index, _id, _version | inlinestats max(salary)", ALL_FIELDS);
     }
 
@@ -310,7 +310,7 @@ public class FieldNameUtilsTests extends ESTestCase {
     }
 
     public void testLimitZero_WithInlinestats() {
-        assumeTrue("INLINESTATS required", EsqlCapabilities.Cap.INLINESTATS_V8.isEnabled());
+        assumeTrue("INLINESTATS required", EsqlCapabilities.Cap.INLINESTATS_V9.isEnabled());
         assertFieldNames("""
             FROM employees
             | INLINESTATS COUNT(*), MAX(salary) BY gender
@@ -325,7 +325,7 @@ public class FieldNameUtilsTests extends ESTestCase {
     }
 
     public void testDocsDropHeight_WithInlinestats() {
-        assumeTrue("INLINESTATS required", EsqlCapabilities.Cap.INLINESTATS_V8.isEnabled());
+        assumeTrue("INLINESTATS required", EsqlCapabilities.Cap.INLINESTATS_V9.isEnabled());
         assertFieldNames("""
             FROM employees
             | DROP height
@@ -341,7 +341,7 @@ public class FieldNameUtilsTests extends ESTestCase {
     }
 
     public void testDocsDropHeightWithWildcard_AndInlinestats() {
-        assumeTrue("INLINESTATS required", EsqlCapabilities.Cap.INLINESTATS_V8.isEnabled());
+        assumeTrue("INLINESTATS required", EsqlCapabilities.Cap.INLINESTATS_V9.isEnabled());
         assertFieldNames("""
             FROM employees
             | INLINESTATS MAX(salary) BY gender
@@ -508,7 +508,7 @@ public class FieldNameUtilsTests extends ESTestCase {
     }
 
     public void testSortWithLimitOne_DropHeight_WithInlinestats() {
-        assumeTrue("INLINESTATS required", EsqlCapabilities.Cap.INLINESTATS_V8.isEnabled());
+        assumeTrue("INLINESTATS required", EsqlCapabilities.Cap.INLINESTATS_V9.isEnabled());
         assertFieldNames("from employees | inlinestats avg(salary) by languages | sort languages | limit 1 | drop height*", ALL_FIELDS);
     }
 
@@ -808,7 +808,7 @@ public class FieldNameUtilsTests extends ESTestCase {
     }
 
     public void testFilterById_WithInlinestats() {
-        assumeTrue("INLINESTATS required", EsqlCapabilities.Cap.INLINESTATS_V8.isEnabled());
+        assumeTrue("INLINESTATS required", EsqlCapabilities.Cap.INLINESTATS_V9.isEnabled());
         assertFieldNames("FROM apps metadata _id | INLINESTATS max(rate) | WHERE _id == \"4\"", ALL_FIELDS);
     }
 
@@ -1279,7 +1279,7 @@ public class FieldNameUtilsTests extends ESTestCase {
     }
 
     public void testProjectDropPattern_WithInlinestats() {
-        assumeTrue("INLINESTATS required", EsqlCapabilities.Cap.INLINESTATS_V8.isEnabled());
+        assumeTrue("INLINESTATS required", EsqlCapabilities.Cap.INLINESTATS_V9.isEnabled());
         assertFieldNames("""
             from test
             | inlinestats max(foo) by bar
@@ -1362,7 +1362,7 @@ public class FieldNameUtilsTests extends ESTestCase {
     }
 
     public void testCountAllAndOtherStatGrouped_WithInlinestats() {
-        assumeTrue("INLINESTATS required", EsqlCapabilities.Cap.INLINESTATS_V8.isEnabled());
+        assumeTrue("INLINESTATS required", EsqlCapabilities.Cap.INLINESTATS_V9.isEnabled());
         assertFieldNames("""
             from test
             | inlinestats c = count(*), min = min(emp_no) by languages
@@ -1401,7 +1401,7 @@ public class FieldNameUtilsTests extends ESTestCase {
     }
 
     public void testCountAllWithEval_AndInlinestats() {
-        assumeTrue("INLINESTATS required", EsqlCapabilities.Cap.INLINESTATS_V8.isEnabled());
+        assumeTrue("INLINESTATS required", EsqlCapabilities.Cap.INLINESTATS_V9.isEnabled());
         assertFieldNames("""
             from test
             | rename languages as l
@@ -1414,7 +1414,7 @@ public class FieldNameUtilsTests extends ESTestCase {
     }
 
     public void testKeepAfterEval_AndInlinestats() {
-        assumeTrue("INLINESTATS required", EsqlCapabilities.Cap.INLINESTATS_V8.isEnabled());
+        assumeTrue("INLINESTATS required", EsqlCapabilities.Cap.INLINESTATS_V9.isEnabled());
         assertFieldNames("""
             from test
             | rename languages as l
@@ -1427,7 +1427,7 @@ public class FieldNameUtilsTests extends ESTestCase {
     }
 
     public void testKeepBeforeEval_AndInlinestats() {
-        assumeTrue("INLINESTATS required", EsqlCapabilities.Cap.INLINESTATS_V8.isEnabled());
+        assumeTrue("INLINESTATS required", EsqlCapabilities.Cap.INLINESTATS_V9.isEnabled());
         assertFieldNames("""
             from test
             | rename languages as l
@@ -1440,7 +1440,7 @@ public class FieldNameUtilsTests extends ESTestCase {
     }
 
     public void testStatsBeforeEval_AndInlinestats() {
-        assumeTrue("INLINESTATS required", EsqlCapabilities.Cap.INLINESTATS_V8.isEnabled());
+        assumeTrue("INLINESTATS required", EsqlCapabilities.Cap.INLINESTATS_V9.isEnabled());
         assertFieldNames("""
             from test
             | rename languages as l
@@ -1452,7 +1452,7 @@ public class FieldNameUtilsTests extends ESTestCase {
     }
 
     public void testStatsBeforeInlinestats() {
-        assumeTrue("INLINESTATS required", EsqlCapabilities.Cap.INLINESTATS_V8.isEnabled());
+        assumeTrue("INLINESTATS required", EsqlCapabilities.Cap.INLINESTATS_V9.isEnabled());
         assertFieldNames("""
             from test
             | stats min = min(salary) by languages
@@ -1461,7 +1461,7 @@ public class FieldNameUtilsTests extends ESTestCase {
     }
 
     public void testKeepBeforeInlinestats() {
-        assumeTrue("INLINESTATS required", EsqlCapabilities.Cap.INLINESTATS_V8.isEnabled());
+        assumeTrue("INLINESTATS required", EsqlCapabilities.Cap.INLINESTATS_V9.isEnabled());
         assertFieldNames("""
             from test
             | keep languages, salary