Browse Source

ESQL: INLINESTATS (#109583)

This implements `INLINESTATS`. Most of the heavy lifting is done by
`LOOKUP`, with this change mostly adding a new abstraction to logical
plans, and interface I'm calling `Phased`. Implementing this interface
allows a logical plan node to cut the query into phases. `INLINESTATS`
implements it by asking for a "first phase" that's the same query, up to
`INLINESTATS`, but with `INLINESTATS` replaced with `STATS`. The next
phase replaces the `INLINESTATS` with a `LOOKUP` on the results of the
first phase.

So, this query:
```
FROM foo
| EVAL bar = a * b
| INLINESTATS m = MAX(bar) BY b
| WHERE m = bar
| LIMIT 1
```

gets split into
```
FROM foo
| EVAL bar = a * b
| STATS m = MAX(bar) BY b
```

followed by
```
FROM foo
| EVAL bar = a * b
| LOOKUP (results of m = MAX(bar) BY b) ON b
| WHERE m = bar
| LIMIT 1
```
Nik Everett 1 year ago
parent
commit
b5c6c2da30
28 changed files with 1569 additions and 36 deletions
  1. 29 0
      docs/changelog/109583.yaml
  2. 6 0
      docs/reference/esql/esql-commands.asciidoc
  3. 102 0
      docs/reference/esql/processing-commands/inlinestats.asciidoc
  4. 1 1
      docs/reference/esql/processing-commands/lookup.asciidoc
  5. 3 0
      x-pack/plugin/esql/compute/src/main/java/org/elasticsearch/compute/aggregation/table/RowInTableLookup.java
  6. 9 0
      x-pack/plugin/esql/compute/src/main/java/org/elasticsearch/compute/operator/RowInTableLookupOperator.java
  7. 1 0
      x-pack/plugin/esql/qa/server/multi-clusters/src/javaRestTest/java/org/elasticsearch/xpack/esql/ccq/MultiClusterSpecIT.java
  8. 256 14
      x-pack/plugin/esql/qa/server/single-node/src/javaRestTest/java/org/elasticsearch/xpack/esql/qa/single_node/RestEsqlIT.java
  9. 11 1
      x-pack/plugin/esql/qa/server/src/main/java/org/elasticsearch/xpack/esql/qa/rest/RestEsqlTestCase.java
  10. 503 0
      x-pack/plugin/esql/qa/testFixtures/src/main/resources/inlinestats.csv-spec
  11. 61 0
      x-pack/plugin/esql/qa/testFixtures/src/main/resources/union_types.csv-spec
  12. 5 0
      x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/action/EsqlCapabilities.java
  13. 11 9
      x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/analysis/Analyzer.java
  14. 2 0
      x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/io/stream/PlanNamedTypes.java
  15. 3 1
      x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/optimizer/PhysicalPlanOptimizer.java
  16. 2 0
      x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/package-info.java
  17. 4 0
      x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/parser/LogicalPlanBuilder.java
  18. 7 3
      x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/plan/logical/Aggregate.java
  19. 179 3
      x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/plan/logical/InlineStats.java
  20. 135 0
      x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/plan/logical/Phased.java
  21. 39 0
      x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/plan/logical/Stats.java
  22. 2 0
      x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/plugin/EsqlPlugin.java
  23. 43 2
      x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/session/EsqlSession.java
  24. 2 0
      x-pack/plugin/esql/src/test/java/org/elasticsearch/xpack/esql/SerializationTestUtils.java
  25. 2 0
      x-pack/plugin/esql/src/test/java/org/elasticsearch/xpack/esql/io/stream/PlanNamedTypesTests.java
  26. 2 0
      x-pack/plugin/esql/src/test/java/org/elasticsearch/xpack/esql/parser/StatementParserTests.java
  27. 146 0
      x-pack/plugin/esql/src/test/java/org/elasticsearch/xpack/esql/plan/logical/PhasedTests.java
  28. 3 2
      x-pack/plugin/esql/src/test/java/org/elasticsearch/xpack/esql/tree/EsqlNodeSubclassTests.java

+ 29 - 0
docs/changelog/109583.yaml

@@ -0,0 +1,29 @@
+pr: 109583
+summary: "ESQL: INLINESTATS"
+area: ES|QL
+type: feature
+issues:
+ - 107589
+highlight:
+  title: "ESQL: INLINESTATS"
+  body: |-
+    This adds the `INLINESTATS` command to ESQL which performs a STATS and
+    then enriches the results into the output stream. So, this query:
+
+    [source,esql]
+    ----
+    FROM test
+    | INLINESTATS m=MAX(a * b) BY b
+    | WHERE m == a * b
+    | SORT a DESC, b DESC
+    | LIMIT 3
+    ----
+
+    Produces output like:
+
+    |  a  |  b  |   m   |
+    | --- | --- | ----- |
+    |  99 | 999 | 98901 |
+    |  99 | 998 | 98802 |
+    |  99 | 997 | 98703 |
+  notable: true

+ 6 - 0
docs/reference/esql/esql-commands.asciidoc

@@ -37,6 +37,9 @@ image::images/esql/processing-command.svg[A processing command changing an input
 * <<esql-enrich>>
 * <<esql-eval>>
 * <<esql-grok>>
+ifeval::["{release-state}"=="unreleased"]
+* experimental:[] <<esql-inlinestats-by>>
+endif::[]
 * <<esql-keep>>
 * <<esql-limit>>
 ifeval::["{release-state}"=="unreleased"]
@@ -59,6 +62,9 @@ include::processing-commands/drop.asciidoc[]
 include::processing-commands/enrich.asciidoc[]
 include::processing-commands/eval.asciidoc[]
 include::processing-commands/grok.asciidoc[]
+ifeval::["{release-state}"=="unreleased"]
+include::processing-commands/inlinestats.asciidoc[]
+endif::[]
 include::processing-commands/keep.asciidoc[]
 include::processing-commands/limit.asciidoc[]
 ifeval::["{release-state}"=="unreleased"]

+ 102 - 0
docs/reference/esql/processing-commands/inlinestats.asciidoc

@@ -0,0 +1,102 @@
+[discrete]
+[[esql-inlinestats-by]]
+=== `INLINESTATS ... BY`
+
+experimental::["INLINESTATS is highly experimental and only available in SNAPSHOT versions."]
+
+The `INLINESTATS` command calculates an aggregate result and adds new columns
+with the result to the stream of input data.
+
+**Syntax**
+
+[source,esql]
+----
+INLINESTATS [column1 =] expression1[, ..., [columnN =] expressionN]
+[BY grouping_expression1[, ..., grouping_expressionN]]
+----
+
+*Parameters*
+
+`columnX`::
+The name by which the aggregated value is returned. If omitted, the name is
+equal to the corresponding expression (`expressionX`). If multiple columns
+have the same name, all but the rightmost column with this name will be ignored.
+
+`expressionX`::
+An expression that computes an aggregated value. If its name coincides with one
+of the computed columns, that column will be ignored.
+
+`grouping_expressionX`::
+An expression that outputs the values to group by.
+
+NOTE: Individual `null` values are skipped when computing aggregations.
+
+*Description*
+
+The `INLINESTATS` command calculates an aggregate result and merges that result
+back into the stream of input data. Without the optional `BY` clause this will
+produce a single result which is appended to each row. With a `BY` clause this
+will produce one result per grouping and merge the result into the stream based on
+matching group keys.
+
+All of the <<esql-agg-functions,aggregation functions>> are supported.
+
+*Examples*
+
+Find the employees that speak the most languages (it's a tie!):
+
+[source.merge.styled,esql]
+----
+include::{esql-specs}/inlinestats.csv-spec[tag=max-languages]
+----
+[%header.monospaced.styled,format=dsv,separator=|]
+|===
+include::{esql-specs}/inlinestats.csv-spec[tag=max-languages-result]
+|===
+
+Find the longest tenured employee who's last name starts with each letter of the alphabet:
+
+[source.merge.styled,esql]
+----
+include::{esql-specs}/inlinestats.csv-spec[tag=longest-tenured-by-first]
+----
+[%header.monospaced.styled,format=dsv,separator=|]
+|===
+include::{esql-specs}/inlinestats.csv-spec[tag=longest-tenured-by-first-result]
+|===
+
+Find the northern and southern most airports:
+
+[source.merge.styled,esql]
+----
+include::{esql-specs}/inlinestats.csv-spec[tag=extreme-airports]
+----
+[%header.monospaced.styled,format=dsv,separator=|]
+|===
+include::{esql-specs}/inlinestats.csv-spec[tag=extreme-airports-result]
+|===
+
+NOTE: Our test data doesn't have many "small" airports.
+
+If a `BY` field is multivalued then `INLINESTATS` will put the row in *each*
+bucket like <<esql-stats-by>>:
+
+[source.merge.styled,esql]
+----
+include::{esql-specs}/inlinestats.csv-spec[tag=mv-group]
+----
+[%header.monospaced.styled,format=dsv,separator=|]
+|===
+include::{esql-specs}/inlinestats.csv-spec[tag=mv-group-result]
+|===
+
+To treat each group key as its own row use <<esql-mv_expand>> before `INLINESTATS`:
+
+[source.merge.styled,esql]
+----
+include::{esql-specs}/inlinestats.csv-spec[tag=mv-expand]
+----
+[%header.monospaced.styled,format=dsv,separator=|]
+|===
+include::{esql-specs}/inlinestats.csv-spec[tag=mv-expand-result]
+|===

+ 1 - 1
docs/reference/esql/processing-commands/lookup.asciidoc

@@ -2,7 +2,7 @@
 [[esql-lookup]]
 === `LOOKUP`
 
-experimental::["LOOKUP is a highly experimental and only available in SNAPSHOT versions."]
+experimental::["LOOKUP is highly experimental and only available in SNAPSHOT versions."]
 
 `LOOKUP` matches values from the input against a `table` provided in the request,
 adding the other fields from the `table` to the output.

+ 3 - 0
x-pack/plugin/esql/compute/src/main/java/org/elasticsearch/compute/aggregation/table/RowInTableLookup.java

@@ -37,6 +37,9 @@ public abstract sealed class RowInTableLookup implements Releasable permits Empt
     public abstract String toString();
 
     public static RowInTableLookup build(BlockFactory blockFactory, Block[] keys) {
+        if (keys.length < 1) {
+            throw new IllegalArgumentException("expected [keys] to be non-empty");
+        }
         int positions = keys[0].getPositionCount();
         for (int k = 0; k < keys.length; k++) {
             if (positions != keys[k].getPositionCount()) {

+ 9 - 0
x-pack/plugin/esql/compute/src/main/java/org/elasticsearch/compute/operator/RowInTableLookupOperator.java

@@ -40,6 +40,12 @@ public class RowInTableLookupOperator extends AbstractPageMappingToIteratorOpera
      * are never closed, so we need to build them from a non-tracking factory.
      */
     public record Factory(Key[] keys, int[] blockMapping) implements Operator.OperatorFactory {
+        public Factory {
+            if (keys.length < 1) {
+                throw new IllegalArgumentException("expected [keys] to be non-empty");
+            }
+        }
+
         @Override
         public Operator get(DriverContext driverContext) {
             return new RowInTableLookupOperator(driverContext.blockFactory(), keys, blockMapping);
@@ -56,6 +62,9 @@ public class RowInTableLookupOperator extends AbstractPageMappingToIteratorOpera
     private final int[] blockMapping;
 
     public RowInTableLookupOperator(BlockFactory blockFactory, Key[] keys, int[] blockMapping) {
+        if (keys.length < 1) {
+            throw new IllegalArgumentException("expected [keys] to be non-empty");
+        }
         this.blockMapping = blockMapping;
         this.keys = new ArrayList<>(keys.length);
         Block[] blocks = new Block[keys.length];

+ 1 - 0
x-pack/plugin/esql/qa/server/multi-clusters/src/javaRestTest/java/org/elasticsearch/xpack/esql/ccq/MultiClusterSpecIT.java

@@ -110,6 +110,7 @@ public class MultiClusterSpecIT extends EsqlSpecTestCase {
             "Test " + testName + " is skipped on " + Clusters.oldVersion(),
             isEnabled(testName, instructions, Clusters.oldVersion())
         );
+        assumeFalse("INLINESTATS not yet supported in CCS", testCase.requiredCapabilities.contains("inlinestats"));
     }
 
     private TestFeatureService remoteFeaturesService() throws IOException {

+ 256 - 14
x-pack/plugin/esql/qa/server/single-node/src/javaRestTest/java/org/elasticsearch/xpack/esql/qa/single_node/RestEsqlIT.java

@@ -10,15 +10,19 @@ import com.carrotsearch.randomizedtesting.annotations.ParametersFactory;
 import com.carrotsearch.randomizedtesting.annotations.ThreadLeakFilters;
 
 import org.apache.http.util.EntityUtils;
+import org.apache.lucene.search.DocIdSetIterator;
 import org.elasticsearch.Build;
 import org.elasticsearch.client.Request;
 import org.elasticsearch.client.Response;
 import org.elasticsearch.client.ResponseException;
 import org.elasticsearch.common.io.Streams;
 import org.elasticsearch.common.settings.Settings;
+import org.elasticsearch.test.ListMatcher;
+import org.elasticsearch.test.MapMatcher;
 import org.elasticsearch.test.TestClustersThreadFilter;
 import org.elasticsearch.test.cluster.ElasticsearchCluster;
 import org.elasticsearch.test.cluster.LogType;
+import org.elasticsearch.xcontent.XContentType;
 import org.elasticsearch.xpack.esql.qa.rest.RestEsqlTestCase;
 import org.hamcrest.Matchers;
 import org.junit.Assert;
@@ -27,14 +31,23 @@ import org.junit.ClassRule;
 import java.io.IOException;
 import java.io.InputStream;
 import java.nio.charset.StandardCharsets;
+import java.util.ArrayList;
 import java.util.Arrays;
 import java.util.List;
 import java.util.Locale;
 import java.util.Map;
 
+import static org.elasticsearch.test.ListMatcher.matchesList;
+import static org.elasticsearch.test.MapMatcher.assertMap;
+import static org.elasticsearch.test.MapMatcher.matchesMap;
+import static org.hamcrest.Matchers.any;
+import static org.hamcrest.Matchers.containsInAnyOrder;
 import static org.hamcrest.Matchers.containsString;
 import static org.hamcrest.Matchers.equalTo;
+import static org.hamcrest.Matchers.greaterThan;
+import static org.hamcrest.Matchers.instanceOf;
 import static org.hamcrest.Matchers.not;
+import static org.hamcrest.Matchers.startsWith;
 import static org.hamcrest.core.Is.is;
 
 @ThreadLeakFilters(filters = TestClustersThreadFilter.class)
@@ -49,7 +62,7 @@ public class RestEsqlIT extends RestEsqlTestCase {
         return cluster.getHttpAddresses();
     }
 
-    @ParametersFactory
+    @ParametersFactory(argumentFormatting = "%1s")
     public static List<Object[]> modes() {
         return Arrays.stream(Mode.values()).map(m -> new Object[] { m }).toList();
     }
@@ -59,19 +72,7 @@ public class RestEsqlIT extends RestEsqlTestCase {
     }
 
     public void testBasicEsql() throws IOException {
-        StringBuilder b = new StringBuilder();
-        for (int i = 0; i < 1000; i++) {
-            b.append(String.format(Locale.ROOT, """
-                {"create":{"_index":"%s"}}
-                {"@timestamp":"2020-12-12","test":"value%s","value":%d}
-                """, testIndexName(), i, i));
-        }
-        Request bulk = new Request("POST", "/_bulk");
-        bulk.addParameter("refresh", "true");
-        bulk.addParameter("filter_path", "errors");
-        bulk.setJsonEntity(b.toString());
-        Response response = client().performRequest(bulk);
-        Assert.assertEquals("{\"errors\":false}", EntityUtils.toString(response.getEntity(), StandardCharsets.UTF_8));
+        indexTestData();
 
         RequestObjectBuilder builder = requestObjectBuilder().query(fromIndex() + " | stats avg(value)");
         if (Build.current().isSnapshot()) {
@@ -273,6 +274,247 @@ public class RestEsqlIT extends RestEsqlTestCase {
         assertThat(re.getMessage(), containsString("[6:10] Duplicate field 'a'"));
     }
 
+    /**
+     * INLINESTATS <strong>can</strong> group on {@code NOW()}. It's a little silly, but
+     * doing something like {@code DATE_TRUNC(1 YEAR, NOW() - 1970-01-01T00:00:00Z)} is
+     * much more sensible. But just grouping on {@code NOW()} is enough to test this.
+     * <p>
+     *     This works because {@code NOW()} locks it's value at the start of the entire
+     *     query. It's part of the "configuration" of the query.
+     * </p>
+     */
+    public void testInlineStatsNow() throws IOException {
+        indexTestData();
+
+        RequestObjectBuilder builder = requestObjectBuilder().query(
+            fromIndex() + " | EVAL now=NOW() | INLINESTATS AVG(value) BY now | SORT value ASC"
+        );
+        Map<String, Object> result = runEsql(builder);
+        ListMatcher values = matchesList();
+        for (int i = 0; i < 1000; i++) {
+            values = values.item(
+                matchesList().item("2020-12-12T00:00:00.000Z")
+                    .item("value" + i)
+                    .item("value" + i)
+                    .item(i)
+                    .item(any(String.class))
+                    .item(499.5)
+            );
+        }
+        assertMap(
+            result,
+            matchesMap().entry(
+                "columns",
+                matchesList().item(matchesMap().entry("name", "@timestamp").entry("type", "date"))
+                    .item(matchesMap().entry("name", "test").entry("type", "text"))
+                    .item(matchesMap().entry("name", "test.keyword").entry("type", "keyword"))
+                    .item(matchesMap().entry("name", "value").entry("type", "long"))
+                    .item(matchesMap().entry("name", "now").entry("type", "date"))
+                    .item(matchesMap().entry("name", "AVG(value)").entry("type", "double"))
+            ).entry("values", values)
+        );
+    }
+
+    public void testProfile() throws IOException {
+        indexTestData();
+
+        RequestObjectBuilder builder = requestObjectBuilder().query(fromIndex() + " | STATS AVG(value)");
+        builder.profile(true);
+        if (Build.current().isSnapshot()) {
+            // Lock to shard level partitioning, so we get consistent profile output
+            builder.pragmas(Settings.builder().put("data_partitioning", "shard").build());
+        }
+        Map<String, Object> result = runEsql(builder);
+        assertMap(
+            result,
+            matchesMap().entry("columns", matchesList().item(matchesMap().entry("name", "AVG(value)").entry("type", "double")))
+                .entry("values", List.of(List.of(499.5d)))
+                .entry("profile", matchesMap().entry("drivers", instanceOf(List.class)))
+        );
+
+        MapMatcher commonProfile = matchesMap().entry("iterations", greaterThan(0))
+            .entry("cpu_nanos", greaterThan(0))
+            .entry("took_nanos", greaterThan(0))
+            .entry("operators", instanceOf(List.class));
+        List<List<String>> signatures = new ArrayList<>();
+        @SuppressWarnings("unchecked")
+        List<Map<String, Object>> profiles = (List<Map<String, Object>>) ((Map<String, Object>) result.get("profile")).get("drivers");
+        for (Map<String, Object> p : profiles) {
+            assertThat(p, commonProfile);
+            List<String> sig = new ArrayList<>();
+            @SuppressWarnings("unchecked")
+            List<Map<String, Object>> operators = (List<Map<String, Object>>) p.get("operators");
+            for (Map<String, Object> o : operators) {
+                sig.add(checkOperatorProfile(o));
+            }
+            signatures.add(sig);
+        }
+        assertThat(
+            signatures,
+            containsInAnyOrder(
+                matchesList().item("LuceneSourceOperator")
+                    .item("ValuesSourceReaderOperator")
+                    .item("AggregationOperator")
+                    .item("ExchangeSinkOperator"),
+                matchesList().item("ExchangeSourceOperator").item("ExchangeSinkOperator"),
+                matchesList().item("ExchangeSourceOperator")
+                    .item("AggregationOperator")
+                    .item("ProjectOperator")
+                    .item("LimitOperator")
+                    .item("EvalOperator")
+                    .item("ProjectOperator")
+                    .item("OutputOperator")
+            )
+        );
+    }
+
+    public void testInlineStatsProfile() throws IOException {
+        indexTestData();
+
+        RequestObjectBuilder builder = requestObjectBuilder().query(fromIndex() + " | INLINESTATS AVG(value) | SORT value ASC");
+        builder.profile(true);
+        if (Build.current().isSnapshot()) {
+            // Lock to shard level partitioning, so we get consistent profile output
+            builder.pragmas(Settings.builder().put("data_partitioning", "shard").build());
+        }
+        Map<String, Object> result = runEsql(builder);
+        ListMatcher values = matchesList();
+        for (int i = 0; i < 1000; i++) {
+            values = values.item(matchesList().item("2020-12-12T00:00:00.000Z").item("value" + i).item("value" + i).item(i).item(499.5));
+        }
+        assertMap(
+            result,
+            matchesMap().entry(
+                "columns",
+                matchesList().item(matchesMap().entry("name", "@timestamp").entry("type", "date"))
+                    .item(matchesMap().entry("name", "test").entry("type", "text"))
+                    .item(matchesMap().entry("name", "test.keyword").entry("type", "keyword"))
+                    .item(matchesMap().entry("name", "value").entry("type", "long"))
+                    .item(matchesMap().entry("name", "AVG(value)").entry("type", "double"))
+            ).entry("values", values).entry("profile", matchesMap().entry("drivers", instanceOf(List.class)))
+        );
+
+        MapMatcher commonProfile = matchesMap().entry("iterations", greaterThan(0))
+            .entry("cpu_nanos", greaterThan(0))
+            .entry("took_nanos", greaterThan(0))
+            .entry("operators", instanceOf(List.class));
+        List<List<String>> signatures = new ArrayList<>();
+        @SuppressWarnings("unchecked")
+        List<Map<String, Object>> profiles = (List<Map<String, Object>>) ((Map<String, Object>) result.get("profile")).get("drivers");
+        for (Map<String, Object> p : profiles) {
+            assertThat(p, commonProfile);
+            List<String> sig = new ArrayList<>();
+            @SuppressWarnings("unchecked")
+            List<Map<String, Object>> operators = (List<Map<String, Object>>) p.get("operators");
+            for (Map<String, Object> o : operators) {
+                sig.add(checkOperatorProfile(o));
+            }
+            signatures.add(sig);
+        }
+        assertThat(
+            signatures,
+            containsInAnyOrder(
+                // First pass read and start agg
+                matchesList().item("LuceneSourceOperator")
+                    .item("ValuesSourceReaderOperator")
+                    .item("AggregationOperator")
+                    .item("ExchangeSinkOperator"),
+                // First pass node level reduce
+                matchesList().item("ExchangeSourceOperator").item("ExchangeSinkOperator"),
+                // First pass finish agg
+                matchesList().item("ExchangeSourceOperator")
+                    .item("AggregationOperator")
+                    .item("ProjectOperator")
+                    .item("EvalOperator")
+                    .item("ProjectOperator")
+                    .item("OutputOperator"),
+                // Second pass read and join via eval
+                matchesList().item("LuceneSourceOperator")
+                    .item("EvalOperator")
+                    .item("ValuesSourceReaderOperator")
+                    .item("TopNOperator")
+                    .item("ValuesSourceReaderOperator")
+                    .item("ProjectOperator")
+                    .item("ExchangeSinkOperator"),
+                // Second pass node level reduce
+                matchesList().item("ExchangeSourceOperator").item("ExchangeSinkOperator"),
+                // Second pass finish
+                matchesList().item("ExchangeSourceOperator").item("TopNOperator").item("OutputOperator")
+            )
+        );
+    }
+
+    private String checkOperatorProfile(Map<String, Object> o) {
+        String name = (String) o.get("operator");
+        name = name.replaceAll("\\[.+", "");
+        MapMatcher status = switch (name) {
+            case "LuceneSourceOperator" -> matchesMap().entry("processed_slices", greaterThan(0))
+                .entry("processed_shards", List.of(testIndexName() + ":0"))
+                .entry("total_slices", greaterThan(0))
+                .entry("slice_index", 0)
+                .entry("slice_max", 0)
+                .entry("slice_min", 0)
+                .entry("current", DocIdSetIterator.NO_MORE_DOCS)
+                .entry("pages_emitted", greaterThan(0))
+                .entry("processing_nanos", greaterThan(0))
+                .entry("processed_queries", List.of("*:*"));
+            case "ValuesSourceReaderOperator" -> basicProfile().entry("readers_built", matchesMap().extraOk());
+            case "AggregationOperator" -> matchesMap().entry("pages_processed", greaterThan(0)).entry("aggregation_nanos", greaterThan(0));
+            case "ExchangeSinkOperator" -> matchesMap().entry("pages_accepted", greaterThan(0));
+            case "ExchangeSourceOperator" -> matchesMap().entry("pages_emitted", greaterThan(0)).entry("pages_waiting", 0);
+            case "ProjectOperator", "EvalOperator" -> basicProfile();
+            case "LimitOperator" -> matchesMap().entry("pages_processed", greaterThan(0))
+                .entry("limit", 1000)
+                .entry("limit_remaining", 999);
+            case "OutputOperator" -> null;
+            case "TopNOperator" -> matchesMap().entry("occupied_rows", 0)
+                .entry("ram_used", instanceOf(String.class))
+                .entry("ram_bytes_used", greaterThan(0));
+            default -> throw new AssertionError("unexpected status: " + o);
+        };
+        MapMatcher expectedOp = matchesMap().entry("operator", startsWith(name));
+        if (status != null) {
+            expectedOp = expectedOp.entry("status", status);
+        }
+        assertMap(o, expectedOp);
+        return name;
+    }
+
+    private MapMatcher basicProfile() {
+        return matchesMap().entry("pages_processed", greaterThan(0)).entry("process_nanos", greaterThan(0));
+    }
+
+    private void indexTestData() throws IOException {
+        Request createIndex = new Request("PUT", testIndexName());
+        createIndex.setJsonEntity("""
+            {
+              "settings": {
+                "index": {
+                  "number_of_shards": 1
+                }
+              }
+            }""");
+        Response response = client().performRequest(createIndex);
+        assertThat(
+            entityToMap(response.getEntity(), XContentType.JSON),
+            matchesMap().entry("shards_acknowledged", true).entry("index", testIndexName()).entry("acknowledged", true)
+        );
+
+        StringBuilder b = new StringBuilder();
+        for (int i = 0; i < 1000; i++) {
+            b.append(String.format(Locale.ROOT, """
+                {"create":{"_index":"%s"}}
+                {"@timestamp":"2020-12-12","test":"value%s","value":%d}
+                """, testIndexName(), i, i));
+        }
+        Request bulk = new Request("POST", "/_bulk");
+        bulk.addParameter("refresh", "true");
+        bulk.addParameter("filter_path", "errors");
+        bulk.setJsonEntity(b.toString());
+        response = client().performRequest(bulk);
+        Assert.assertEquals("{\"errors\":false}", EntityUtils.toString(response.getEntity(), StandardCharsets.UTF_8));
+    }
+
     private void assertException(String query, String... errorMessages) throws IOException {
         ResponseException re = expectThrows(ResponseException.class, () -> runEsqlSync(requestObjectBuilder().query(query)));
         assertThat(re.getResponse().getStatusLine().getStatusCode(), equalTo(400));

+ 11 - 1
x-pack/plugin/esql/qa/server/src/main/java/org/elasticsearch/xpack/esql/qa/rest/RestEsqlTestCase.java

@@ -119,6 +119,8 @@ public abstract class RestEsqlTestCase extends ESRestTestCase {
 
         private Boolean keepOnCompletion = null;
 
+        private Boolean profile = null;
+
         public RequestObjectBuilder() throws IOException {
             this(randomFrom(XContentType.values()));
         }
@@ -180,6 +182,11 @@ public abstract class RestEsqlTestCase extends ESRestTestCase {
             return this;
         }
 
+        public RequestObjectBuilder profile(boolean profile) {
+            this.profile = profile;
+            return this;
+        }
+
         public RequestObjectBuilder build() throws IOException {
             if (isBuilt == false) {
                 if (tables != null) {
@@ -195,6 +202,9 @@ public abstract class RestEsqlTestCase extends ESRestTestCase {
                     }
                     builder.endObject();
                 }
+                if (profile != null) {
+                    builder.field("profile", profile);
+                }
                 builder.endObject();
                 isBuilt = true;
             }
@@ -756,7 +766,7 @@ public abstract class RestEsqlTestCase extends ESRestTestCase {
         return Collections.unmodifiableMap(copy);
     }
 
-    static Map<String, Object> entityToMap(HttpEntity entity, XContentType expectedContentType) throws IOException {
+    protected static Map<String, Object> entityToMap(HttpEntity entity, XContentType expectedContentType) throws IOException {
         try (InputStream content = entity.getContent()) {
             XContentType xContentType = XContentType.fromMediaType(entity.getContentType().getValue());
             assertEquals(expectedContentType, xContentType);

+ 503 - 0
x-pack/plugin/esql/qa/testFixtures/src/main/resources/inlinestats.csv-spec

@@ -0,0 +1,503 @@
+maxOfInt
+required_capability: inlinestats
+
+// tag::max-languages[]
+FROM employees
+| KEEP emp_no, languages
+| INLINESTATS max_lang = MAX(languages) 
+| WHERE max_lang == languages
+| SORT emp_no ASC
+| LIMIT 5
+// end::max-languages[]
+;
+
+// tag::max-languages-result[]
+emp_no:integer | languages:integer | max_lang:integer
+         10002 |                 5 | 5
+         10004 |                 5 | 5
+         10011 |                 5 | 5
+         10012 |                 5 | 5
+         10014 |                 5 | 5
+// end::max-languages-result[]
+;
+
+maxOfIntByKeyword
+required_capability: inlinestats
+
+FROM employees
+| KEEP emp_no, languages, gender
+| INLINESTATS max_lang = MAX(languages) BY gender 
+| WHERE max_lang == languages
+| SORT emp_no ASC
+| LIMIT 5;
+
+emp_no:integer | languages:integer | gender:keyword | max_lang:integer
+         10002 |                 5 | F              | 5
+         10004 |                 5 | M              | 5
+         10011 |                 5 | null           | 5
+         10012 |                 5 | null           | 5
+         10014 |                 5 | null           | 5
+;
+
+maxOfLongByKeyword
+required_capability: inlinestats
+
+FROM employees
+| KEEP emp_no, avg_worked_seconds, gender
+| INLINESTATS max_avg_worked_seconds = MAX(avg_worked_seconds) BY gender 
+| WHERE max_avg_worked_seconds == avg_worked_seconds
+| SORT emp_no ASC;
+
+emp_no:integer | avg_worked_seconds:long | gender:keyword | max_avg_worked_seconds:long
+         10007 |               393084805 | F              | 393084805
+         10015 |               390266432 | null           | 390266432
+         10030 |               394597613 | M              | 394597613
+;
+
+maxOfLong
+required_capability: inlinestats
+
+FROM employees
+| KEEP emp_no, avg_worked_seconds, gender
+| INLINESTATS max_avg_worked_seconds = MAX(avg_worked_seconds) 
+| WHERE max_avg_worked_seconds == avg_worked_seconds
+| SORT emp_no ASC;
+
+emp_no:integer | avg_worked_seconds:long | gender:keyword | max_avg_worked_seconds:long
+         10030 |               394597613 | M              | 394597613
+;
+
+// TODO allow inline calculation like BY l = SUBSTRING(
+maxOfLongByCalculatedKeyword
+required_capability: inlinestats
+
+// tag::longest-tenured-by-first[]
+FROM employees
+| EVAL l = SUBSTRING(last_name, 0, 1)
+| KEEP emp_no, avg_worked_seconds, l
+| INLINESTATS max_avg_worked_seconds = MAX(avg_worked_seconds) BY l 
+| WHERE max_avg_worked_seconds == avg_worked_seconds
+| SORT l ASC
+| LIMIT 5
+// end::longest-tenured-by-first[]
+;
+
+// tag::longest-tenured-by-first-result[]
+emp_no:integer | avg_worked_seconds:long | l:keyword | max_avg_worked_seconds:long
+         10065 |               372660279 | A         | 372660279
+         10074 |               382397583 | B         | 382397583
+         10044 |               387408356 | C         | 387408356
+         10030 |               394597613 | D         | 394597613
+         10087 |               305782871 | E         | 305782871
+// end::longest-tenured-by-first-result[]
+;
+
+maxOfLongByInt
+required_capability: inlinestats
+
+FROM employees
+| KEEP emp_no, avg_worked_seconds, languages
+| INLINESTATS max_avg_worked_seconds = MAX(avg_worked_seconds) BY languages
+| WHERE max_avg_worked_seconds == avg_worked_seconds
+| SORT languages ASC;
+
+emp_no:integer | avg_worked_seconds:long | languages:integer | max_avg_worked_seconds:long
+         10044 |               387408356 |                 1 | 387408356
+         10099 |               377713748 |                 2 | 377713748
+         10030 |               394597613 |                 3 | 394597613
+         10007 |               393084805 |                 4 | 393084805
+         10015 |               390266432 |                 5 | 390266432
+         10027 |               374037782 |              null | 374037782
+;
+
+maxOfLongByIntDouble
+required_capability: inlinestats
+
+FROM employees
+| KEEP emp_no, avg_worked_seconds, languages, height
+| EVAL height=ROUND(height, 1)
+| INLINESTATS max_avg_worked_seconds = MAX(avg_worked_seconds) BY languages, height
+| WHERE max_avg_worked_seconds == avg_worked_seconds
+| SORT languages, height ASC
+| LIMIT 4;
+
+emp_no:integer | avg_worked_seconds:long | languages:integer | height:double | max_avg_worked_seconds:long
+         10083 |               331236443 |                 1 |           1.4 | 331236443
+         10084 |               359067056 |                 1 |           1.5 | 359067056
+         10033 |               208374744 |                 1 |           1.6 | 208374744
+         10086 |               328580163 |                 1 |           1.7 | 328580163
+;
+
+
+two
+required_capability: inlinestats
+
+FROM employees
+| KEEP emp_no, languages, avg_worked_seconds, gender
+| INLINESTATS avg_avg_worked_seconds = AVG(avg_worked_seconds) BY languages
+| WHERE avg_worked_seconds > avg_avg_worked_seconds
+| INLINESTATS max_languages = MAX(languages) BY gender
+| SORT emp_no ASC
+| LIMIT 3;
+
+emp_no:integer | languages:integer | avg_worked_seconds:long | gender:keyword | avg_avg_worked_seconds:double | max_languages:integer
+         10002 |                 5 |               328922887 | F              | 3.133013149047619E8           | 5
+         10006 |                 3 |               372957040 | F              | 2.978159518235294E8           | 5
+         10007 |                 4 |               393084805 | F              | 2.863684210555556E8           | 5
+;
+
+byMultivaluedSimple
+required_capability: inlinestats
+
+// tag::mv-group[]
+FROM airports
+| INLINESTATS min_scalerank=MIN(scalerank) BY type
+| EVAL type=MV_SORT(type), min_scalerank=MV_SORT(min_scalerank)
+| KEEP abbrev, type, scalerank, min_scalerank
+| WHERE abbrev == "GWL"
+// end::mv-group[]
+;
+
+// tag::mv-group-result[]
+abbrev:keyword |  type:keyword   | scalerank:integer | min_scalerank:integer
+           GWL | [mid, military] | 9                 | [2, 4]
+// end::mv-group-result[]
+;
+
+byMultivaluedMvExpand
+required_capability: inlinestats
+
+// tag::mv-expand[]
+FROM airports
+| KEEP abbrev, type, scalerank
+| MV_EXPAND type
+| INLINESTATS min_scalerank=MIN(scalerank) BY type
+| SORT min_scalerank ASC
+| WHERE abbrev == "GWL"
+// end::mv-expand[]
+;
+
+// tag::mv-expand-result[]
+abbrev:keyword | type:keyword | scalerank:integer | min_scalerank:integer
+           GWL |          mid | 9                 | 2
+           GWL |     military | 9                 | 4
+// end::mv-expand-result[]
+;
+
+byMvExpand
+required_capability: inlinestats
+
+// tag::extreme-airports[]
+FROM airports
+| MV_EXPAND type
+| EVAL lat = ST_Y(location)
+| INLINESTATS most_northern=MAX(lat), most_southern=MIN(lat) BY type
+| WHERE lat == most_northern OR lat == most_southern
+| SORT lat DESC
+| KEEP type, name, location
+// end::extreme-airports[]
+;
+
+// tag::extreme-airports-result[]
+ type:keyword |           name:text           | location:geo_point
+          mid |             Svalbard Longyear | POINT (15.495229 78.246717)
+        major |                Tromsø Langnes | POINT (18.9072624292132 69.6796790473478)
+     military | Severomorsk-3 (Murmansk N.E.) | POINT (33.2903527616285 69.0168711826804)
+    spaceport |           Baikonur Cosmodrome | POINT (63.307354423875 45.9635739403124)
+        small |                       Dhamial | POINT (73.0320498392002 33.5614146278861)
+        small |                      Sahnewal | POINT (75.9570722403652 30.8503598561702)
+    spaceport |       Centre Spatial Guyanais | POINT (-52.7684296893452 5.23941001258035)
+     military |         Santos Air Force Base | POINT (-46.3052704931003 -23.9237590410637)
+        major |            Christchurch Int'l | POINT (172.538675565223 -43.4885486784104)
+          mid |          Hermes Quijada Int'l | POINT (-67.7530268462675 -53.7814746058316)
+// end::extreme-airports-result[]
+;
+
+brokenwhy-Ignore
+required_capability: inlinestats
+
+FROM airports
+| INLINESTATS min_scalerank=MIN(scalerank) BY type
+| MV_EXPAND type
+| WHERE scalerank == MV_MIN(scalerank);
+
+abbrev:keyword |  type:keyword   | scalerank:integer | min_scalerank:integer
+           GWL | [mid, military] | 9                 | [2, 4]
+;
+
+afterStats
+required_capability: inlinestats
+
+FROM airports
+| STATS count=COUNT(*) BY country
+| INLINESTATS avg=AVG(count)
+| WHERE count > avg * 3
+| SORT count DESC, country ASC
+;
+
+count:long | country:keyword | avg:double
+       129 |   United States | 4.455
+        50 |           India | 4.455
+        45 |          Mexico | 4.455
+        41 |           China | 4.455
+        37 |          Canada | 4.455
+        31 |          Brazil | 4.455
+        26 |          Russia | 4.455
+        19 |            null | 4.455
+        17 |       Australia | 4.455
+        17 |  United Kingdom | 4.455
+;
+
+afterWhere
+required_capability: inlinestats
+
+FROM airports
+| WHERE country != "United States"
+| INLINESTATS count=COUNT(*) BY country
+| SORT count DESC, abbrev ASC
+| KEEP abbrev, country, count
+| LIMIT 4
+;
+
+abbrev:keyword | country:keyword | count:long
+           AGR |           India | 50
+           AMD |           India | 50
+           BBI |           India | 50
+           BDQ |           India | 50
+;
+
+afterLookup
+required_capability: inlinestats
+
+FROM airports
+| RENAME scalerank AS int
+| LOOKUP int_number_names ON int
+| RENAME name as scalerank
+| DROP int
+| INLINESTATS count=COUNT(*) BY scalerank
+| SORT abbrev ASC
+| KEEP abbrev, scalerank
+| LIMIT 4
+;
+
+abbrev:keyword | scalerank:keyword
+           ABJ |              four
+           ABQ |               six
+           ABV |              five
+           ACA |              four
+;
+
+afterEnrich
+required_capability: inlinestats
+required_capability: enrich_load
+
+FROM airports
+| KEEP abbrev, city
+| WHERE abbrev NOT IN ("ADJ", "ATQ") // Skip airports in regions with right-to-left text which the test file isn't good with
+| ENRICH city_names ON city WITH region
+| WHERE MV_COUNT(region) == 1
+| INLINESTATS COUNT(*) BY region
+| SORT abbrev ASC
+| WHERE `COUNT(*)` > 1
+| LIMIT 3
+;
+
+abbrev:keyword | city:keyword |       region:text | "COUNT(*)":long 
+           ALA |       Almaty |     Жетісу ауданы | 2
+           BXJ |       Almaty |     Жетісу ауданы | 2
+           FUK |      Fukuoka |             中央区 | 2
+;
+
+beforeStats
+required_capability: inlinestats
+
+FROM airports
+| EVAL lat = ST_Y(location)
+| INLINESTATS avg_lat=AVG(lat)
+| STATS northern=COUNT(lat > avg_lat OR NULL), southern=COUNT(lat < avg_lat OR NULL)
+;
+
+northern:long | southern:long
+          520 | 371
+;
+
+beforeKeepSort
+required_capability: inlinestats
+
+FROM employees
+| INLINESTATS max_salary = MAX(salary) by languages
+| KEEP emp_no, languages, max_salary
+| SORT emp_no ASC
+| LIMIT 3;
+
+emp_no:integer | languages:integer | max_salary:integer
+         10001 |                 2 | 73578
+         10002 |                 5 | 66817
+         10003 |                 4 | 74572
+;
+
+beforeKeep
+required_capability: inlinestats
+
+FROM employees
+| INLINESTATS max_salary = MAX(salary) by languages
+| KEEP emp_no, languages, max_salary
+| LIMIT 3;
+
+ignoreOrder:true
+emp_no:integer | languages:integer | max_salary:integer
+         10001 |                 2 | 73578
+         10002 |                 5 | 66817
+         10003 |                 4 | 74572
+;
+
+beforeEnrich
+required_capability: inlinestats
+required_capability: enrich_load
+
+FROM airports
+| KEEP abbrev, type, city
+| INLINESTATS COUNT(*) BY type
+| ENRICH city_names ON city WITH region
+| WHERE MV_COUNT(region) == 1
+| SORT abbrev ASC
+| LIMIT 3
+;
+
+abbrev:keyword | type:keyword |    city:keyword    | "COUNT(*)":long | region:text
+           ABJ |          mid |            Abidjan |             499 | Abidjan
+           ABV |        major |              Abuja |             385 | Municipal Area Council
+           ACA |        major | Acapulco de Juárez |             385 | Acapulco de Juárez
+;
+
+beforeAndAfterEnrich
+required_capability: inlinestats
+required_capability: enrich_load
+
+FROM airports
+| KEEP abbrev, type, city
+| INLINESTATS COUNT(*) BY type
+| ENRICH city_names ON city WITH region
+| WHERE MV_COUNT(region) == 1
+| INLINESTATS count_region=COUNT(*) BY region 
+| SORT abbrev ASC
+| LIMIT 3
+;
+
+abbrev:keyword | type:keyword |    city:keyword    | "COUNT(*)":long | region:text            | count_region:long
+           ABJ |          mid |            Abidjan |             499 | Abidjan                | 1
+           ABV |        major |              Abuja |             385 | Municipal Area Council | 1
+           ACA |        major | Acapulco de Juárez |             385 | Acapulco de Juárez     | 1
+;
+
+
+shadowing
+required_capability: inlinestats
+
+ROW left = "left", client_ip = "172.21.0.5", env = "env", right = "right"
+| INLINESTATS env=VALUES(right) BY client_ip
+;
+
+left:keyword | client_ip:keyword | right:keyword | env:keyword
+left         | 172.21.0.5        | right         | right
+;
+
+shadowingMulti
+required_capability: inlinestats
+
+ROW left = "left", airport = "Zurich Airport ZRH", city = "Zürich", middle = "middle", region = "North-East Switzerland", right = "right"
+| INLINESTATS airport=VALUES(left), region=VALUES(left), city_boundary=VALUES(left) BY city
+;
+
+left:keyword | city:keyword | middle:keyword | right:keyword | airport:keyword | region:keyword | city_boundary:keyword
+left         | Zürich       | middle         | right         |            left |           left | left
+;
+
+shadowingSelf
+required_capability: inlinestats
+
+ROW city="Raleigh"
+| INLINESTATS city=COUNT(city)
+;
+
+city:long
+1
+;
+
+shadowingSelfBySelf-Ignore
+required_capability: inlinestats
+
+ROW city="Raleigh"
+| INLINESTATS city=COUNT(city) BY city
+;
+
+city:long
+1
+;
+
+shadowingInternal-Ignore
+required_capability: inlinestats
+
+ROW city = "Zürich"
+| INLINESTATS x=VALUES(city), x=VALUES(city)
+;
+
+city:keyword | x:keyword
+Zürich       | Zürich
+;
+
+byConstant-Ignore
+required_capability: inlinestats
+
+FROM employees
+| KEEP emp_no, languages
+| INLINESTATS max_lang = MAX(languages) BY y=1 
+| WHERE max_lang == languages
+| SORT emp_no ASC
+| LIMIT 5
+;
+
+emp_no:integer | languages:integer | max_lang:integer | y:integer
+         10002 |                 5 |                5 | 1
+         10004 |                 5 |                5 | 1
+         10011 |                 5 |                5 | 1
+         10012 |                 5 |                5 | 1
+         10014 |                 5 |                5 | 1
+;
+
+aggConstant
+required_capability: inlinestats
+
+FROM employees
+| KEEP emp_no
+| INLINESTATS one = MAX(1) BY emp_no
+| SORT emp_no ASC
+| LIMIT 5
+;
+
+emp_no:integer | one:integer
+         10001 | 1
+         10002 | 1
+         10003 | 1
+         10004 | 1
+         10005 | 1
+;
+
+percentile
+required_capability: inlinestats
+
+FROM employees
+| KEEP emp_no, salary
+| INLINESTATS ninety_fifth_salary = PERCENTILE(salary, 95)
+| WHERE salary > ninety_fifth_salary
+| SORT emp_no ASC
+| LIMIT 5
+;
+
+emp_no:integer | salary:integer | ninety_fifth_salary:double
+         10007 |          74572 | 73584.95
+         10019 |          73717 | 73584.95
+         10027 |          73851 | 73584.95
+         10029 |          74999 | 73584.95
+         10045 |          74970 | 73584.95
+;

+ 61 - 0
x-pack/plugin/esql/qa/testFixtures/src/main/resources/union_types.csv-spec

@@ -335,6 +335,27 @@ count:long  |  client_ip:ip
 2           |  172.21.2.162
 ;
 
+statsUnionAggInline
+required_capability: union_types
+required_capability: inlinestats
+
+FROM sample_data, sample_data_str
+| STATS
+    count = COUNT(CIDR_MATCH(TO_IP(client_ip), "172.21.0.0/24") OR NULL)
+  BY
+    @timestamp = DATE_TRUNC(10 minutes, @timestamp)
+| SORT count DESC, @timestamp ASC
+| LIMIT 4
+;
+
+count:long | @timestamp:date
+         2 | 2023-10-23T13:30:00.000Z
+         0 | 2023-10-23T12:10:00.000Z
+         0 | 2023-10-23T12:20:00.000Z
+         0 | 2023-10-23T13:50:00.000Z
+;
+
+
 multiIndexIpStringStatsInline2
 required_capability: union_types
 required_capability: union_types_agg_cast
@@ -954,3 +975,43 @@ event_duration:long | _index:keyword      | ts:date                   | ts_str:k
 8268153             | sample_data_str     | 2023-10-23T13:52:55.015Z  | 2023-10-23T13:52:55.015Z  | 1698069175015  |  172.21.3.15   |  172.21.3.15
 8268153             | sample_data_ts_long | 2023-10-23T13:52:55.015Z  | 1698069175015             | 1698069175015  |  172.21.3.15   |  172.21.3.15
 ;
+
+
+inlineStatsUnionGroup
+required_capability: union_types
+required_capability: inlinestats
+
+FROM sample_data, sample_data_ts_long
+| EVAL @timestamp = SUBSTRING(TO_STRING(@timestamp), 0, 7)
+| INLINESTATS count = COUNT(*) BY @timestamp
+| SORT client_ip ASC, @timestamp ASC
+| LIMIT 4
+;
+
+client_ip:ip | event_duration:long |    message:keyword    | @timestamp:keyword | count:long
+  172.21.0.5 |             1232382 | Disconnected          |            1698068 | 1
+  172.21.0.5 |             1232382 | Disconnected          |            2023-10 | 7
+172.21.2.113 |             2764889 | Connected to 10.1.0.2 |            1698064 | 1
+172.21.2.113 |             2764889 | Connected to 10.1.0.2 |            2023-10 | 7
+
+;
+
+inlineStatsUnionGroupTogether
+required_capability: union_types
+required_capability: inlinestats
+
+FROM sample_data, sample_data_ts_long
+| EVAL @timestamp = TO_STRING(TO_DATETIME(@timestamp))
+| INLINESTATS count = COUNT(*) BY @timestamp
+| SORT client_ip ASC, @timestamp ASC
+| LIMIT 4
+;
+
+client_ip:ip | event_duration:long |    message:keyword    |    @timestamp:keyword    | count:long
+  172.21.0.5 |             1232382 | Disconnected          | 2023-10-23T13:33:34.937Z | 2
+  172.21.0.5 |             1232382 | Disconnected          | 2023-10-23T13:33:34.937Z | 2
+172.21.2.113 |             2764889 | Connected to 10.1.0.2 | 2023-10-23T12:27:28.948Z | 2
+172.21.2.113 |             2764889 | Connected to 10.1.0.2 | 2023-10-23T12:27:28.948Z | 2
+;
+
+# Once INLINESTATS supports expressions in agg functions and groups, convert the group in the inlinestats

+ 5 - 0
x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/action/EsqlCapabilities.java

@@ -44,6 +44,11 @@ public class EsqlCapabilities {
          */
         FN_SUBSTRING_EMPTY_NULL,
 
+        /**
+         * Support for the {@code INLINESTATS} syntax.
+         */
+        INLINESTATS(true),
+
         /**
          * Support for aggregation function {@code TOP}.
          */

+ 11 - 9
x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/analysis/Analyzer.java

@@ -75,6 +75,7 @@ import org.elasticsearch.xpack.esql.plan.logical.Lookup;
 import org.elasticsearch.xpack.esql.plan.logical.MvExpand;
 import org.elasticsearch.xpack.esql.plan.logical.Project;
 import org.elasticsearch.xpack.esql.plan.logical.Rename;
+import org.elasticsearch.xpack.esql.plan.logical.Stats;
 import org.elasticsearch.xpack.esql.plan.logical.UnresolvedRelation;
 import org.elasticsearch.xpack.esql.plan.logical.local.EsqlProject;
 import org.elasticsearch.xpack.esql.plan.logical.local.LocalRelation;
@@ -399,8 +400,8 @@ public class Analyzer extends ParameterizedRuleExecutor<LogicalPlan, AnalyzerCon
                 childrenOutput.addAll(output);
             }
 
-            if (plan instanceof Aggregate agg) {
-                return resolveAggregate(agg, childrenOutput);
+            if (plan instanceof Stats stats) {
+                return resolveStats(stats, childrenOutput);
             }
 
             if (plan instanceof Drop d) {
@@ -434,11 +435,11 @@ public class Analyzer extends ParameterizedRuleExecutor<LogicalPlan, AnalyzerCon
             return plan.transformExpressionsOnly(UnresolvedAttribute.class, ua -> maybeResolveAttribute(ua, childrenOutput));
         }
 
-        private LogicalPlan resolveAggregate(Aggregate a, List<Attribute> childrenOutput) {
+        private LogicalPlan resolveStats(Stats stats, List<Attribute> childrenOutput) {
             // if the grouping is resolved but the aggs are not, use the former to resolve the latter
             // e.g. STATS a ... GROUP BY a = x + 1
             Holder<Boolean> changed = new Holder<>(false);
-            List<Expression> groupings = a.groupings();
+            List<Expression> groupings = stats.groupings();
             // first resolve groupings since the aggs might refer to them
             // trying to globally resolve unresolved attributes will lead to some being marked as unresolvable
             if (Resolvables.resolved(groupings) == false) {
@@ -452,12 +453,12 @@ public class Analyzer extends ParameterizedRuleExecutor<LogicalPlan, AnalyzerCon
                 }
                 groupings = newGroupings;
                 if (changed.get()) {
-                    a = new Aggregate(a.source(), a.child(), a.aggregateType(), newGroupings, a.aggregates());
+                    stats = stats.with(newGroupings, stats.aggregates());
                     changed.set(false);
                 }
             }
 
-            if (a.expressionsResolved() == false) {
+            if (stats.expressionsResolved() == false) {
                 AttributeMap<Expression> resolved = new AttributeMap<>();
                 for (Expression e : groupings) {
                     Attribute attr = Expressions.attribute(e);
@@ -468,7 +469,7 @@ public class Analyzer extends ParameterizedRuleExecutor<LogicalPlan, AnalyzerCon
                 List<Attribute> resolvedList = NamedExpressions.mergeOutputAttributes(new ArrayList<>(resolved.keySet()), childrenOutput);
                 List<NamedExpression> newAggregates = new ArrayList<>();
 
-                for (NamedExpression aggregate : a.aggregates()) {
+                for (NamedExpression aggregate : stats.aggregates()) {
                     var agg = (NamedExpression) aggregate.transformUp(UnresolvedAttribute.class, ua -> {
                         Expression ne = ua;
                         Attribute maybeResolved = maybeResolveAttribute(ua, resolvedList);
@@ -481,10 +482,10 @@ public class Analyzer extends ParameterizedRuleExecutor<LogicalPlan, AnalyzerCon
                     newAggregates.add(agg);
                 }
 
-                a = changed.get() ? new Aggregate(a.source(), a.child(), a.aggregateType(), groupings, newAggregates) : a;
+                stats = changed.get() ? stats.with(groupings, newAggregates) : stats;
             }
 
-            return a;
+            return (LogicalPlan) stats;
         }
 
         private LogicalPlan resolveMvExpand(MvExpand p, List<Attribute> childrenOutput) {
@@ -1122,6 +1123,7 @@ public class Analyzer extends ParameterizedRuleExecutor<LogicalPlan, AnalyzerCon
             // In ResolveRefs the aggregates are resolved from the groupings, which might have an unresolved MultiTypeEsField.
             // Now that we have resolved those, we need to re-resolve the aggregates.
             if (plan instanceof Aggregate agg) {
+                // TODO once inlinestats supports expressions in groups we'll likely need the same sort of extraction here
                 // If the union-types resolution occurred in a child of the aggregate, we need to check the groupings
                 plan = agg.transformExpressionsOnly(FieldAttribute.class, UnionTypesCleanup::checkUnresolved);
 

+ 2 - 0
x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/io/stream/PlanNamedTypes.java

@@ -36,6 +36,7 @@ import org.elasticsearch.xpack.esql.plan.logical.EsRelation;
 import org.elasticsearch.xpack.esql.plan.logical.Eval;
 import org.elasticsearch.xpack.esql.plan.logical.Filter;
 import org.elasticsearch.xpack.esql.plan.logical.Grok;
+import org.elasticsearch.xpack.esql.plan.logical.InlineStats;
 import org.elasticsearch.xpack.esql.plan.logical.Limit;
 import org.elasticsearch.xpack.esql.plan.logical.LogicalPlan;
 import org.elasticsearch.xpack.esql.plan.logical.Lookup;
@@ -147,6 +148,7 @@ public final class PlanNamedTypes {
             of(LogicalPlan.class, EsqlProject.class, PlanNamedTypes::writeEsqlProject, PlanNamedTypes::readEsqlProject),
             of(LogicalPlan.class, Filter.class, PlanNamedTypes::writeFilter, PlanNamedTypes::readFilter),
             of(LogicalPlan.class, Grok.class, PlanNamedTypes::writeGrok, PlanNamedTypes::readGrok),
+            of(LogicalPlan.class, InlineStats.class, (PlanStreamOutput out, InlineStats v) -> v.writeTo(out), InlineStats::new),
             of(LogicalPlan.class, Join.class, (out, p) -> p.writeTo(out), Join::new),
             of(LogicalPlan.class, Limit.class, PlanNamedTypes::writeLimit, PlanNamedTypes::readLimit),
             of(LogicalPlan.class, LocalRelation.class, (out, p) -> p.writeTo(out), LocalRelation::new),

+ 3 - 1
x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/optimizer/PhysicalPlanOptimizer.java

@@ -122,7 +122,9 @@ public class PhysicalPlanOptimizer extends ParameterizedRuleExecutor<PhysicalPla
                     if (p instanceof HashJoinExec join) {
                         attributes.removeAll(join.addedFields());
                         for (Attribute rhs : join.rightFields()) {
-                            attributes.remove(rhs);
+                            if (join.leftFields().stream().anyMatch(x -> x.semanticEquals(rhs)) == false) {
+                                attributes.remove(rhs);
+                            }
                         }
                     }
                     if (p instanceof EnrichExec ee) {

+ 2 - 0
x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/package-info.java

@@ -121,6 +121,8 @@
  *         function implementations.</li>
  *     <li>{@link org.elasticsearch.xpack.esql.action.RestEsqlQueryAction Sync} and
  *         {@link org.elasticsearch.xpack.esql.action.RestEsqlAsyncQueryAction async} HTTP API entry points</li>
+ *     <li>{@link org.elasticsearch.xpack.esql.plan.logical.Phased} - Marks a {@link org.elasticsearch.xpack.esql.plan.logical.LogicalPlan}
+ *         node as requiring multiple ESQL executions to run. </li>
  * </ul>
  *
  * <h3>Query Planner</h3>

+ 4 - 0
x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/parser/LogicalPlanBuilder.java

@@ -56,6 +56,7 @@ import org.elasticsearch.xpack.esql.plan.logical.Row;
 import org.elasticsearch.xpack.esql.plan.logical.UnresolvedRelation;
 import org.elasticsearch.xpack.esql.plan.logical.meta.MetaFunctions;
 import org.elasticsearch.xpack.esql.plan.logical.show.ShowInfo;
+import org.elasticsearch.xpack.esql.plugin.EsqlPlugin;
 
 import java.util.ArrayList;
 import java.util.Arrays;
@@ -331,6 +332,9 @@ public class LogicalPlanBuilder extends ExpressionBuilder {
 
     @Override
     public PlanFactory visitInlinestatsCommand(EsqlBaseParser.InlinestatsCommandContext ctx) {
+        if (false == EsqlPlugin.INLINESTATS_FEATURE_FLAG.isEnabled()) {
+            throw new ParsingException(source(ctx), "INLINESTATS command currently requires a snapshot build");
+        }
         List<NamedExpression> aggregates = new ArrayList<>(visitFields(ctx.stats));
         List<NamedExpression> groupings = visitGrouping(ctx.grouping);
         aggregates.addAll(groupings);

+ 7 - 3
x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/plan/logical/Aggregate.java

@@ -26,9 +26,7 @@ import java.util.Objects;
 import static java.util.Collections.emptyList;
 import static org.elasticsearch.xpack.esql.expression.NamedExpressions.mergeOutputAttributes;
 
-public class Aggregate extends UnaryPlan {
-    private List<Attribute> lazyOutput;
-
+public class Aggregate extends UnaryPlan implements Stats {
     public enum AggregateType {
         STANDARD,
         // include metrics aggregates such as rates
@@ -54,6 +52,7 @@ public class Aggregate extends UnaryPlan {
     private final AggregateType aggregateType;
     private final List<Expression> groupings;
     private final List<? extends NamedExpression> aggregates;
+    private List<Attribute> lazyOutput;
 
     public Aggregate(
         Source source,
@@ -96,6 +95,11 @@ public class Aggregate extends UnaryPlan {
         return new Aggregate(source(), newChild, aggregateType, groupings, aggregates);
     }
 
+    @Override
+    public Aggregate with(List<Expression> newGroupings, List<? extends NamedExpression> newAggregates) {
+        return new Aggregate(source(), child(), aggregateType(), newGroupings, newAggregates);
+    }
+
     public AggregateType aggregateType() {
         return aggregateType;
     }

+ 179 - 3
x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/plan/logical/InlineStats.java

@@ -7,21 +7,58 @@
 
 package org.elasticsearch.xpack.esql.plan.logical;
 
+import org.elasticsearch.common.io.stream.NamedWriteable;
+import org.elasticsearch.common.io.stream.NamedWriteableRegistry;
+import org.elasticsearch.common.io.stream.StreamInput;
+import org.elasticsearch.common.io.stream.StreamOutput;
+import org.elasticsearch.common.unit.ByteSizeValue;
+import org.elasticsearch.compute.data.Block;
+import org.elasticsearch.compute.data.BlockUtils;
+import org.elasticsearch.compute.data.Page;
+import org.elasticsearch.core.Releasables;
 import org.elasticsearch.xpack.esql.core.capabilities.Resolvables;
+import org.elasticsearch.xpack.esql.core.expression.Alias;
 import org.elasticsearch.xpack.esql.core.expression.Attribute;
+import org.elasticsearch.xpack.esql.core.expression.AttributeSet;
 import org.elasticsearch.xpack.esql.core.expression.Expression;
-import org.elasticsearch.xpack.esql.core.expression.Expressions;
+import org.elasticsearch.xpack.esql.core.expression.Literal;
 import org.elasticsearch.xpack.esql.core.expression.NamedExpression;
 import org.elasticsearch.xpack.esql.core.tree.NodeInfo;
 import org.elasticsearch.xpack.esql.core.tree.Source;
+import org.elasticsearch.xpack.esql.io.stream.PlanStreamInput;
+import org.elasticsearch.xpack.esql.io.stream.PlanStreamOutput;
+import org.elasticsearch.xpack.esql.plan.logical.join.Join;
+import org.elasticsearch.xpack.esql.plan.logical.join.JoinConfig;
+import org.elasticsearch.xpack.esql.plan.logical.join.JoinType;
+import org.elasticsearch.xpack.esql.plan.logical.local.LocalRelation;
+import org.elasticsearch.xpack.esql.plan.logical.local.LocalSupplier;
+import org.elasticsearch.xpack.esql.planner.PlannerUtils;
 
+import java.io.IOException;
+import java.util.ArrayList;
 import java.util.List;
 import java.util.Objects;
 
-public class InlineStats extends UnaryPlan {
+import static org.elasticsearch.xpack.esql.expression.NamedExpressions.mergeOutputAttributes;
+
+/**
+ * Enriches the stream of data with the results of running a {@link Aggregate STATS}.
+ * <p>
+ *     This is a {@link Phased} operation that doesn't have a "native" implementation.
+ *     Instead, it's implemented as first running a {@link Aggregate STATS} and then
+ *     a {@link Join}.
+ * </p>
+ */
+public class InlineStats extends UnaryPlan implements NamedWriteable, Phased, Stats {
+    public static final NamedWriteableRegistry.Entry ENTRY = new NamedWriteableRegistry.Entry(
+        InlineStats.class,
+        "InlineStats",
+        InlineStats::new
+    );
 
     private final List<Expression> groupings;
     private final List<? extends NamedExpression> aggregates;
+    private List<Attribute> lazyOutput;
 
     public InlineStats(Source source, LogicalPlan child, List<Expression> groupings, List<? extends NamedExpression> aggregates) {
         super(source, child);
@@ -29,6 +66,28 @@ public class InlineStats extends UnaryPlan {
         this.aggregates = aggregates;
     }
 
+    public InlineStats(StreamInput in) throws IOException {
+        this(
+            Source.readFrom((PlanStreamInput) in),
+            ((PlanStreamInput) in).readLogicalPlanNode(),
+            in.readNamedWriteableCollectionAsList(Expression.class),
+            in.readNamedWriteableCollectionAsList(NamedExpression.class)
+        );
+    }
+
+    @Override
+    public void writeTo(StreamOutput out) throws IOException {
+        source().writeTo(out);
+        ((PlanStreamOutput) out).writeLogicalPlanNode(child());
+        out.writeNamedWriteableCollection(groupings);
+        out.writeNamedWriteableCollection(aggregates);
+    }
+
+    @Override
+    public String getWriteableName() {
+        return ENTRY.name;
+    }
+
     @Override
     protected NodeInfo<InlineStats> info() {
         return NodeInfo.create(this, InlineStats::new, child(), groupings, aggregates);
@@ -39,10 +98,17 @@ public class InlineStats extends UnaryPlan {
         return new InlineStats(source(), newChild, groupings, aggregates);
     }
 
+    @Override
+    public InlineStats with(List<Expression> newGroupings, List<? extends NamedExpression> newAggregates) {
+        return new InlineStats(source(), child(), newGroupings, newAggregates);
+    }
+
+    @Override
     public List<Expression> groupings() {
         return groupings;
     }
 
+    @Override
     public List<? extends NamedExpression> aggregates() {
         return aggregates;
     }
@@ -54,7 +120,19 @@ public class InlineStats extends UnaryPlan {
 
     @Override
     public List<Attribute> output() {
-        return Expressions.asAttributes(aggregates);
+        if (this.lazyOutput == null) {
+            List<NamedExpression> addedFields = new ArrayList<>();
+            AttributeSet childOutput = child().outputSet();
+
+            for (NamedExpression agg : aggregates) {
+                if (childOutput.contains(agg) == false) {
+                    addedFields.add(agg);
+                }
+            }
+
+            this.lazyOutput = mergeOutputAttributes(addedFields, child().output());
+        }
+        return lazyOutput;
     }
 
     @Override
@@ -77,4 +155,102 @@ public class InlineStats extends UnaryPlan {
             && Objects.equals(aggregates, other.aggregates)
             && Objects.equals(child(), other.child());
     }
+
+    @Override
+    public LogicalPlan firstPhase() {
+        return new Aggregate(source(), child(), Aggregate.AggregateType.STANDARD, groupings, aggregates);
+    }
+
+    @Override
+    public LogicalPlan nextPhase(List<Attribute> schema, List<Page> firstPhaseResult) {
+        if (equalsAndSemanticEquals(firstPhase().output(), schema) == false) {
+            throw new IllegalStateException("Unexpected first phase outputs: " + firstPhase().output() + " vs " + schema);
+        }
+        if (groupings.isEmpty()) {
+            return ungroupedNextPhase(schema, firstPhaseResult);
+        }
+        return groupedNextPhase(schema, firstPhaseResult);
+    }
+
+    private LogicalPlan ungroupedNextPhase(List<Attribute> schema, List<Page> firstPhaseResult) {
+        if (firstPhaseResult.size() != 1) {
+            throw new IllegalArgumentException("expected single row");
+        }
+        Page p = firstPhaseResult.get(0);
+        if (p.getPositionCount() != 1) {
+            throw new IllegalArgumentException("expected single row");
+        }
+        List<Alias> values = new ArrayList<>(schema.size());
+        for (int i = 0; i < schema.size(); i++) {
+            Attribute s = schema.get(i);
+            Object value = BlockUtils.toJavaObject(p.getBlock(i), 0);
+            values.add(new Alias(source(), s.name(), null, new Literal(source(), value, s.dataType()), aggregates.get(i).id()));
+        }
+        return new Eval(source(), child(), values);
+    }
+
+    private static boolean equalsAndSemanticEquals(List<Attribute> left, List<Attribute> right) {
+        if (left.equals(right) == false) {
+            return false;
+        }
+        for (int i = 0; i < left.size(); i++) {
+            if (left.get(i).semanticEquals(right.get(i)) == false) {
+                return false;
+            }
+        }
+        return true;
+    }
+
+    private LogicalPlan groupedNextPhase(List<Attribute> schema, List<Page> firstPhaseResult) {
+        LocalRelation local = firstPhaseResultsToLocalRelation(schema, firstPhaseResult);
+        List<Attribute> groupingAttributes = new ArrayList<>(groupings.size());
+        for (Expression g : groupings) {
+            if (g instanceof Attribute a) {
+                groupingAttributes.add(a);
+            } else {
+                throw new UnsupportedOperationException("INLINESTATS doesn't support expressions in grouping position yet");
+            }
+        }
+        List<Attribute> leftFields = new ArrayList<>(groupingAttributes.size());
+        List<Attribute> rightFields = new ArrayList<>(groupingAttributes.size());
+        List<Attribute> rhsOutput = Join.makeReference(local.output());
+        for (Attribute lhs : groupingAttributes) {
+            for (Attribute rhs : rhsOutput) {
+                if (lhs.name().equals(rhs.name())) {
+                    leftFields.add(lhs);
+                    rightFields.add(rhs);
+                    break;
+                }
+            }
+        }
+        JoinConfig config = new JoinConfig(JoinType.LEFT, groupingAttributes, leftFields, rightFields);
+        return new Join(source(), child(), local, config);
+    }
+
+    private LocalRelation firstPhaseResultsToLocalRelation(List<Attribute> schema, List<Page> firstPhaseResult) {
+        // Limit ourselves to 1mb of results similar to LOOKUP for now.
+        long bytesUsed = firstPhaseResult.stream().mapToLong(Page::ramBytesUsedByBlocks).sum();
+        if (bytesUsed > ByteSizeValue.ofMb(1).getBytes()) {
+            throw new IllegalArgumentException("first phase result too large [" + ByteSizeValue.ofBytes(bytesUsed) + "] > 1mb");
+        }
+        int positionCount = firstPhaseResult.stream().mapToInt(Page::getPositionCount).sum();
+        Block.Builder[] builders = new Block.Builder[schema.size()];
+        Block[] blocks;
+        try {
+            for (int b = 0; b < builders.length; b++) {
+                builders[b] = PlannerUtils.toElementType(schema.get(b).dataType())
+                    .newBlockBuilder(positionCount, PlannerUtils.NON_BREAKING_BLOCK_FACTORY);
+            }
+            for (Page p : firstPhaseResult) {
+                for (int b = 0; b < builders.length; b++) {
+                    builders[b].copyFrom(p.getBlock(b), 0, p.getPositionCount());
+                }
+            }
+            blocks = Block.Builder.buildAll(builders);
+        } finally {
+            Releasables.closeExpectNoException(builders);
+        }
+        return new LocalRelation(source(), schema, LocalSupplier.of(blocks));
+    }
+
 }

+ 135 - 0
x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/plan/logical/Phased.java

@@ -0,0 +1,135 @@
+/*
+ * Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
+ * or more contributor license agreements. Licensed under the Elastic License
+ * 2.0; you may not use this file except in compliance with the Elastic License
+ * 2.0.
+ */
+
+package org.elasticsearch.xpack.esql.plan.logical;
+
+import org.elasticsearch.compute.data.Page;
+import org.elasticsearch.xpack.esql.analysis.Analyzer;
+import org.elasticsearch.xpack.esql.core.expression.Attribute;
+import org.elasticsearch.xpack.esql.core.util.Holder;
+
+import java.util.List;
+
+/**
+ * Marks a {@link LogicalPlan} node as requiring multiple ESQL executions to run.
+ * All logical plans are now run by:
+ * <ol>
+ *     <li>{@link Analyzer analyzing} the entire query</li>
+ *     <li>{@link Phased#extractFirstPhase extracting} the first phase from the
+ *         logical plan</li>
+ *     <li>if there isn't a first phase, run the entire logical plan and return the
+ *         results. you are done.</li>
+ *     <li>if there is first phase, run that</li>
+ *     <li>{@link Phased#applyResultsFromFirstPhase applying} the results from the
+ *         first phase into the logical plan</li>
+ *     <li>start over from step 2 using the new logical plan</li>
+ * </ol>
+ * <p>For example, {@code INLINESTATS} is written like this:</p>
+ * <pre>{@code
+ * FROM foo
+ * | EVAL bar = a * b
+ * | INLINESTATS m = MAX(bar) BY b
+ * | WHERE m = bar
+ * | LIMIT 1
+ * }</pre>
+ * <p>And it's split into:</p>
+ * <pre>{@code
+ * FROM foo
+ * | EVAL bar = a * b
+ * | STATS m = MAX(bar) BY b
+ * }</pre>
+ * <p>and</p>
+ * <pre>{@code
+ * FROM foo
+ * | EVAL bar = a * b
+ * | LOOKUP (results of m = MAX(bar) BY b) ON b
+ * | WHERE m = bar
+ * | LIMIT 1
+ * }</pre>
+ * <p>If there are multiple {@linkplain Phased} nodes in the plan we always
+ * operate on the lowest one first, counting from the data source "upwards".
+ * Generally that'll read left to right in the query. So:</p>
+ * <pre>{@code
+ * FROM foo | INLINESTATS | INLINESTATS
+ * }</pre>
+ * becomes
+ * <pre>{@code
+ * FROM foo | STATS
+ * }</pre>
+ * and
+ * <pre>{@code
+ * FROM foo | HASHJOIN | INLINESTATS
+ * }</pre>
+ * which is further broken into
+ * <pre>{@code
+ * FROM foo | HASHJOIN | STATS
+ * }</pre>
+ * and finally:
+ * <pre>{@code
+ * FROM foo | HASHJOIN | HASHJOIN
+ * }</pre>
+ */
+public interface Phased {
+    /**
+     * Return a {@link LogicalPlan} for the first "phase" of this operation.
+     * The result of this phase will be provided to {@link #nextPhase}.
+     */
+    LogicalPlan firstPhase();
+
+    /**
+     * Use the results of plan provided from {@link #firstPhase} to produce the
+     * next phase of the query.
+     */
+    LogicalPlan nextPhase(List<Attribute> schema, List<Page> firstPhaseResult);
+
+    /**
+     * Find the first {@link Phased} operation and return it's {@link #firstPhase}.
+     * Or {@code null} if there aren't any {@linkplain Phased} operations.
+     */
+    static LogicalPlan extractFirstPhase(LogicalPlan plan) {
+        if (false == plan.analyzed()) {
+            throw new IllegalArgumentException("plan must be analyzed");
+        }
+        var firstPhase = new Holder<LogicalPlan>();
+        plan.forEachUp(t -> {
+            if (firstPhase.get() == null && t instanceof Phased phased) {
+                firstPhase.set(phased.firstPhase());
+            }
+        });
+        LogicalPlan firstPhasePlan = firstPhase.get();
+        if (firstPhasePlan != null) {
+            firstPhasePlan.setAnalyzed();
+        }
+        return firstPhasePlan;
+    }
+
+    /**
+     * Merge the results of {@link #extractFirstPhase} into a {@link LogicalPlan}
+     * and produce a new {@linkplain LogicalPlan} that will execute the rest of the
+     * query. This plan <strong>may</strong> contain <strong>another</strong>
+     * {@link #firstPhase}. If it does then it will also need to be
+     * {@link #extractFirstPhase extracted} and the results will need to be applied
+     * again by calling this method. Eventually this will produce a plan which
+     * does not have a {@link #firstPhase} and <strong>that</strong> is the "final"
+     * phase of the plan.
+     */
+    static LogicalPlan applyResultsFromFirstPhase(LogicalPlan plan, List<Attribute> schema, List<Page> result) {
+        if (false == plan.analyzed()) {
+            throw new IllegalArgumentException("plan must be analyzed");
+        }
+        Holder<Boolean> seen = new Holder<>(false);
+        LogicalPlan applied = plan.transformUp(logicalPlan -> {
+            if (seen.get() == false && logicalPlan instanceof Phased phased) {
+                seen.set(true);
+                return phased.nextPhase(schema, result);
+            }
+            return logicalPlan;
+        });
+        applied.setAnalyzed();
+        return applied;
+    }
+}

+ 39 - 0
x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/plan/logical/Stats.java

@@ -0,0 +1,39 @@
+/*
+ * Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
+ * or more contributor license agreements. Licensed under the Elastic License
+ * 2.0; you may not use this file except in compliance with the Elastic License
+ * 2.0.
+ */
+
+package org.elasticsearch.xpack.esql.plan.logical;
+
+import org.elasticsearch.xpack.esql.core.expression.Expression;
+import org.elasticsearch.xpack.esql.core.expression.NamedExpression;
+
+import java.util.List;
+
+/**
+ * STATS-like operations. Like {@link Aggregate} and {@link InlineStats}.
+ */
+public interface Stats {
+    /**
+     * Rebuild this plan with new groupings and new aggregates.
+     */
+    Stats with(List<Expression> newGroupings, List<? extends NamedExpression> newAggregates);
+
+    /**
+     * Have all the expressions in this plan been resolved?
+     */
+    boolean expressionsResolved();
+
+    /**
+     * List containing both the aggregate expressions and grouping expressions.
+     */
+    List<? extends NamedExpression> aggregates();
+
+    /**
+     * List containing just the grouping expressions.
+     */
+    List<Expression> groupings();
+
+}

+ 2 - 0
x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/plugin/EsqlPlugin.java

@@ -19,6 +19,7 @@ import org.elasticsearch.common.settings.Settings;
 import org.elasticsearch.common.settings.SettingsFilter;
 import org.elasticsearch.common.unit.ByteSizeValue;
 import org.elasticsearch.common.util.BigArrays;
+import org.elasticsearch.common.util.FeatureFlag;
 import org.elasticsearch.common.util.concurrent.EsExecutors;
 import org.elasticsearch.compute.data.Block;
 import org.elasticsearch.compute.data.BlockFactory;
@@ -79,6 +80,7 @@ import java.util.function.Predicate;
 import java.util.function.Supplier;
 
 public class EsqlPlugin extends Plugin implements ActionPlugin {
+    public static final FeatureFlag INLINESTATS_FEATURE_FLAG = new FeatureFlag("esql_inlinestats");
 
     public static final String ESQL_WORKER_THREAD_POOL_NAME = "esql_worker";
 

+ 43 - 2
x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/session/EsqlSession.java

@@ -10,7 +10,10 @@ package org.elasticsearch.xpack.esql.session;
 import org.elasticsearch.action.ActionListener;
 import org.elasticsearch.action.fieldcaps.FieldCapabilities;
 import org.elasticsearch.common.Strings;
+import org.elasticsearch.common.collect.Iterators;
 import org.elasticsearch.common.regex.Regex;
+import org.elasticsearch.compute.operator.DriverProfile;
+import org.elasticsearch.core.Releasables;
 import org.elasticsearch.index.query.QueryBuilder;
 import org.elasticsearch.logging.LogManager;
 import org.elasticsearch.logging.Logger;
@@ -46,6 +49,7 @@ import org.elasticsearch.xpack.esql.plan.logical.Aggregate;
 import org.elasticsearch.xpack.esql.plan.logical.Enrich;
 import org.elasticsearch.xpack.esql.plan.logical.Keep;
 import org.elasticsearch.xpack.esql.plan.logical.LogicalPlan;
+import org.elasticsearch.xpack.esql.plan.logical.Phased;
 import org.elasticsearch.xpack.esql.plan.logical.Project;
 import org.elasticsearch.xpack.esql.plan.logical.RegexExtract;
 import org.elasticsearch.xpack.esql.plan.physical.EstimatesRowSize;
@@ -125,14 +129,51 @@ public class EsqlSession {
         );
     }
 
+    /**
+     * Execute an analyzed plan. Most code should prefer calling {@link #execute} but
+     * this is public for testing. See {@link Phased} for the sequence of operations.
+     */
     public void executeAnalyzedPlan(
         EsqlQueryRequest request,
         BiConsumer<PhysicalPlan, ActionListener<Result>> runPhase,
         LogicalPlan analyzedPlan,
         ActionListener<Result> listener
     ) {
-        // TODO phased execution lands here.
-        runPhase.accept(logicalPlanToPhysicalPlan(analyzedPlan, request), listener);
+        LogicalPlan firstPhase = Phased.extractFirstPhase(analyzedPlan);
+        if (firstPhase == null) {
+            runPhase.accept(logicalPlanToPhysicalPlan(analyzedPlan, request), listener);
+        } else {
+            executePhased(new ArrayList<>(), analyzedPlan, request, firstPhase, runPhase, listener);
+        }
+    }
+
+    private void executePhased(
+        List<DriverProfile> profileAccumulator,
+        LogicalPlan mainPlan,
+        EsqlQueryRequest request,
+        LogicalPlan firstPhase,
+        BiConsumer<PhysicalPlan, ActionListener<Result>> runPhase,
+        ActionListener<Result> listener
+    ) {
+        PhysicalPlan physicalPlan = logicalPlanToPhysicalPlan(firstPhase, request);
+        runPhase.accept(physicalPlan, listener.delegateFailureAndWrap((next, result) -> {
+            try {
+                profileAccumulator.addAll(result.profiles());
+                LogicalPlan newMainPlan = Phased.applyResultsFromFirstPhase(mainPlan, physicalPlan.output(), result.pages());
+                LogicalPlan newFirstPhase = Phased.extractFirstPhase(newMainPlan);
+                if (newFirstPhase == null) {
+                    PhysicalPlan finalPhysicalPlan = logicalPlanToPhysicalPlan(newMainPlan, request);
+                    runPhase.accept(finalPhysicalPlan, next.delegateFailureAndWrap((finalListener, finalResult) -> {
+                        profileAccumulator.addAll(finalResult.profiles());
+                        finalListener.onResponse(new Result(finalResult.schema(), finalResult.pages(), profileAccumulator));
+                    }));
+                } else {
+                    executePhased(profileAccumulator, newMainPlan, request, newFirstPhase, runPhase, next);
+                }
+            } finally {
+                Releasables.closeExpectNoException(Releasables.wrap(Iterators.map(result.pages().iterator(), p -> p::releaseBlocks)));
+            }
+        }));
     }
 
     private LogicalPlan parse(String query, QueryParams params) {

+ 2 - 0
x-pack/plugin/esql/src/test/java/org/elasticsearch/xpack/esql/SerializationTestUtils.java

@@ -13,6 +13,7 @@ import org.elasticsearch.common.io.stream.BytesStreamOutput;
 import org.elasticsearch.common.io.stream.NamedWriteableAwareStreamInput;
 import org.elasticsearch.common.io.stream.NamedWriteableRegistry;
 import org.elasticsearch.common.io.stream.StreamInput;
+import org.elasticsearch.compute.data.Block;
 import org.elasticsearch.index.query.BoolQueryBuilder;
 import org.elasticsearch.index.query.ExistsQueryBuilder;
 import org.elasticsearch.index.query.MatchAllQueryBuilder;
@@ -126,6 +127,7 @@ public class SerializationTestUtils {
         entries.addAll(Expression.getNamedWriteables());
         entries.addAll(EsqlScalarFunction.getNamedWriteables());
         entries.addAll(AggregateFunction.getNamedWriteables());
+        entries.addAll(Block.getNamedWriteables());
         return new NamedWriteableRegistry(entries);
     }
 }

+ 2 - 0
x-pack/plugin/esql/src/test/java/org/elasticsearch/xpack/esql/io/stream/PlanNamedTypesTests.java

@@ -48,6 +48,7 @@ import org.elasticsearch.xpack.esql.plan.logical.EsRelation;
 import org.elasticsearch.xpack.esql.plan.logical.Eval;
 import org.elasticsearch.xpack.esql.plan.logical.Filter;
 import org.elasticsearch.xpack.esql.plan.logical.Grok;
+import org.elasticsearch.xpack.esql.plan.logical.InlineStats;
 import org.elasticsearch.xpack.esql.plan.logical.Limit;
 import org.elasticsearch.xpack.esql.plan.logical.LogicalPlan;
 import org.elasticsearch.xpack.esql.plan.logical.Lookup;
@@ -146,6 +147,7 @@ public class PlanNamedTypesTests extends ESTestCase {
         Eval.class,
         Filter.class,
         Grok.class,
+        InlineStats.class,
         Join.class,
         Limit.class,
         LocalRelation.class,

+ 2 - 0
x-pack/plugin/esql/src/test/java/org/elasticsearch/xpack/esql/parser/StatementParserTests.java

@@ -311,6 +311,7 @@ public class StatementParserTests extends AbstractStatementParserTests {
     }
 
     public void testInlineStatsWithGroups() {
+        assumeTrue("INLINESTATS requires snapshot builds", Build.current().isSnapshot());
         assertEquals(
             new InlineStats(
                 EMPTY,
@@ -327,6 +328,7 @@ public class StatementParserTests extends AbstractStatementParserTests {
     }
 
     public void testInlineStatsWithoutGroups() {
+        assumeTrue("INLINESTATS requires snapshot builds", Build.current().isSnapshot());
         assertEquals(
             new InlineStats(
                 EMPTY,

+ 146 - 0
x-pack/plugin/esql/src/test/java/org/elasticsearch/xpack/esql/plan/logical/PhasedTests.java

@@ -0,0 +1,146 @@
+/*
+ * Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
+ * or more contributor license agreements. Licensed under the Elastic License
+ * 2.0; you may not use this file except in compliance with the Elastic License
+ * 2.0.
+ */
+
+package org.elasticsearch.xpack.esql.plan.logical;
+
+import org.elasticsearch.compute.data.Page;
+import org.elasticsearch.index.IndexMode;
+import org.elasticsearch.test.ESTestCase;
+import org.elasticsearch.xpack.esql.core.expression.Alias;
+import org.elasticsearch.xpack.esql.core.expression.Attribute;
+import org.elasticsearch.xpack.esql.core.expression.Literal;
+import org.elasticsearch.xpack.esql.core.expression.ReferenceAttribute;
+import org.elasticsearch.xpack.esql.core.index.EsIndex;
+import org.elasticsearch.xpack.esql.core.tree.NodeInfo;
+import org.elasticsearch.xpack.esql.core.tree.Source;
+import org.elasticsearch.xpack.esql.core.type.DataType;
+
+import java.util.List;
+import java.util.Map;
+
+import static org.hamcrest.Matchers.equalTo;
+import static org.hamcrest.Matchers.nullValue;
+import static org.hamcrest.Matchers.sameInstance;
+
+public class PhasedTests extends ESTestCase {
+    public void testZeroLayers() {
+        EsRelation relation = new EsRelation(Source.synthetic("relation"), new EsIndex("foo", Map.of()), IndexMode.STANDARD, false);
+        relation.setAnalyzed();
+        assertThat(Phased.extractFirstPhase(relation), nullValue());
+    }
+
+    public void testOneLayer() {
+        EsRelation relation = new EsRelation(Source.synthetic("relation"), new EsIndex("foo", Map.of()), IndexMode.STANDARD, false);
+        LogicalPlan orig = new Dummy(Source.synthetic("orig"), relation);
+        orig.setAnalyzed();
+        assertThat(Phased.extractFirstPhase(orig), sameInstance(relation));
+        LogicalPlan finalPhase = Phased.applyResultsFromFirstPhase(
+            orig,
+            List.of(new ReferenceAttribute(Source.EMPTY, "foo", DataType.KEYWORD)),
+            List.of()
+        );
+        assertThat(
+            finalPhase,
+            equalTo(new Row(orig.source(), List.of(new Alias(orig.source(), "foo", new Literal(orig.source(), "foo", DataType.KEYWORD)))))
+        );
+        assertThat(Phased.extractFirstPhase(finalPhase), nullValue());
+    }
+
+    public void testTwoLayer() {
+        EsRelation relation = new EsRelation(Source.synthetic("relation"), new EsIndex("foo", Map.of()), IndexMode.STANDARD, false);
+        LogicalPlan inner = new Dummy(Source.synthetic("inner"), relation);
+        LogicalPlan orig = new Dummy(Source.synthetic("outer"), inner);
+        orig.setAnalyzed();
+        assertThat(
+            "extractFirstPhase should call #firstPhase on the earliest child in the plan",
+            Phased.extractFirstPhase(orig),
+            sameInstance(relation)
+        );
+        LogicalPlan secondPhase = Phased.applyResultsFromFirstPhase(
+            orig,
+            List.of(new ReferenceAttribute(Source.EMPTY, "foo", DataType.KEYWORD)),
+            List.of()
+        );
+        assertThat(
+            "applyResultsFromFirstPhase should call #nextPhase one th earliest child in the plan",
+            secondPhase,
+            equalTo(
+                new Dummy(
+                    Source.synthetic("outer"),
+                    new Row(orig.source(), List.of(new Alias(orig.source(), "foo", new Literal(orig.source(), "foo", DataType.KEYWORD))))
+                )
+            )
+        );
+
+        assertThat(Phased.extractFirstPhase(secondPhase), sameInstance(secondPhase.children().get(0)));
+        LogicalPlan finalPhase = Phased.applyResultsFromFirstPhase(
+            secondPhase,
+            List.of(new ReferenceAttribute(Source.EMPTY, "foo", DataType.KEYWORD)),
+            List.of()
+        );
+        assertThat(
+            finalPhase,
+            equalTo(new Row(orig.source(), List.of(new Alias(orig.source(), "foo", new Literal(orig.source(), "foo", DataType.KEYWORD)))))
+        );
+
+        assertThat(Phased.extractFirstPhase(finalPhase), nullValue());
+    }
+
+    public class Dummy extends UnaryPlan implements Phased {
+        Dummy(Source source, LogicalPlan child) {
+            super(source, child);
+        }
+
+        @Override
+        public boolean expressionsResolved() {
+            throw new UnsupportedOperationException();
+        }
+
+        @Override
+        protected NodeInfo<? extends LogicalPlan> info() {
+            return NodeInfo.create(this, Dummy::new, child());
+        }
+
+        @Override
+        public int hashCode() {
+            return child().hashCode();
+        }
+
+        @Override
+        public boolean equals(Object obj) {
+            if (obj instanceof Dummy == false) {
+                return false;
+            }
+            Dummy other = (Dummy) obj;
+            return child().equals(other.child());
+        }
+
+        @Override
+        public UnaryPlan replaceChild(LogicalPlan newChild) {
+            return new Dummy(source(), newChild);
+        }
+
+        @Override
+        public List<Attribute> output() {
+            return child().output();
+        }
+
+        @Override
+        public LogicalPlan firstPhase() {
+            return child();
+        }
+
+        @Override
+        public LogicalPlan nextPhase(List<Attribute> schema, List<Page> firstPhaseResult) {
+            // Replace myself with a dummy "row" command
+            return new Row(
+                source(),
+                schema.stream().map(a -> new Alias(source(), a.name(), new Literal(source(), a.name(), DataType.KEYWORD))).toList()
+            );
+        }
+    }
+}

+ 3 - 2
x-pack/plugin/esql/src/test/java/org/elasticsearch/xpack/esql/tree/EsqlNodeSubclassTests.java

@@ -46,6 +46,7 @@ import org.elasticsearch.xpack.esql.expression.function.scalar.string.Concat;
 import org.elasticsearch.xpack.esql.plan.logical.Dissect;
 import org.elasticsearch.xpack.esql.plan.logical.Grok;
 import org.elasticsearch.xpack.esql.plan.logical.LogicalPlan;
+import org.elasticsearch.xpack.esql.plan.logical.PhasedTests;
 import org.elasticsearch.xpack.esql.plan.logical.join.JoinType;
 import org.elasticsearch.xpack.esql.plan.physical.EsQueryExec;
 import org.elasticsearch.xpack.esql.plan.physical.EsStatsQueryExec.Stat;
@@ -115,7 +116,7 @@ public class EsqlNodeSubclassTests<T extends B, B extends Node<B>> extends NodeS
     private static final Predicate<String> CLASSNAME_FILTER = className -> {
         boolean esqlCore = className.startsWith("org.elasticsearch.xpack.esql.core") != false;
         boolean esqlProper = className.startsWith("org.elasticsearch.xpack.esql") != false;
-        return esqlCore || esqlProper;
+        return (esqlCore || esqlProper) && className.equals(PhasedTests.Dummy.class.getName()) == false;
     };
 
     /**
@@ -126,7 +127,7 @@ public class EsqlNodeSubclassTests<T extends B, B extends Node<B>> extends NodeS
     @SuppressWarnings("rawtypes")
     public static List<Object[]> nodeSubclasses() throws IOException {
         return subclassesOf(Node.class, CLASSNAME_FILTER).stream()
-            .filter(c -> testClassFor(c) == null)
+            .filter(c -> testClassFor(c) == null || c != PhasedTests.Dummy.class)
             .map(c -> new Object[] { c })
             .toList();
     }