Browse Source

ESQL: Add timers to many status results (#105421)

This adds many "timers" to the profile and status results. These look
like:
```
  "profile" : {
    "drivers" : [
      {
        "took_nanos" : 57252128,
        "took_time" : "57.2ms",
        "cpu_nanos" : 56733751,
        "cpu_time" : "56.7ms",
        "operators" : [
...
            "operator" : "LuceneSourceOperator[maxPageSize=9362, remainingDocs=2147383647]",
...
              "processing_nanos" : 856914,
              "processing_time" : "856.9micros",
...
            "operator" : "ValuesSourceReaderOperator[fields = [a, b]]",
...
              "process_nanos" : 9671406,
              "process_time" : "9.6ms",
...
            "operator" : "EvalOperator[evaluator=AddLongsEvaluator[lhs=Attribute[channel=1], rhs=Attribute[channel=2]]]",
...
              "process_nanos" : 3344354,
              "process_time" : "3.3ms",
...
            "operator" : "HashAggregationOperator[blockHash=LongBlockHash{channel=1, entries=100, seenNull=false}, aggregators=[GroupingAggregator[aggregatorFunction=MaxLongGroupingAggregatorFunction[channels=[3]], mode=INITIAL], GroupingAggregator[aggregatorFunction=MinLongGroupingAggregatorFunction[channels=[3]], mode=INITIAL]]]",
...
              "hash_nanos" : 11845071,
              "hash_time" : "11.8ms",
              "aggregation_nanos" : 27910054,
              "aggregation_time" : "27.9ms",
...
```

This let's us compare the runtime of each operator in a query. You can
get the run times of a running ESQL task with the `_tasks` API or you
can use the `profile` API to run an ESQL query and get these times in
the result. The example above is from `profile`.

In the example above we can see we're spending 500 microseconds waiting
in the queue. Of the remaining 56ms a little under half of that is spent
running the aggregations - in this case a `MAX` and `MIN`. And 20% of
the time is hashing to build group keys. Similarly 20% is spent on field
loading.


Co-authored-by: Alexander Spies <alexander.spies@elastic.co>
Nik Everett 1 year ago
parent
commit
e390edbdf8
38 changed files with 1270 additions and 133 deletions
  1. 5 0
      docs/changelog/105421.yaml
  2. 1 0
      server/src/main/java/org/elasticsearch/TransportVersions.java
  3. 5 0
      test/yaml-rest-runner/src/main/java/org/elasticsearch/test/rest/yaml/section/VersionRange.java
  4. 3 0
      x-pack/plugin/esql/compute/src/main/java/org/elasticsearch/compute/lucene/LuceneCountOperator.java
  5. 25 0
      x-pack/plugin/esql/compute/src/main/java/org/elasticsearch/compute/lucene/LuceneOperator.java
  6. 3 0
      x-pack/plugin/esql/compute/src/main/java/org/elasticsearch/compute/lucene/LuceneSourceOperator.java
  7. 9 4
      x-pack/plugin/esql/compute/src/main/java/org/elasticsearch/compute/lucene/LuceneTopNSourceOperator.java
  8. 8 9
      x-pack/plugin/esql/compute/src/main/java/org/elasticsearch/compute/lucene/ValuesSourceReaderOperator.java
  9. 45 8
      x-pack/plugin/esql/compute/src/main/java/org/elasticsearch/compute/operator/AbstractPageMappingOperator.java
  10. 118 0
      x-pack/plugin/esql/compute/src/main/java/org/elasticsearch/compute/operator/AggregationOperator.java
  11. 92 32
      x-pack/plugin/esql/compute/src/main/java/org/elasticsearch/compute/operator/Driver.java
  12. 83 8
      x-pack/plugin/esql/compute/src/main/java/org/elasticsearch/compute/operator/DriverProfile.java
  13. 71 2
      x-pack/plugin/esql/compute/src/main/java/org/elasticsearch/compute/operator/DriverStatus.java
  14. 169 8
      x-pack/plugin/esql/compute/src/main/java/org/elasticsearch/compute/operator/HashAggregationOperator.java
  15. 7 0
      x-pack/plugin/esql/compute/src/main/java/org/elasticsearch/compute/operator/LimitOperator.java
  16. 7 0
      x-pack/plugin/esql/compute/src/main/java/org/elasticsearch/compute/operator/MvExpandOperator.java
  17. 2 2
      x-pack/plugin/esql/compute/src/main/java/org/elasticsearch/compute/operator/Operator.java
  18. 7 0
      x-pack/plugin/esql/compute/src/main/java/org/elasticsearch/compute/operator/exchange/ExchangeSinkOperator.java
  19. 7 0
      x-pack/plugin/esql/compute/src/main/java/org/elasticsearch/compute/operator/exchange/ExchangeSourceOperator.java
  20. 7 0
      x-pack/plugin/esql/compute/src/main/java/org/elasticsearch/compute/operator/topn/TopNOperatorStatus.java
  21. 14 9
      x-pack/plugin/esql/compute/src/test/java/org/elasticsearch/compute/lucene/LuceneSourceOperatorStatusTests.java
  22. 14 15
      x-pack/plugin/esql/compute/src/test/java/org/elasticsearch/compute/lucene/ValuesSourceReaderOperatorStatusTests.java
  23. 16 5
      x-pack/plugin/esql/compute/src/test/java/org/elasticsearch/compute/operator/AbstractPageMappingOperatorStatusTests.java
  24. 56 0
      x-pack/plugin/esql/compute/src/test/java/org/elasticsearch/compute/operator/AggregationOperatorStatusTests.java
  25. 27 3
      x-pack/plugin/esql/compute/src/test/java/org/elasticsearch/compute/operator/DriverProfileTests.java
  26. 31 20
      x-pack/plugin/esql/compute/src/test/java/org/elasticsearch/compute/operator/DriverStatusTests.java
  27. 167 0
      x-pack/plugin/esql/compute/src/test/java/org/elasticsearch/compute/operator/DriverTests.java
  28. 60 0
      x-pack/plugin/esql/compute/src/test/java/org/elasticsearch/compute/operator/HashAggregationOperatorStatusTests.java
  29. 2 0
      x-pack/plugin/esql/compute/src/test/java/org/elasticsearch/compute/operator/OperatorTestCase.java
  30. 4 0
      x-pack/plugin/esql/compute/src/test/java/org/elasticsearch/compute/operator/exchange/ExchangeServiceTests.java
  31. 1 1
      x-pack/plugin/esql/qa/server/multi-node/src/yamlRestTest/java/org/elasticsearch/xpack/esql/qa/multi_node/EsqlClientYamlIT.java
  32. 2 0
      x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/enrich/EnrichLookupService.java
  33. 2 0
      x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/planner/LocalExecutionPlanner.java
  34. 1 1
      x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/plugin/ComputeService.java
  35. 4 0
      x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/plugin/EsqlPlugin.java
  36. 9 2
      x-pack/plugin/esql/src/test/java/org/elasticsearch/xpack/esql/action/EsqlQueryResponseProfileTests.java
  37. 43 4
      x-pack/plugin/esql/src/test/java/org/elasticsearch/xpack/esql/action/EsqlQueryResponseTests.java
  38. 143 0
      x-pack/plugin/src/yamlRestTest/resources/rest-api-spec/test/esql/120_profile.yml

+ 5 - 0
docs/changelog/105421.yaml

@@ -0,0 +1,5 @@
+pr: 105421
+summary: "ESQL: Add timers to many status results"
+area: ES|QL
+type: enhancement
+issues: []

+ 1 - 0
server/src/main/java/org/elasticsearch/TransportVersions.java

@@ -134,6 +134,7 @@ public class TransportVersions {
     public static final TransportVersion INGEST_GRAPH_STRUCTURE_EXCEPTION = def(8_594_00_0);
     public static final TransportVersion ML_MODEL_IN_SERVICE_SETTINGS = def(8_595_00_0);
     public static final TransportVersion RANDOM_AGG_SHARD_SEED = def(8_596_00_0);
+    public static final TransportVersion ESQL_TIMINGS = def(8_597_00_0);
 
     /*
      * STOP! READ THIS FIRST! No, really,

+ 5 - 0
test/yaml-rest-runner/src/main/java/org/elasticsearch/test/rest/yaml/section/VersionRange.java

@@ -57,6 +57,11 @@ class VersionRange {
                 .orElseThrow(() -> new IllegalArgumentException("Checks against a version range require semantic version format (x.y.z)"));
             return minimumNodeVersion.onOrAfter(lower) && minimumNodeVersion.onOrBefore(upper);
         }
+
+        @Override
+        public String toString() {
+            return "MinimumContainedInVersionRange{lower=" + lower + ", upper=" + upper + '}';
+        }
     }
 
     static List<Predicate<Set<String>>> parseVersionRanges(String rawRanges) {

+ 3 - 0
x-pack/plugin/esql/compute/src/main/java/org/elasticsearch/compute/lucene/LuceneCountOperator.java

@@ -123,6 +123,7 @@ public class LuceneCountOperator extends LuceneOperator {
             assert remainingDocs <= 0 : remainingDocs;
             return null;
         }
+        long start = System.nanoTime();
         try {
             final LuceneScorer scorer = getCurrentOrLoadNextScorer();
             // no scorer means no more docs
@@ -171,6 +172,8 @@ public class LuceneCountOperator extends LuceneOperator {
             return page;
         } catch (IOException e) {
             throw new UncheckedIOException(e);
+        } finally {
+            processingNanos += System.nanoTime() - start;
         }
     }
 

+ 25 - 0
x-pack/plugin/esql/compute/src/main/java/org/elasticsearch/compute/lucene/LuceneOperator.java

@@ -16,6 +16,7 @@ import org.apache.lucene.search.Query;
 import org.apache.lucene.search.ScoreMode;
 import org.apache.lucene.search.Weight;
 import org.apache.lucene.util.Bits;
+import org.elasticsearch.TransportVersion;
 import org.elasticsearch.TransportVersions;
 import org.elasticsearch.common.Strings;
 import org.elasticsearch.common.io.stream.NamedWriteableRegistry;
@@ -24,6 +25,7 @@ import org.elasticsearch.common.io.stream.StreamOutput;
 import org.elasticsearch.compute.data.BlockFactory;
 import org.elasticsearch.compute.operator.Operator;
 import org.elasticsearch.compute.operator.SourceOperator;
+import org.elasticsearch.core.TimeValue;
 import org.elasticsearch.logging.LogManager;
 import org.elasticsearch.logging.Logger;
 import org.elasticsearch.xcontent.XContentBuilder;
@@ -60,6 +62,7 @@ public abstract class LuceneOperator extends SourceOperator {
 
     private LuceneScorer currentScorer;
 
+    long processingNanos;
     int pagesEmitted;
     boolean doneCollecting;
 
@@ -198,6 +201,7 @@ public abstract class LuceneOperator extends SourceOperator {
         private final int processedSlices;
         private final Set<String> processedQueries;
         private final Set<String> processedShards;
+        private final long processingNanos;
         private final int totalSlices;
         private final int pagesEmitted;
         private final int sliceIndex;
@@ -208,6 +212,7 @@ public abstract class LuceneOperator extends SourceOperator {
         private Status(LuceneOperator operator) {
             processedSlices = operator.processedSlices;
             processedQueries = operator.processedQueries.stream().map(Query::toString).collect(Collectors.toCollection(TreeSet::new));
+            processingNanos = operator.processingNanos;
             processedShards = new TreeSet<>(operator.processedShards);
             sliceIndex = operator.sliceIndex;
             totalSlices = operator.sliceQueue.totalSlices();
@@ -233,6 +238,7 @@ public abstract class LuceneOperator extends SourceOperator {
             int processedSlices,
             Set<String> processedQueries,
             Set<String> processedShards,
+            long processingNanos,
             int sliceIndex,
             int totalSlices,
             int pagesEmitted,
@@ -243,6 +249,7 @@ public abstract class LuceneOperator extends SourceOperator {
             this.processedSlices = processedSlices;
             this.processedQueries = processedQueries;
             this.processedShards = processedShards;
+            this.processingNanos = processingNanos;
             this.sliceIndex = sliceIndex;
             this.totalSlices = totalSlices;
             this.pagesEmitted = pagesEmitted;
@@ -260,6 +267,7 @@ public abstract class LuceneOperator extends SourceOperator {
                 processedQueries = Collections.emptySet();
                 processedShards = Collections.emptySet();
             }
+            processingNanos = in.getTransportVersion().onOrAfter(TransportVersions.ESQL_TIMINGS) ? in.readVLong() : 0;
             sliceIndex = in.readVInt();
             totalSlices = in.readVInt();
             pagesEmitted = in.readVInt();
@@ -275,6 +283,9 @@ public abstract class LuceneOperator extends SourceOperator {
                 out.writeCollection(processedQueries, StreamOutput::writeString);
                 out.writeCollection(processedShards, StreamOutput::writeString);
             }
+            if (out.getTransportVersion().onOrAfter(TransportVersions.ESQL_TIMINGS)) {
+                out.writeVLong(processingNanos);
+            }
             out.writeVInt(sliceIndex);
             out.writeVInt(totalSlices);
             out.writeVInt(pagesEmitted);
@@ -300,6 +311,10 @@ public abstract class LuceneOperator extends SourceOperator {
             return processedShards;
         }
 
+        public long processNanos() {
+            return processingNanos;
+        }
+
         public int sliceIndex() {
             return sliceIndex;
         }
@@ -330,6 +345,10 @@ public abstract class LuceneOperator extends SourceOperator {
             builder.field("processed_slices", processedSlices);
             builder.field("processed_queries", processedQueries);
             builder.field("processed_shards", processedShards);
+            builder.field("processing_nanos", processingNanos);
+            if (builder.humanReadable()) {
+                builder.field("processing_time", TimeValue.timeValueNanos(processingNanos));
+            }
             builder.field("slice_index", sliceIndex);
             builder.field("total_slices", totalSlices);
             builder.field("pages_emitted", pagesEmitted);
@@ -347,6 +366,7 @@ public abstract class LuceneOperator extends SourceOperator {
             return processedSlices == status.processedSlices
                 && processedQueries.equals(status.processedQueries)
                 && processedShards.equals(status.processedShards)
+                && processingNanos == status.processingNanos
                 && sliceIndex == status.sliceIndex
                 && totalSlices == status.totalSlices
                 && pagesEmitted == status.pagesEmitted
@@ -364,6 +384,11 @@ public abstract class LuceneOperator extends SourceOperator {
         public String toString() {
             return Strings.toString(this);
         }
+
+        @Override
+        public TransportVersion getMinimalSupportedVersion() {
+            return TransportVersions.V_8_11_X;
+        }
     }
 
     static Function<ShardContext, Weight> weightFunction(Function<ShardContext, Query> queryFunction, ScoreMode scoreMode) {

+ 3 - 0
x-pack/plugin/esql/compute/src/main/java/org/elasticsearch/compute/lucene/LuceneSourceOperator.java

@@ -128,6 +128,7 @@ public class LuceneSourceOperator extends LuceneOperator {
             assert currentPagePos == 0 : currentPagePos;
             return null;
         }
+        long start = System.nanoTime();
         try {
             final LuceneScorer scorer = getCurrentOrLoadNextScorer();
             if (scorer == null) {
@@ -163,6 +164,8 @@ public class LuceneSourceOperator extends LuceneOperator {
             return page;
         } catch (IOException e) {
             throw new UncheckedIOException(e);
+        } finally {
+            processingNanos += System.nanoTime() - start;
         }
     }
 

+ 9 - 4
x-pack/plugin/esql/compute/src/main/java/org/elasticsearch/compute/lucene/LuceneTopNSourceOperator.java

@@ -140,10 +140,15 @@ public final class LuceneTopNSourceOperator extends LuceneOperator {
         if (isFinished()) {
             return null;
         }
-        if (isEmitting()) {
-            return emit(false);
-        } else {
-            return collect();
+        long start = System.nanoTime();
+        try {
+            if (isEmitting()) {
+                return emit(false);
+            } else {
+                return collect();
+            }
+        } finally {
+            processingNanos += System.nanoTime() - start;
         }
     }
 

+ 8 - 9
x-pack/plugin/esql/compute/src/main/java/org/elasticsearch/compute/lucene/ValuesSourceReaderOperator.java

@@ -475,8 +475,8 @@ public class ValuesSourceReaderOperator extends AbstractPageMappingOperator {
     }
 
     @Override
-    protected Status status(int pagesProcessed) {
-        return new Status(new TreeMap<>(readersBuilt), pagesProcessed);
+    protected Status status(long processNanos, int pagesProcessed) {
+        return new Status(new TreeMap<>(readersBuilt), processNanos, pagesProcessed);
     }
 
     public static class Status extends AbstractPageMappingOperator.Status {
@@ -488,8 +488,8 @@ public class ValuesSourceReaderOperator extends AbstractPageMappingOperator {
 
         private final Map<String, Integer> readersBuilt;
 
-        Status(Map<String, Integer> readersBuilt, int pagesProcessed) {
-            super(pagesProcessed);
+        Status(Map<String, Integer> readersBuilt, long processNanos, int pagesProcessed) {
+            super(processNanos, pagesProcessed);
             this.readersBuilt = readersBuilt;
         }
 
@@ -521,21 +521,20 @@ public class ValuesSourceReaderOperator extends AbstractPageMappingOperator {
                 builder.field(e.getKey(), e.getValue());
             }
             builder.endObject();
-            builder.field("pages_processed", pagesProcessed());
+            innerToXContent(builder);
             return builder.endObject();
         }
 
         @Override
         public boolean equals(Object o) {
-            if (this == o) return true;
-            if (o == null || getClass() != o.getClass()) return false;
+            if (super.equals(o) == false) return false;
             Status status = (Status) o;
-            return pagesProcessed() == status.pagesProcessed() && readersBuilt.equals(status.readersBuilt);
+            return readersBuilt.equals(status.readersBuilt);
         }
 
         @Override
         public int hashCode() {
-            return Objects.hash(readersBuilt, pagesProcessed());
+            return Objects.hash(super.hashCode(), readersBuilt);
         }
 
         @Override

+ 45 - 8
x-pack/plugin/esql/compute/src/main/java/org/elasticsearch/compute/operator/AbstractPageMappingOperator.java

@@ -7,12 +7,15 @@
 
 package org.elasticsearch.compute.operator;
 
+import org.elasticsearch.TransportVersion;
+import org.elasticsearch.TransportVersions;
 import org.elasticsearch.common.Strings;
 import org.elasticsearch.common.io.stream.NamedWriteableRegistry;
 import org.elasticsearch.common.io.stream.StreamInput;
 import org.elasticsearch.common.io.stream.StreamOutput;
 import org.elasticsearch.compute.data.Page;
 import org.elasticsearch.core.Releasables;
+import org.elasticsearch.core.TimeValue;
 import org.elasticsearch.xcontent.XContentBuilder;
 
 import java.io.IOException;
@@ -25,6 +28,11 @@ public abstract class AbstractPageMappingOperator implements Operator {
     private Page prev;
     private boolean finished = false;
 
+    /**
+     * Number of milliseconds this operation has run.
+     */
+    private long processNanos;
+
     /**
      * Count of pages that have been processed by this operator.
      */
@@ -64,19 +72,21 @@ public abstract class AbstractPageMappingOperator implements Operator {
         if (prev.getPositionCount() == 0) {
             return prev;
         }
-        pagesProcessed++;
+        long start = System.nanoTime();
         Page p = process(prev);
+        pagesProcessed++;
+        processNanos += System.nanoTime() - start;
         prev = null;
         return p;
     }
 
     @Override
     public final Status status() {
-        return status(pagesProcessed);
+        return status(processNanos, pagesProcessed);
     }
 
-    protected Status status(int pagesProcessed) {
-        return new Status(pagesProcessed);
+    protected Status status(long processNanos, int pagesProcessed) {
+        return new Status(processNanos, pagesProcessed);
     }
 
     @Override
@@ -93,18 +103,24 @@ public abstract class AbstractPageMappingOperator implements Operator {
             Status::new
         );
 
+        private final long processNanos;
         private final int pagesProcessed;
 
-        public Status(int pagesProcessed) {
+        public Status(long processNanos, int pagesProcessed) {
+            this.processNanos = processNanos;
             this.pagesProcessed = pagesProcessed;
         }
 
         protected Status(StreamInput in) throws IOException {
+            processNanos = in.getTransportVersion().onOrAfter(TransportVersions.ESQL_TIMINGS) ? in.readVLong() : 0;
             pagesProcessed = in.readVInt();
         }
 
         @Override
         public void writeTo(StreamOutput out) throws IOException {
+            if (out.getTransportVersion().onOrAfter(TransportVersions.ESQL_TIMINGS)) {
+                out.writeVLong(processNanos);
+            }
             out.writeVInt(pagesProcessed);
         }
 
@@ -117,29 +133,50 @@ public abstract class AbstractPageMappingOperator implements Operator {
             return pagesProcessed;
         }
 
+        public long processNanos() {
+            return processNanos;
+        }
+
         @Override
         public XContentBuilder toXContent(XContentBuilder builder, Params params) throws IOException {
             builder.startObject();
-            builder.field("pages_processed", pagesProcessed);
+            innerToXContent(builder);
             return builder.endObject();
         }
 
+        /**
+         * Render the body of the object for this status. Protected so subclasses
+         * can call it to render the "default" body.
+         */
+        protected final XContentBuilder innerToXContent(XContentBuilder builder) throws IOException {
+            builder.field("process_nanos", processNanos);
+            if (builder.humanReadable()) {
+                builder.field("process_time", TimeValue.timeValueNanos(processNanos));
+            }
+            return builder.field("pages_processed", pagesProcessed);
+        }
+
         @Override
         public boolean equals(Object o) {
             if (this == o) return true;
             if (o == null || getClass() != o.getClass()) return false;
             Status status = (Status) o;
-            return pagesProcessed == status.pagesProcessed;
+            return processNanos == status.processNanos && pagesProcessed == status.pagesProcessed;
         }
 
         @Override
         public int hashCode() {
-            return Objects.hash(pagesProcessed);
+            return Objects.hash(processNanos, pagesProcessed);
         }
 
         @Override
         public String toString() {
             return Strings.toString(this);
         }
+
+        @Override
+        public TransportVersion getMinimalSupportedVersion() {
+            return TransportVersions.V_8_11_X;
+        }
     }
 }

+ 118 - 0
x-pack/plugin/esql/compute/src/main/java/org/elasticsearch/compute/operator/AggregationOperator.java

@@ -7,13 +7,22 @@
 
 package org.elasticsearch.compute.operator;
 
+import org.elasticsearch.TransportVersion;
+import org.elasticsearch.TransportVersions;
+import org.elasticsearch.common.Strings;
+import org.elasticsearch.common.io.stream.NamedWriteableRegistry;
+import org.elasticsearch.common.io.stream.StreamInput;
+import org.elasticsearch.common.io.stream.StreamOutput;
 import org.elasticsearch.compute.aggregation.Aggregator;
 import org.elasticsearch.compute.aggregation.Aggregator.Factory;
 import org.elasticsearch.compute.aggregation.AggregatorMode;
 import org.elasticsearch.compute.data.Block;
 import org.elasticsearch.compute.data.Page;
 import org.elasticsearch.core.Releasables;
+import org.elasticsearch.core.TimeValue;
+import org.elasticsearch.xcontent.XContentBuilder;
 
+import java.io.IOException;
 import java.util.Arrays;
 import java.util.List;
 import java.util.Objects;
@@ -36,6 +45,15 @@ public class AggregationOperator implements Operator {
     private final List<Aggregator> aggregators;
     private final DriverContext driverContext;
 
+    /**
+     * Nanoseconds this operator has spent running the aggregations.
+     */
+    private long aggregationNanos;
+    /**
+     * Count of pages this operator has processed.
+     */
+    private int pagesProcessed;
+
     public record AggregationOperatorFactory(List<Factory> aggregators, AggregatorMode mode) implements OperatorFactory {
 
         @Override
@@ -72,6 +90,7 @@ public class AggregationOperator implements Operator {
 
     @Override
     public void addInput(Page page) {
+        long start = System.nanoTime();
         checkState(needsInput(), "Operator is already finishing");
         requireNonNull(page, "page is null");
         try {
@@ -80,6 +99,8 @@ public class AggregationOperator implements Operator {
             }
         } finally {
             page.releaseBlocks();
+            aggregationNanos += System.nanoTime() - start;
+            pagesProcessed++;
         }
     }
 
@@ -150,4 +171,101 @@ public class AggregationOperator implements Operator {
         sb.append("aggregators=").append(aggregators).append("]");
         return sb.toString();
     }
+
+    @Override
+    public Operator.Status status() {
+        return new Status(aggregationNanos, pagesProcessed);
+    }
+
+    public static class Status implements Operator.Status {
+        public static final NamedWriteableRegistry.Entry ENTRY = new NamedWriteableRegistry.Entry(
+            Operator.Status.class,
+            "agg",
+            Status::new
+        );
+
+        /**
+         * Nanoseconds this operator has spent running the aggregations.
+         */
+        private final long aggregationNanos;
+        /**
+         * Count of pages this operator has processed.
+         */
+        private final int pagesProcessed;
+
+        /**
+         * Build.
+         * @param aggregationNanos Nanoseconds this operator has spent running the aggregations.
+         * @param pagesProcessed Count of pages this operator has processed.
+         */
+        public Status(long aggregationNanos, int pagesProcessed) {
+            this.aggregationNanos = aggregationNanos;
+            this.pagesProcessed = pagesProcessed;
+        }
+
+        protected Status(StreamInput in) throws IOException {
+            aggregationNanos = in.readVLong();
+            pagesProcessed = in.readVInt();
+        }
+
+        @Override
+        public void writeTo(StreamOutput out) throws IOException {
+            out.writeVLong(aggregationNanos);
+            out.writeVInt(pagesProcessed);
+        }
+
+        @Override
+        public String getWriteableName() {
+            return ENTRY.name;
+        }
+
+        /**
+         * Nanoseconds this operator has spent running the aggregations.
+         */
+        public long aggregationNanos() {
+            return aggregationNanos;
+        }
+
+        /**
+         * Count of pages this operator has processed.
+         */
+        public int pagesProcessed() {
+            return pagesProcessed;
+        }
+
+        @Override
+        public XContentBuilder toXContent(XContentBuilder builder, Params params) throws IOException {
+            builder.startObject();
+            builder.field("aggregation_nanos", aggregationNanos);
+            if (builder.humanReadable()) {
+                builder.field("aggregation_time", TimeValue.timeValueNanos(aggregationNanos));
+            }
+            builder.field("pages_processed", pagesProcessed);
+            return builder.endObject();
+
+        }
+
+        @Override
+        public boolean equals(Object o) {
+            if (this == o) return true;
+            if (o == null || getClass() != o.getClass()) return false;
+            Status status = (Status) o;
+            return aggregationNanos == status.aggregationNanos && pagesProcessed == status.pagesProcessed;
+        }
+
+        @Override
+        public int hashCode() {
+            return Objects.hash(aggregationNanos, pagesProcessed);
+        }
+
+        @Override
+        public String toString() {
+            return Strings.toString(this);
+        }
+
+        @Override
+        public TransportVersion getMinimalSupportedVersion() {
+            return TransportVersions.ESQL_TIMINGS;
+        }
+    }
 }

+ 92 - 32
x-pack/plugin/esql/compute/src/main/java/org/elasticsearch/compute/operator/Driver.java

@@ -26,6 +26,7 @@ import java.util.List;
 import java.util.concurrent.Executor;
 import java.util.concurrent.atomic.AtomicBoolean;
 import java.util.concurrent.atomic.AtomicReference;
+import java.util.function.LongSupplier;
 import java.util.function.Supplier;
 import java.util.stream.Collectors;
 
@@ -49,6 +50,22 @@ public class Driver implements Releasable, Describable {
     public static final TimeValue DEFAULT_STATUS_INTERVAL = TimeValue.timeValueSeconds(1);
 
     private final String sessionId;
+
+    /**
+     * The wall clock time when this driver was created in milliseconds since epoch.
+     * Compared to {@link #startNanos} this is less accurate and is measured by a
+     * timer that can go backwards. This is only useful for presenting times to a
+     * user, like over the status API.
+     */
+    private final long startTime;
+
+    /**
+     * The time when this driver was created in nanos. This time is relative to
+     * some arbitrary point - imagine its program startup. The timer that generates
+     * this is monotonically increasing so even if NTP or something changes the
+     * clock it won't change. As such, this is only useful for measuring durations.
+     */
+    private final long startNanos;
     private final DriverContext driverContext;
     private final Supplier<String> description;
     private final List<Operator> activeOperators;
@@ -69,6 +86,13 @@ public class Driver implements Releasable, Describable {
      */
     private final AtomicReference<DriverStatus> status;
 
+    /**
+     * The time this driver finished. Only set once the driver is finished, defaults to 0
+     * which is *possibly* a valid value, so always use the driver status to check
+     * if the driver is actually finished.
+     */
+    private long finishNanos;
+
     /**
      * Creates a new driver with a chain of operators.
      * @param sessionId session Id
@@ -81,6 +105,8 @@ public class Driver implements Releasable, Describable {
      */
     public Driver(
         String sessionId,
+        long startTime,
+        long startNanos,
         DriverContext driverContext,
         Supplier<String> description,
         SourceOperator source,
@@ -90,6 +116,8 @@ public class Driver implements Releasable, Describable {
         Releasable releasable
     ) {
         this.sessionId = sessionId;
+        this.startTime = startTime;
+        this.startNanos = startNanos;
         this.driverContext = driverContext;
         this.description = description;
         this.activeOperators = new ArrayList<>();
@@ -99,7 +127,7 @@ public class Driver implements Releasable, Describable {
         this.statusNanos = statusInterval.nanos();
         this.releasable = releasable;
         this.status = new AtomicReference<>(
-            new DriverStatus(sessionId, System.currentTimeMillis(), DriverStatus.Status.QUEUED, List.of(), List.of())
+            new DriverStatus(sessionId, startTime, System.currentTimeMillis(), 0, 0, DriverStatus.Status.QUEUED, List.of(), List.of())
         );
     }
 
@@ -118,7 +146,18 @@ public class Driver implements Releasable, Describable {
         SinkOperator sink,
         Releasable releasable
     ) {
-        this("unset", driverContext, () -> null, source, intermediateOperators, sink, DEFAULT_STATUS_INTERVAL, releasable);
+        this(
+            "unset",
+            System.currentTimeMillis(),
+            System.nanoTime(),
+            driverContext,
+            () -> null,
+            source,
+            intermediateOperators,
+            sink,
+            DEFAULT_STATUS_INTERVAL,
+            releasable
+        );
     }
 
     public DriverContext driverContext() {
@@ -130,38 +169,39 @@ public class Driver implements Releasable, Describable {
      * Returns a blocked future when the chain of operators is blocked, allowing the caller
      * thread to do other work instead of blocking or busy-spinning on the blocked operator.
      */
-    private SubscribableListener<Void> run(TimeValue maxTime, int maxIterations) {
+    SubscribableListener<Void> run(TimeValue maxTime, int maxIterations, LongSupplier nowSupplier) {
         long maxTimeNanos = maxTime.nanos();
-        long startTime = System.nanoTime();
+        long startTime = nowSupplier.getAsLong();
         long nextStatus = startTime + statusNanos;
         int iter = 0;
-        while (isFinished() == false) {
+        while (true) {
             SubscribableListener<Void> fut = runSingleLoopIteration();
+            iter++;
             if (fut.isDone() == false) {
-                status.set(updateStatus(DriverStatus.Status.ASYNC));
+                updateStatus(nowSupplier.getAsLong() - startTime, iter, DriverStatus.Status.ASYNC);
                 return fut;
             }
+            if (isFinished()) {
+                finishNanos = nowSupplier.getAsLong();
+                updateStatus(finishNanos - startTime, iter, DriverStatus.Status.DONE);
+                driverContext.finish();
+                Releasables.close(releasable, driverContext.getSnapshot());
+                return Operator.NOT_BLOCKED;
+            }
+            long now = nowSupplier.getAsLong();
             if (iter >= maxIterations) {
-                break;
+                updateStatus(now - startTime, iter, DriverStatus.Status.WAITING);
+                return Operator.NOT_BLOCKED;
+            }
+            if (now - startTime >= maxTimeNanos) {
+                updateStatus(now - startTime, iter, DriverStatus.Status.WAITING);
+                return Operator.NOT_BLOCKED;
             }
-            long now = System.nanoTime();
             if (now > nextStatus) {
-                status.set(updateStatus(DriverStatus.Status.RUNNING));
+                updateStatus(now - startTime, iter, DriverStatus.Status.RUNNING);
                 nextStatus = now + statusNanos;
             }
-            iter++;
-            if (now - startTime > maxTimeNanos) {
-                break;
-            }
-        }
-        if (isFinished()) {
-            status.set(updateStatus(DriverStatus.Status.DONE));
-            driverContext.finish();
-            Releasables.close(releasable, driverContext.getSnapshot());
-        } else {
-            status.set(updateStatus(DriverStatus.Status.WAITING));
         }
-        return Operator.NOT_BLOCKED;
     }
 
     /**
@@ -180,6 +220,7 @@ public class Driver implements Releasable, Describable {
      * Abort the driver and wait for it to finish
      */
     public void abort(Exception reason, ActionListener<Void> listener) {
+        finishNanos = System.nanoTime();
         completionListener.addListener(listener);
         if (started.compareAndSet(false, true)) {
             drainAndCloseOperators(reason);
@@ -286,7 +327,7 @@ public class Driver implements Releasable, Describable {
     ) {
         driver.completionListener.addListener(listener);
         if (driver.started.compareAndSet(false, true)) {
-            driver.status.set(driver.updateStatus(DriverStatus.Status.STARTING));
+            driver.updateStatus(0, 0, DriverStatus.Status.STARTING);
             schedule(DEFAULT_TIME_BEFORE_YIELDING, maxIterations, threadContext, executor, driver, driver.completionListener);
         }
     }
@@ -324,7 +365,7 @@ public class Driver implements Releasable, Describable {
                     onComplete(listener);
                     return;
                 }
-                SubscribableListener<Void> fut = driver.run(maxTime, maxIterations);
+                SubscribableListener<Void> fut = driver.run(maxTime, maxIterations, System::nanoTime);
                 if (fut.isDone()) {
                     schedule(maxTime, maxIterations, threadContext, executor, driver, listener);
                 } else {
@@ -384,23 +425,42 @@ public class Driver implements Releasable, Describable {
     /**
      * Get the last status update from the driver. These updates are made
      * when the driver is queued and after every
-     * processing {@link #run(TimeValue, int) batch}.
+     * processing {@link #run batch}.
      */
     public DriverStatus status() {
         return status.get();
     }
 
+    /**
+     * Build a "profile" of this driver's operations after it's been completed.
+     * This doesn't make sense to call before the driver is done.
+     */
+    public DriverProfile profile() {
+        DriverStatus status = status();
+        if (status.status() != DriverStatus.Status.DONE) {
+            throw new IllegalStateException("can only get profile from finished driver");
+        }
+        return new DriverProfile(finishNanos - startNanos, status.cpuNanos(), status.iterations(), status.completedOperators());
+    }
+
     /**
      * Update the status.
+     * @param extraCpuNanos how many cpu nanoseconds to add to the previous status
+     * @param extraIterations how many iterations to add to the previous status
      * @param status the status of the overall driver request
      */
-    private DriverStatus updateStatus(DriverStatus.Status status) {
-        return new DriverStatus(
-            sessionId,
-            System.currentTimeMillis(),
-            status,
-            statusOfCompletedOperators,
-            activeOperators.stream().map(op -> new DriverStatus.OperatorStatus(op.toString(), op.status())).toList()
-        );
+    private void updateStatus(long extraCpuNanos, int extraIterations, DriverStatus.Status status) {
+        this.status.getAndUpdate(prev -> {
+            return new DriverStatus(
+                sessionId,
+                startTime,
+                System.currentTimeMillis(),
+                prev.cpuNanos() + extraCpuNanos,
+                prev.iterations() + extraIterations,
+                status,
+                statusOfCompletedOperators,
+                activeOperators.stream().map(op -> new DriverStatus.OperatorStatus(op.toString(), op.status())).toList()
+            );
+        });
     }
 }

+ 83 - 8
x-pack/plugin/esql/compute/src/main/java/org/elasticsearch/compute/operator/DriverProfile.java

@@ -7,12 +7,15 @@
 
 package org.elasticsearch.compute.operator;
 
+import org.elasticsearch.TransportVersions;
+import org.elasticsearch.common.Strings;
 import org.elasticsearch.common.collect.Iterators;
 import org.elasticsearch.common.io.stream.StreamInput;
 import org.elasticsearch.common.io.stream.StreamOutput;
 import org.elasticsearch.common.io.stream.Writeable;
 import org.elasticsearch.common.xcontent.ChunkedToXContentHelper;
 import org.elasticsearch.common.xcontent.ChunkedToXContentObject;
+import org.elasticsearch.core.TimeValue;
 import org.elasticsearch.xcontent.ToXContent;
 
 import java.io.IOException;
@@ -24,35 +27,99 @@ import java.util.Objects;
  * Profile results from a single {@link Driver}.
  */
 public class DriverProfile implements Writeable, ChunkedToXContentObject {
+    /**
+     * Nanos between creation and completion of the {@link Driver}.
+     */
+    private final long tookNanos;
+
+    /**
+     * Nanos this {@link Driver} has been running on the cpu. Does not
+     * include async or waiting time.
+     */
+    private final long cpuNanos;
+
+    /**
+     * The number of times the driver has moved a single page up the
+     * chain of operators as far as it'll go.
+     */
+    private final long iterations;
+
     /**
      * Status of each {@link Operator} in the driver when it finishes.
      */
     private final List<DriverStatus.OperatorStatus> operators;
 
-    public DriverProfile(List<DriverStatus.OperatorStatus> operators) {
+    public DriverProfile(long tookNanos, long cpuNanos, long iterations, List<DriverStatus.OperatorStatus> operators) {
+        this.tookNanos = tookNanos;
+        this.cpuNanos = cpuNanos;
+        this.iterations = iterations;
         this.operators = operators;
     }
 
     public DriverProfile(StreamInput in) throws IOException {
+        if (in.getTransportVersion().onOrAfter(TransportVersions.ESQL_TIMINGS)) {
+            this.tookNanos = in.readVLong();
+            this.cpuNanos = in.readVLong();
+            this.iterations = in.readVLong();
+        } else {
+            this.tookNanos = 0;
+            this.cpuNanos = 0;
+            this.iterations = 0;
+        }
         this.operators = in.readCollectionAsImmutableList(DriverStatus.OperatorStatus::new);
     }
 
     @Override
     public void writeTo(StreamOutput out) throws IOException {
+        if (out.getTransportVersion().onOrAfter(TransportVersions.ESQL_TIMINGS)) {
+            out.writeVLong(tookNanos);
+            out.writeVLong(cpuNanos);
+            out.writeVLong(iterations);
+        }
         out.writeCollection(operators);
     }
 
+    /**
+     * Nanos between creation and completion of the {@link Driver}.
+     */
+    public long tookNanos() {
+        return tookNanos;
+    }
+
+    /**
+     * Nanos this {@link Driver} has been running on the cpu. Does not
+     * include async or waiting time.
+     */
+    public long cpuNanos() {
+        return cpuNanos;
+    }
+
+    /**
+     * The number of times the driver has moved a single page up the
+     * chain of operators as far as it'll go.
+     */
+    public long iterations() {
+        return iterations;
+    }
+
     List<DriverStatus.OperatorStatus> operators() {
         return operators;
     }
 
     @Override
     public Iterator<? extends ToXContent> toXContentChunked(ToXContent.Params params) {
-        return Iterators.concat(
-            ChunkedToXContentHelper.startObject(),
-            ChunkedToXContentHelper.array("operators", operators.iterator()),
-            ChunkedToXContentHelper.endObject()
-        );
+        return Iterators.concat(ChunkedToXContentHelper.startObject(), Iterators.single((b, p) -> {
+            b.field("took_nanos", tookNanos);
+            if (b.humanReadable()) {
+                b.field("took_time", TimeValue.timeValueNanos(tookNanos));
+            }
+            b.field("cpu_nanos", cpuNanos);
+            if (b.humanReadable()) {
+                b.field("cpu_time", TimeValue.timeValueNanos(cpuNanos));
+            }
+            b.field("iterations", iterations);
+            return b;
+        }), ChunkedToXContentHelper.array("operators", operators.iterator()), ChunkedToXContentHelper.endObject());
     }
 
     @Override
@@ -64,11 +131,19 @@ public class DriverProfile implements Writeable, ChunkedToXContentObject {
             return false;
         }
         DriverProfile that = (DriverProfile) o;
-        return Objects.equals(operators, that.operators);
+        return tookNanos == that.tookNanos
+            && cpuNanos == that.cpuNanos
+            && iterations == that.iterations
+            && Objects.equals(operators, that.operators);
     }
 
     @Override
     public int hashCode() {
-        return Objects.hash(operators);
+        return Objects.hash(tookNanos, cpuNanos, iterations, operators);
+    }
+
+    @Override
+    public String toString() {
+        return Strings.toString(this);
     }
 }

+ 71 - 2
x-pack/plugin/esql/compute/src/main/java/org/elasticsearch/compute/operator/DriverStatus.java

@@ -12,8 +12,10 @@ import org.elasticsearch.common.Strings;
 import org.elasticsearch.common.io.stream.NamedWriteableRegistry;
 import org.elasticsearch.common.io.stream.StreamInput;
 import org.elasticsearch.common.io.stream.StreamOutput;
+import org.elasticsearch.common.io.stream.VersionedNamedWriteable;
 import org.elasticsearch.common.io.stream.Writeable;
 import org.elasticsearch.core.Nullable;
+import org.elasticsearch.core.TimeValue;
 import org.elasticsearch.index.mapper.DateFieldMapper;
 import org.elasticsearch.tasks.Task;
 import org.elasticsearch.xcontent.ToXContentFragment;
@@ -39,10 +41,29 @@ public class DriverStatus implements Task.Status {
      * The session for this driver.
      */
     private final String sessionId;
+
+    /**
+     * Milliseconds since epoch when this driver started.
+     */
+    private final long started;
+
     /**
      * When this status was generated.
      */
     private final long lastUpdated;
+
+    /**
+     * Nanos this {@link Driver} has been running on the cpu. Does not
+     * include async or waiting time.
+     */
+    private final long cpuNanos;
+
+    /**
+     * The number of times the driver has moved a single page up the
+     * chain of operators as far as it'll go.
+     */
+    private final long iterations;
+
     /**
      * The state of the overall driver - queue, starting, running, finished.
      */
@@ -60,13 +81,19 @@ public class DriverStatus implements Task.Status {
 
     DriverStatus(
         String sessionId,
+        long started,
         long lastUpdated,
+        long cpuTime,
+        long iterations,
         Status status,
         List<OperatorStatus> completedOperators,
         List<OperatorStatus> activeOperators
     ) {
         this.sessionId = sessionId;
+        this.started = started;
         this.lastUpdated = lastUpdated;
+        this.cpuNanos = cpuTime;
+        this.iterations = iterations;
         this.status = status;
         this.completedOperators = completedOperators;
         this.activeOperators = activeOperators;
@@ -74,7 +101,10 @@ public class DriverStatus implements Task.Status {
 
     public DriverStatus(StreamInput in) throws IOException {
         this.sessionId = in.readString();
+        this.started = in.getTransportVersion().onOrAfter(TransportVersions.ESQL_TIMINGS) ? in.readLong() : 0;
         this.lastUpdated = in.readLong();
+        this.cpuNanos = in.getTransportVersion().onOrAfter(TransportVersions.ESQL_TIMINGS) ? in.readVLong() : 0;
+        this.iterations = in.getTransportVersion().onOrAfter(TransportVersions.ESQL_TIMINGS) ? in.readVLong() : 0;
         this.status = Status.valueOf(in.readString());
         if (in.getTransportVersion().onOrAfter(TransportVersions.V_8_12_0)) {
             this.completedOperators = in.readCollectionAsImmutableList(OperatorStatus::new);
@@ -87,7 +117,14 @@ public class DriverStatus implements Task.Status {
     @Override
     public void writeTo(StreamOutput out) throws IOException {
         out.writeString(sessionId);
+        if (out.getTransportVersion().onOrAfter(TransportVersions.ESQL_TIMINGS)) {
+            out.writeLong(started);
+        }
         out.writeLong(lastUpdated);
+        if (out.getTransportVersion().onOrAfter(TransportVersions.ESQL_TIMINGS)) {
+            out.writeVLong(cpuNanos);
+            out.writeVLong(iterations);
+        }
         out.writeString(status.toString());
         if (out.getTransportVersion().onOrAfter(TransportVersions.V_8_12_0)) {
             out.writeCollection(completedOperators);
@@ -107,6 +144,13 @@ public class DriverStatus implements Task.Status {
         return sessionId;
     }
 
+    /**
+     * When this {@link Driver} was started.
+     */
+    public long started() {
+        return started;
+    }
+
     /**
      * When this status was generated.
      */
@@ -114,6 +158,22 @@ public class DriverStatus implements Task.Status {
         return lastUpdated;
     }
 
+    /**
+     * Nanos this {@link Driver} has been running on the cpu. Does not
+     * include async or waiting time.
+     */
+    public long cpuNanos() {
+        return cpuNanos;
+    }
+
+    /**
+     * The number of times the driver has moved a single page up the
+     * chain of operators as far as it'll go.
+     */
+    public long iterations() {
+        return iterations;
+    }
+
     /**
      * The state of the overall driver - queue, starting, running, finished.
      */
@@ -139,7 +199,13 @@ public class DriverStatus implements Task.Status {
     public XContentBuilder toXContent(XContentBuilder builder, Params params) throws IOException {
         builder.startObject();
         builder.field("sessionId", sessionId);
+        builder.field("started", DateFieldMapper.DEFAULT_DATE_TIME_FORMATTER.formatMillis(started));
         builder.field("last_updated", DateFieldMapper.DEFAULT_DATE_TIME_FORMATTER.formatMillis(lastUpdated));
+        builder.field("cpu_nanos", cpuNanos);
+        if (builder.humanReadable()) {
+            builder.field("cpu_time", TimeValue.timeValueNanos(cpuNanos));
+        }
+        builder.field("iterations", iterations);
         builder.field("status", status.toString().toLowerCase(Locale.ROOT));
         builder.startArray("completed_operators");
         for (OperatorStatus completed : completedOperators) {
@@ -160,7 +226,10 @@ public class DriverStatus implements Task.Status {
         if (o == null || getClass() != o.getClass()) return false;
         DriverStatus that = (DriverStatus) o;
         return sessionId.equals(that.sessionId)
+            && started == that.started
             && lastUpdated == that.lastUpdated
+            && cpuNanos == that.cpuNanos
+            && iterations == that.iterations
             && status == that.status
             && completedOperators.equals(that.completedOperators)
             && activeOperators.equals(that.activeOperators);
@@ -168,7 +237,7 @@ public class DriverStatus implements Task.Status {
 
     @Override
     public int hashCode() {
-        return Objects.hash(sessionId, lastUpdated, status, completedOperators, activeOperators);
+        return Objects.hash(sessionId, started, lastUpdated, cpuNanos, iterations, status, completedOperators, activeOperators);
     }
 
     @Override
@@ -204,7 +273,7 @@ public class DriverStatus implements Task.Status {
         @Override
         public void writeTo(StreamOutput out) throws IOException {
             out.writeString(operator);
-            out.writeOptionalNamedWriteable(status);
+            out.writeOptionalNamedWriteable(status != null && VersionedNamedWriteable.shouldSerialize(out, status) ? status : null);
         }
 
         public String operator() {

+ 169 - 8
x-pack/plugin/esql/compute/src/main/java/org/elasticsearch/compute/operator/HashAggregationOperator.java

@@ -7,6 +7,12 @@
 
 package org.elasticsearch.compute.operator;
 
+import org.elasticsearch.TransportVersion;
+import org.elasticsearch.TransportVersions;
+import org.elasticsearch.common.Strings;
+import org.elasticsearch.common.io.stream.NamedWriteableRegistry;
+import org.elasticsearch.common.io.stream.StreamInput;
+import org.elasticsearch.common.io.stream.StreamOutput;
 import org.elasticsearch.compute.Describable;
 import org.elasticsearch.compute.aggregation.GroupingAggregator;
 import org.elasticsearch.compute.aggregation.GroupingAggregatorFunction;
@@ -17,10 +23,14 @@ import org.elasticsearch.compute.data.IntBlock;
 import org.elasticsearch.compute.data.IntVector;
 import org.elasticsearch.compute.data.Page;
 import org.elasticsearch.core.Releasables;
+import org.elasticsearch.core.TimeValue;
+import org.elasticsearch.xcontent.XContentBuilder;
 
+import java.io.IOException;
 import java.util.ArrayList;
 import java.util.Arrays;
 import java.util.List;
+import java.util.Objects;
 import java.util.function.Supplier;
 
 import static java.util.Objects.requireNonNull;
@@ -61,6 +71,19 @@ public class HashAggregationOperator implements Operator {
 
     private final DriverContext driverContext;
 
+    /**
+     * Nanoseconds this operator has spent hashing grouping keys.
+     */
+    private long hashNanos;
+    /**
+     * Nanoseconds this operator has spent running the aggregations.
+     */
+    private long aggregationNanos;
+    /**
+     * Count of pages this operator has processed.
+     */
+    private int pagesProcessed;
+
     @SuppressWarnings("this-escape")
     public HashAggregationOperator(
         List<GroupingAggregator.Factory> aggregators,
@@ -91,36 +114,58 @@ public class HashAggregationOperator implements Operator {
     @Override
     public void addInput(Page page) {
         try {
-            checkState(needsInput(), "Operator is already finishing");
-            requireNonNull(page, "page is null");
-
             GroupingAggregatorFunction.AddInput[] prepared = new GroupingAggregatorFunction.AddInput[aggregators.size()];
-            for (int i = 0; i < prepared.length; i++) {
-                prepared[i] = aggregators.get(i).prepareProcessPage(blockHash, page);
-            }
+            class AddInput implements GroupingAggregatorFunction.AddInput {
+                long hashStart = System.nanoTime();
+                long aggStart;
 
-            blockHash.add(wrapPage(page), new GroupingAggregatorFunction.AddInput() {
                 @Override
                 public void add(int positionOffset, IntBlock groupIds) {
                     IntVector groupIdsVector = groupIds.asVector();
                     if (groupIdsVector != null) {
                         add(positionOffset, groupIdsVector);
                     } else {
+                        startAggEndHash();
                         for (GroupingAggregatorFunction.AddInput p : prepared) {
                             p.add(positionOffset, groupIds);
                         }
+                        end();
                     }
                 }
 
                 @Override
                 public void add(int positionOffset, IntVector groupIds) {
+                    startAggEndHash();
                     for (GroupingAggregatorFunction.AddInput p : prepared) {
                         p.add(positionOffset, groupIds);
                     }
+                    end();
                 }
-            });
+
+                private void startAggEndHash() {
+                    aggStart = System.nanoTime();
+                    hashNanos += aggStart - hashStart;
+                }
+
+                private void end() {
+                    hashStart = System.nanoTime();
+                    aggregationNanos += hashStart - aggStart;
+                }
+            }
+            AddInput add = new AddInput();
+
+            checkState(needsInput(), "Operator is already finishing");
+            requireNonNull(page, "page is null");
+
+            for (int i = 0; i < prepared.length; i++) {
+                prepared[i] = aggregators.get(i).prepareProcessPage(blockHash, page);
+            }
+
+            blockHash.add(wrapPage(page), add);
+            hashNanos += System.nanoTime() - add.hashStart;
         } finally {
             page.releaseBlocks();
+            pagesProcessed++;
         }
     }
 
@@ -178,6 +223,11 @@ public class HashAggregationOperator implements Operator {
         Releasables.close(blockHash, () -> Releasables.close(aggregators));
     }
 
+    @Override
+    public Operator.Status status() {
+        return new Status(hashNanos, aggregationNanos, pagesProcessed);
+    }
+
     protected static void checkState(boolean condition, String msg) {
         if (condition == false) {
             throw new IllegalArgumentException(msg);
@@ -197,4 +247,115 @@ public class HashAggregationOperator implements Operator {
         sb.append("]");
         return sb.toString();
     }
+
+    public static class Status implements Operator.Status {
+        public static final NamedWriteableRegistry.Entry ENTRY = new NamedWriteableRegistry.Entry(
+            Operator.Status.class,
+            "hashagg",
+            Status::new
+        );
+
+        /**
+         * Nanoseconds this operator has spent hashing grouping keys.
+         */
+        private final long hashNanos;
+        /**
+         * Nanoseconds this operator has spent running the aggregations.
+         */
+        private final long aggregationNanos;
+        /**
+         * Count of pages this operator has processed.
+         */
+        private final int pagesProcessed;
+
+        /**
+         * Build.
+         * @param hashNanos Nanoseconds this operator has spent hashing grouping keys.
+         * @param aggregationNanos Nanoseconds this operator has spent running the aggregations.
+         * @param pagesProcessed Count of pages this operator has processed.
+         */
+        public Status(long hashNanos, long aggregationNanos, int pagesProcessed) {
+            this.hashNanos = hashNanos;
+            this.aggregationNanos = aggregationNanos;
+            this.pagesProcessed = pagesProcessed;
+        }
+
+        protected Status(StreamInput in) throws IOException {
+            hashNanos = in.readVLong();
+            aggregationNanos = in.readVLong();
+            pagesProcessed = in.readVInt();
+        }
+
+        @Override
+        public void writeTo(StreamOutput out) throws IOException {
+            out.writeVLong(hashNanos);
+            out.writeVLong(aggregationNanos);
+            out.writeVInt(pagesProcessed);
+        }
+
+        @Override
+        public String getWriteableName() {
+            return ENTRY.name;
+        }
+
+        /**
+         * Nanoseconds this operator has spent hashing grouping keys.
+         */
+        public long hashNanos() {
+            return hashNanos;
+        }
+
+        /**
+         * Nanoseconds this operator has spent running the aggregations.
+         */
+        public long aggregationNanos() {
+            return aggregationNanos;
+        }
+
+        /**
+         * Count of pages this operator has processed.
+         */
+        public int pagesProcessed() {
+            return pagesProcessed;
+        }
+
+        @Override
+        public XContentBuilder toXContent(XContentBuilder builder, Params params) throws IOException {
+            builder.startObject();
+            builder.field("hash_nanos", hashNanos);
+            if (builder.humanReadable()) {
+                builder.field("hash_time", TimeValue.timeValueNanos(hashNanos));
+            }
+            builder.field("aggregation_nanos", aggregationNanos);
+            if (builder.humanReadable()) {
+                builder.field("aggregation_time", TimeValue.timeValueNanos(aggregationNanos));
+            }
+            builder.field("pages_processed", pagesProcessed);
+            return builder.endObject();
+
+        }
+
+        @Override
+        public boolean equals(Object o) {
+            if (this == o) return true;
+            if (o == null || getClass() != o.getClass()) return false;
+            Status status = (Status) o;
+            return hashNanos == status.hashNanos && aggregationNanos == status.aggregationNanos && pagesProcessed == status.pagesProcessed;
+        }
+
+        @Override
+        public int hashCode() {
+            return Objects.hash(hashNanos, aggregationNanos, pagesProcessed);
+        }
+
+        @Override
+        public String toString() {
+            return Strings.toString(this);
+        }
+
+        @Override
+        public TransportVersion getMinimalSupportedVersion() {
+            return TransportVersions.ESQL_TIMINGS;
+        }
+    }
 }

+ 7 - 0
x-pack/plugin/esql/compute/src/main/java/org/elasticsearch/compute/operator/LimitOperator.java

@@ -7,6 +7,8 @@
 
 package org.elasticsearch.compute.operator;
 
+import org.elasticsearch.TransportVersion;
+import org.elasticsearch.TransportVersions;
 import org.elasticsearch.common.Strings;
 import org.elasticsearch.common.io.stream.NamedWriteableRegistry;
 import org.elasticsearch.common.io.stream.StreamInput;
@@ -229,5 +231,10 @@ public class LimitOperator implements Operator {
         public String toString() {
             return Strings.toString(this);
         }
+
+        @Override
+        public TransportVersion getMinimalSupportedVersion() {
+            return TransportVersions.V_8_11_X;
+        }
     }
 }

+ 7 - 0
x-pack/plugin/esql/compute/src/main/java/org/elasticsearch/compute/operator/MvExpandOperator.java

@@ -7,6 +7,8 @@
 
 package org.elasticsearch.compute.operator;
 
+import org.elasticsearch.TransportVersion;
+import org.elasticsearch.TransportVersions;
 import org.elasticsearch.common.Strings;
 import org.elasticsearch.common.io.stream.NamedWriteableRegistry;
 import org.elasticsearch.common.io.stream.StreamInput;
@@ -322,5 +324,10 @@ public class MvExpandOperator implements Operator {
         public String toString() {
             return Strings.toString(this);
         }
+
+        @Override
+        public TransportVersion getMinimalSupportedVersion() {
+            return TransportVersions.V_8_11_X;
+        }
     }
 }

+ 2 - 2
x-pack/plugin/esql/compute/src/main/java/org/elasticsearch/compute/operator/Operator.java

@@ -8,7 +8,7 @@
 package org.elasticsearch.compute.operator;
 
 import org.elasticsearch.action.support.SubscribableListener;
-import org.elasticsearch.common.io.stream.NamedWriteable;
+import org.elasticsearch.common.io.stream.VersionedNamedWriteable;
 import org.elasticsearch.common.unit.ByteSizeValue;
 import org.elasticsearch.compute.Describable;
 import org.elasticsearch.compute.data.Block;
@@ -105,5 +105,5 @@ public interface Operator extends Releasable {
     /**
      * Status of an {@link Operator} to be returned by the tasks API.
      */
-    interface Status extends ToXContentObject, NamedWriteable {}
+    interface Status extends ToXContentObject, VersionedNamedWriteable {}
 }

+ 7 - 0
x-pack/plugin/esql/compute/src/main/java/org/elasticsearch/compute/operator/exchange/ExchangeSinkOperator.java

@@ -7,6 +7,8 @@
 
 package org.elasticsearch.compute.operator.exchange;
 
+import org.elasticsearch.TransportVersion;
+import org.elasticsearch.TransportVersions;
 import org.elasticsearch.action.support.SubscribableListener;
 import org.elasticsearch.common.Strings;
 import org.elasticsearch.common.io.stream.NamedWriteableRegistry;
@@ -148,5 +150,10 @@ public class ExchangeSinkOperator extends SinkOperator {
         public String toString() {
             return Strings.toString(this);
         }
+
+        @Override
+        public TransportVersion getMinimalSupportedVersion() {
+            return TransportVersions.V_8_11_X;
+        }
     }
 }

+ 7 - 0
x-pack/plugin/esql/compute/src/main/java/org/elasticsearch/compute/operator/exchange/ExchangeSourceOperator.java

@@ -7,6 +7,8 @@
 
 package org.elasticsearch.compute.operator.exchange;
 
+import org.elasticsearch.TransportVersion;
+import org.elasticsearch.TransportVersions;
 import org.elasticsearch.action.support.SubscribableListener;
 import org.elasticsearch.common.Strings;
 import org.elasticsearch.common.io.stream.NamedWriteableRegistry;
@@ -157,5 +159,10 @@ public class ExchangeSourceOperator extends SourceOperator {
         public String toString() {
             return Strings.toString(this);
         }
+
+        @Override
+        public TransportVersion getMinimalSupportedVersion() {
+            return TransportVersions.V_8_11_X;
+        }
     }
 }

+ 7 - 0
x-pack/plugin/esql/compute/src/main/java/org/elasticsearch/compute/operator/topn/TopNOperatorStatus.java

@@ -7,6 +7,8 @@
 
 package org.elasticsearch.compute.operator.topn;
 
+import org.elasticsearch.TransportVersion;
+import org.elasticsearch.TransportVersions;
 import org.elasticsearch.common.io.stream.NamedWriteableRegistry;
 import org.elasticsearch.common.io.stream.StreamInput;
 import org.elasticsearch.common.io.stream.StreamOutput;
@@ -77,4 +79,9 @@ public class TopNOperatorStatus implements Operator.Status {
     public int hashCode() {
         return Objects.hash(occupiedRows, ramBytesUsed);
     }
+
+    @Override
+    public TransportVersion getMinimalSupportedVersion() {
+        return TransportVersions.V_8_11_X;
+    }
 }

+ 14 - 9
x-pack/plugin/esql/compute/src/test/java/org/elasticsearch/compute/lucene/LuceneSourceOperatorStatusTests.java

@@ -20,7 +20,7 @@ import static org.hamcrest.Matchers.equalTo;
 
 public class LuceneSourceOperatorStatusTests extends AbstractWireSerializingTestCase<LuceneSourceOperator.Status> {
     public static LuceneSourceOperator.Status simple() {
-        return new LuceneSourceOperator.Status(2, Set.of("*:*"), new TreeSet<>(List.of("a:0", "a:1")), 0, 1, 5, 123, 99990, 8000);
+        return new LuceneSourceOperator.Status(2, Set.of("*:*"), new TreeSet<>(List.of("a:0", "a:1")), 1002, 0, 1, 5, 123, 99990, 8000);
     }
 
     public static String simpleToJson() {
@@ -34,6 +34,8 @@ public class LuceneSourceOperatorStatusTests extends AbstractWireSerializingTest
                 "a:0",
                 "a:1"
               ],
+              "processing_nanos" : 1002,
+              "processing_time" : "1micros",
               "slice_index" : 0,
               "total_slices" : 1,
               "pages_emitted" : 5,
@@ -58,6 +60,7 @@ public class LuceneSourceOperatorStatusTests extends AbstractWireSerializingTest
             randomNonNegativeInt(),
             randomProcessedQueries(),
             randomProcessedShards(),
+            randomNonNegativeLong(),
             randomNonNegativeInt(),
             randomNonNegativeInt(),
             randomNonNegativeInt(),
@@ -90,29 +93,31 @@ public class LuceneSourceOperatorStatusTests extends AbstractWireSerializingTest
         int processedSlices = instance.processedSlices();
         Set<String> processedQueries = instance.processedQueries();
         Set<String> processedShards = instance.processedShards();
+        long processNanos = instance.processNanos();
         int sliceIndex = instance.sliceIndex();
         int totalSlices = instance.totalSlices();
         int pagesEmitted = instance.pagesEmitted();
         int sliceMin = instance.sliceMin();
         int sliceMax = instance.sliceMax();
         int current = instance.current();
-        switch (between(0, 8)) {
+        switch (between(0, 9)) {
             case 0 -> processedSlices = randomValueOtherThan(processedSlices, ESTestCase::randomNonNegativeInt);
             case 1 -> processedQueries = randomValueOtherThan(processedQueries, LuceneSourceOperatorStatusTests::randomProcessedQueries);
             case 2 -> processedShards = randomValueOtherThan(processedShards, LuceneSourceOperatorStatusTests::randomProcessedShards);
-            case 3 -> sliceIndex = randomValueOtherThan(sliceIndex, ESTestCase::randomNonNegativeInt);
-            case 4 -> totalSlices = randomValueOtherThan(totalSlices, ESTestCase::randomNonNegativeInt);
-            case 5 -> pagesEmitted = randomValueOtherThan(pagesEmitted, ESTestCase::randomNonNegativeInt);
-            case 6 -> sliceMin = randomValueOtherThan(sliceMin, ESTestCase::randomNonNegativeInt);
-            case 7 -> sliceMax = randomValueOtherThan(sliceMax, ESTestCase::randomNonNegativeInt);
-            case 8 -> current = randomValueOtherThan(current, ESTestCase::randomNonNegativeInt);
+            case 3 -> processNanos = randomValueOtherThan(processNanos, ESTestCase::randomNonNegativeLong);
+            case 4 -> sliceIndex = randomValueOtherThan(sliceIndex, ESTestCase::randomNonNegativeInt);
+            case 5 -> totalSlices = randomValueOtherThan(totalSlices, ESTestCase::randomNonNegativeInt);
+            case 6 -> pagesEmitted = randomValueOtherThan(pagesEmitted, ESTestCase::randomNonNegativeInt);
+            case 7 -> sliceMin = randomValueOtherThan(sliceMin, ESTestCase::randomNonNegativeInt);
+            case 8 -> sliceMax = randomValueOtherThan(sliceMax, ESTestCase::randomNonNegativeInt);
+            case 9 -> current = randomValueOtherThan(current, ESTestCase::randomNonNegativeInt);
             default -> throw new UnsupportedOperationException();
         }
-        ;
         return new LuceneSourceOperator.Status(
             processedSlices,
             processedQueries,
             processedShards,
+            processNanos,
             sliceIndex,
             totalSlices,
             pagesEmitted,

+ 14 - 15
x-pack/plugin/esql/compute/src/test/java/org/elasticsearch/compute/lucene/ValuesSourceReaderOperatorStatusTests.java

@@ -10,6 +10,7 @@ package org.elasticsearch.compute.lucene;
 import org.elasticsearch.common.Strings;
 import org.elasticsearch.common.io.stream.Writeable;
 import org.elasticsearch.test.AbstractWireSerializingTestCase;
+import org.elasticsearch.test.ESTestCase;
 
 import java.io.IOException;
 import java.util.Map;
@@ -19,7 +20,7 @@ import static org.hamcrest.Matchers.equalTo;
 
 public class ValuesSourceReaderOperatorStatusTests extends AbstractWireSerializingTestCase<ValuesSourceReaderOperator.Status> {
     public static ValuesSourceReaderOperator.Status simple() {
-        return new ValuesSourceReaderOperator.Status(Map.of("ReaderType", 3), 123);
+        return new ValuesSourceReaderOperator.Status(Map.of("ReaderType", 3), 1022323, 123);
     }
 
     public static String simpleToJson() {
@@ -28,6 +29,8 @@ public class ValuesSourceReaderOperatorStatusTests extends AbstractWireSerializi
               "readers_built" : {
                 "ReaderType" : 3
               },
+              "process_nanos" : 1022323,
+              "process_time" : "1ms",
               "pages_processed" : 123
             }""";
     }
@@ -43,7 +46,7 @@ public class ValuesSourceReaderOperatorStatusTests extends AbstractWireSerializi
 
     @Override
     public ValuesSourceReaderOperator.Status createTestInstance() {
-        return new ValuesSourceReaderOperator.Status(randomReadersBuilt(), between(0, Integer.MAX_VALUE));
+        return new ValuesSourceReaderOperator.Status(randomReadersBuilt(), randomNonNegativeLong(), randomNonNegativeInt());
     }
 
     private Map<String, Integer> randomReadersBuilt() {
@@ -57,19 +60,15 @@ public class ValuesSourceReaderOperatorStatusTests extends AbstractWireSerializi
 
     @Override
     protected ValuesSourceReaderOperator.Status mutateInstance(ValuesSourceReaderOperator.Status instance) throws IOException {
-        switch (between(0, 1)) {
-            case 0:
-                return new ValuesSourceReaderOperator.Status(
-                    randomValueOtherThan(instance.readersBuilt(), this::randomReadersBuilt),
-                    instance.pagesProcessed()
-                );
-            case 1:
-                return new ValuesSourceReaderOperator.Status(
-                    instance.readersBuilt(),
-                    randomValueOtherThan(instance.pagesProcessed(), () -> between(0, Integer.MAX_VALUE))
-                );
-            default:
-                throw new UnsupportedOperationException();
+        Map<String, Integer> readersBuilt = instance.readersBuilt();
+        long processNanos = instance.processNanos();
+        int pagesProcessed = instance.pagesProcessed();
+        switch (between(0, 2)) {
+            case 0 -> readersBuilt = randomValueOtherThan(readersBuilt, this::randomReadersBuilt);
+            case 1 -> processNanos = randomValueOtherThan(processNanos, ESTestCase::randomNonNegativeLong);
+            case 2 -> pagesProcessed = randomValueOtherThan(pagesProcessed, ESTestCase::randomNonNegativeInt);
+            default -> throw new UnsupportedOperationException();
         }
+        return new ValuesSourceReaderOperator.Status(readersBuilt, processNanos, pagesProcessed);
     }
 }

+ 16 - 5
x-pack/plugin/esql/compute/src/test/java/org/elasticsearch/compute/operator/AbstractPageMappingOperatorStatusTests.java

@@ -16,16 +16,20 @@ import static org.hamcrest.Matchers.equalTo;
 
 public class AbstractPageMappingOperatorStatusTests extends AbstractWireSerializingTestCase<AbstractPageMappingOperator.Status> {
     public static AbstractPageMappingOperator.Status simple() {
-        return new AbstractPageMappingOperator.Status(123);
+        return new AbstractPageMappingOperator.Status(200012, 123);
     }
 
     public static String simpleToJson() {
         return """
-            {"pages_processed":123}""";
+            {
+              "process_nanos" : 200012,
+              "process_time" : "200micros",
+              "pages_processed" : 123
+            }""";
     }
 
     public void testToXContent() {
-        assertThat(Strings.toString(simple()), equalTo(simpleToJson()));
+        assertThat(Strings.toString(simple(), true, true), equalTo(simpleToJson()));
     }
 
     @Override
@@ -35,11 +39,18 @@ public class AbstractPageMappingOperatorStatusTests extends AbstractWireSerializ
 
     @Override
     public AbstractPageMappingOperator.Status createTestInstance() {
-        return new AbstractPageMappingOperator.Status(randomNonNegativeInt());
+        return new AbstractPageMappingOperator.Status(randomNonNegativeLong(), randomNonNegativeInt());
     }
 
     @Override
     protected AbstractPageMappingOperator.Status mutateInstance(AbstractPageMappingOperator.Status instance) {
-        return new AbstractPageMappingOperator.Status(randomValueOtherThan(instance.pagesProcessed(), ESTestCase::randomNonNegativeInt));
+        long processNanos = instance.processNanos();
+        int pagesProcessed = instance.pagesProcessed();
+        switch (between(0, 1)) {
+            case 0 -> processNanos = randomValueOtherThan(processNanos, ESTestCase::randomNonNegativeLong);
+            case 1 -> pagesProcessed = randomValueOtherThan(pagesProcessed, ESTestCase::randomNonNegativeInt);
+            default -> throw new UnsupportedOperationException();
+        }
+        return new AbstractPageMappingOperator.Status(processNanos, pagesProcessed);
     }
 }

+ 56 - 0
x-pack/plugin/esql/compute/src/test/java/org/elasticsearch/compute/operator/AggregationOperatorStatusTests.java

@@ -0,0 +1,56 @@
+/*
+ * Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
+ * or more contributor license agreements. Licensed under the Elastic License
+ * 2.0; you may not use this file except in compliance with the Elastic License
+ * 2.0.
+ */
+
+package org.elasticsearch.compute.operator;
+
+import org.elasticsearch.common.Strings;
+import org.elasticsearch.common.io.stream.Writeable;
+import org.elasticsearch.test.AbstractWireSerializingTestCase;
+import org.elasticsearch.test.ESTestCase;
+
+import static org.hamcrest.Matchers.equalTo;
+
+public class AggregationOperatorStatusTests extends AbstractWireSerializingTestCase<AggregationOperator.Status> {
+    public static AggregationOperator.Status simple() {
+        return new AggregationOperator.Status(200012, 123);
+    }
+
+    public static String simpleToJson() {
+        return """
+            {
+              "aggregation_nanos" : 200012,
+              "aggregation_time" : "200micros",
+              "pages_processed" : 123
+            }""";
+    }
+
+    public void testToXContent() {
+        assertThat(Strings.toString(simple(), true, true), equalTo(simpleToJson()));
+    }
+
+    @Override
+    protected Writeable.Reader<AggregationOperator.Status> instanceReader() {
+        return AggregationOperator.Status::new;
+    }
+
+    @Override
+    public AggregationOperator.Status createTestInstance() {
+        return new AggregationOperator.Status(randomNonNegativeLong(), randomNonNegativeInt());
+    }
+
+    @Override
+    protected AggregationOperator.Status mutateInstance(AggregationOperator.Status instance) {
+        long aggregationNanos = instance.aggregationNanos();
+        int pagesProcessed = instance.pagesProcessed();
+        switch (between(0, 1)) {
+            case 0 -> aggregationNanos = randomValueOtherThan(aggregationNanos, ESTestCase::randomNonNegativeLong);
+            case 1 -> pagesProcessed = randomValueOtherThan(pagesProcessed, ESTestCase::randomNonNegativeInt);
+            default -> throw new UnsupportedOperationException();
+        }
+        return new AggregationOperator.Status(aggregationNanos, pagesProcessed);
+    }
+}

+ 27 - 3
x-pack/plugin/esql/compute/src/test/java/org/elasticsearch/compute/operator/DriverProfileTests.java

@@ -16,6 +16,7 @@ import org.elasticsearch.compute.lucene.ValuesSourceReaderOperator;
 import org.elasticsearch.compute.lucene.ValuesSourceReaderOperatorStatusTests;
 import org.elasticsearch.compute.operator.exchange.ExchangeSinkOperator;
 import org.elasticsearch.test.AbstractWireSerializingTestCase;
+import org.elasticsearch.test.ESTestCase;
 
 import java.io.IOException;
 import java.util.List;
@@ -25,6 +26,9 @@ import static org.hamcrest.Matchers.equalTo;
 public class DriverProfileTests extends AbstractWireSerializingTestCase<DriverProfile> {
     public void testToXContent() {
         DriverProfile status = new DriverProfile(
+            10012,
+            10000,
+            12,
             List.of(
                 new DriverStatus.OperatorStatus("LuceneSource", LuceneSourceOperatorStatusTests.simple()),
                 new DriverStatus.OperatorStatus("ValuesSourceReader", ValuesSourceReaderOperatorStatusTests.simple())
@@ -32,6 +36,11 @@ public class DriverProfileTests extends AbstractWireSerializingTestCase<DriverPr
         );
         assertThat(Strings.toString(status, true, true), equalTo("""
             {
+              "took_nanos" : 10012,
+              "took_time" : "10micros",
+              "cpu_nanos" : 10000,
+              "cpu_time" : "10micros",
+              "iterations" : 12,
               "operators" : [
                 {
                   "operator" : "LuceneSource",
@@ -56,13 +65,28 @@ public class DriverProfileTests extends AbstractWireSerializingTestCase<DriverPr
 
     @Override
     protected DriverProfile createTestInstance() {
-        return new DriverProfile(DriverStatusTests.randomOperatorStatuses());
+        return new DriverProfile(
+            randomNonNegativeLong(),
+            randomNonNegativeLong(),
+            randomNonNegativeLong(),
+            DriverStatusTests.randomOperatorStatuses()
+        );
     }
 
     @Override
     protected DriverProfile mutateInstance(DriverProfile instance) throws IOException {
-        var operators = randomValueOtherThan(instance.operators(), DriverStatusTests::randomOperatorStatuses);
-        return new DriverProfile(operators);
+        long tookNanos = instance.tookNanos();
+        long cpuNanos = instance.cpuNanos();
+        long iterations = instance.iterations();
+        var operators = instance.operators();
+        switch (between(0, 3)) {
+            case 0 -> tookNanos = randomValueOtherThan(tookNanos, ESTestCase::randomNonNegativeLong);
+            case 1 -> cpuNanos = randomValueOtherThan(cpuNanos, ESTestCase::randomNonNegativeLong);
+            case 2 -> iterations = randomValueOtherThan(iterations, ESTestCase::randomNonNegativeLong);
+            case 3 -> operators = randomValueOtherThan(operators, DriverStatusTests::randomOperatorStatuses);
+            default -> throw new UnsupportedOperationException();
+        }
+        return new DriverProfile(tookNanos, cpuNanos, iterations, operators);
     }
 
     @Override

+ 31 - 20
x-pack/plugin/esql/compute/src/test/java/org/elasticsearch/compute/operator/DriverStatusTests.java

@@ -31,7 +31,10 @@ public class DriverStatusTests extends AbstractWireSerializingTestCase<DriverSta
     public void testToXContent() {
         DriverStatus status = new DriverStatus(
             "ABC:123",
+            123413220000L,
             123413243214L,
+            123213L,
+            55L,
             DriverStatus.Status.RUNNING,
             List.of(
                 new DriverStatus.OperatorStatus("LuceneSource", LuceneSourceOperatorStatusTests.simple()),
@@ -42,7 +45,11 @@ public class DriverStatusTests extends AbstractWireSerializingTestCase<DriverSta
         assertThat(Strings.toString(status, true, true), equalTo("""
             {
               "sessionId" : "ABC:123",
+              "started" : "1973-11-29T09:27:00.000Z",
               "last_updated" : "1973-11-29T09:27:23.214Z",
+              "cpu_nanos" : 123213,
+              "cpu_time" : "123.2micros",
+              "iterations" : 55,
               "status" : "running",
               "completed_operators" : [
                 {
@@ -76,7 +83,16 @@ public class DriverStatusTests extends AbstractWireSerializingTestCase<DriverSta
 
     @Override
     protected DriverStatus createTestInstance() {
-        return new DriverStatus(randomSessionId(), randomLong(), randomStatus(), randomOperatorStatuses(), randomOperatorStatuses());
+        return new DriverStatus(
+            randomSessionId(),
+            randomNonNegativeLong(),
+            randomNonNegativeLong(),
+            randomNonNegativeLong(),
+            randomNonNegativeLong(),
+            randomStatus(),
+            randomOperatorStatuses(),
+            randomOperatorStatuses()
+        );
     }
 
     private String randomSessionId() {
@@ -104,30 +120,25 @@ public class DriverStatusTests extends AbstractWireSerializingTestCase<DriverSta
     @Override
     protected DriverStatus mutateInstance(DriverStatus instance) throws IOException {
         var sessionId = instance.sessionId();
+        long started = instance.started();
         long lastUpdated = instance.lastUpdated();
+        long cpuNanos = instance.cpuNanos();
+        long iterations = instance.iterations();
         var status = instance.status();
         var completedOperators = instance.completedOperators();
         var activeOperators = instance.activeOperators();
-        switch (between(0, 4)) {
-            case 0:
-                sessionId = randomValueOtherThan(sessionId, this::randomSessionId);
-                break;
-            case 1:
-                lastUpdated = randomValueOtherThan(lastUpdated, ESTestCase::randomLong);
-                break;
-            case 2:
-                status = randomValueOtherThan(status, this::randomStatus);
-                break;
-            case 3:
-                completedOperators = randomValueOtherThan(completedOperators, DriverStatusTests::randomOperatorStatuses);
-                break;
-            case 4:
-                activeOperators = randomValueOtherThan(activeOperators, DriverStatusTests::randomOperatorStatuses);
-                break;
-            default:
-                throw new UnsupportedOperationException();
+        switch (between(0, 7)) {
+            case 0 -> sessionId = randomValueOtherThan(sessionId, this::randomSessionId);
+            case 1 -> started = randomValueOtherThan(started, ESTestCase::randomNonNegativeLong);
+            case 2 -> lastUpdated = randomValueOtherThan(lastUpdated, ESTestCase::randomNonNegativeLong);
+            case 3 -> cpuNanos = randomValueOtherThan(cpuNanos, ESTestCase::randomNonNegativeLong);
+            case 4 -> iterations = randomValueOtherThan(iterations, ESTestCase::randomNonNegativeLong);
+            case 5 -> status = randomValueOtherThan(status, this::randomStatus);
+            case 6 -> completedOperators = randomValueOtherThan(completedOperators, DriverStatusTests::randomOperatorStatuses);
+            case 7 -> activeOperators = randomValueOtherThan(activeOperators, DriverStatusTests::randomOperatorStatuses);
+            default -> throw new UnsupportedOperationException();
         }
-        return new DriverStatus(sessionId, lastUpdated, status, completedOperators, activeOperators);
+        return new DriverStatus(sessionId, started, lastUpdated, cpuNanos, iterations, status, completedOperators, activeOperators);
     }
 
     @Override

+ 167 - 0
x-pack/plugin/esql/compute/src/test/java/org/elasticsearch/compute/operator/DriverTests.java

@@ -35,10 +35,177 @@ import java.util.Set;
 import java.util.concurrent.CountDownLatch;
 import java.util.concurrent.CyclicBarrier;
 import java.util.concurrent.TimeUnit;
+import java.util.function.LongSupplier;
 
 import static org.hamcrest.Matchers.equalTo;
 
 public class DriverTests extends ESTestCase {
+    /**
+     * Runs a driver to completion in a single call and asserts that the
+     * status and profile returned makes sense.
+     */
+    public void testProfileAndStatusFinishInOneRound() {
+        DriverContext driverContext = driverContext();
+        List<Page> inPages = randomList(1, 100, DriverTests::randomPage);
+        List<Page> outPages = new ArrayList<>();
+
+        long startEpoch = randomNonNegativeLong();
+        long startNanos = randomLong();
+        long waitTime = randomLongBetween(1000, 100000);
+        long tickTime = randomLongBetween(1, 10000);
+
+        Driver driver = new Driver(
+            "unset",
+            startEpoch,
+            startNanos,
+            driverContext,
+            () -> "unset",
+            new CannedSourceOperator(inPages.iterator()),
+            List.of(),
+            new TestResultPageSinkOperator(outPages::add),
+            TimeValue.timeValueDays(10),
+            () -> {}
+        );
+
+        NowSupplier nowSupplier = new NowSupplier(startNanos, waitTime, tickTime);
+
+        logger.info("status {}", driver.status());
+        assertThat(driver.status().status(), equalTo(DriverStatus.Status.QUEUED));
+        assertThat(driver.status().started(), equalTo(startEpoch));
+        assertThat(driver.status().cpuNanos(), equalTo(0L));
+        assertThat(driver.status().iterations(), equalTo(0L));
+        driver.run(TimeValue.timeValueSeconds(Long.MAX_VALUE), Integer.MAX_VALUE, nowSupplier);
+        logger.info("status {}", driver.status());
+        assertThat(driver.status().status(), equalTo(DriverStatus.Status.DONE));
+        assertThat(driver.status().started(), equalTo(startEpoch));
+        long sumRunningTime = tickTime * (nowSupplier.callCount - 1);
+        assertThat(driver.status().cpuNanos(), equalTo(sumRunningTime));
+        assertThat(driver.status().iterations(), equalTo((long) inPages.size()));
+
+        logger.info("profile {}", driver.profile());
+        assertThat(driver.profile().tookNanos(), equalTo(waitTime + sumRunningTime));
+        assertThat(driver.profile().cpuNanos(), equalTo(sumRunningTime));
+        assertThat(driver.profile().iterations(), equalTo((long) inPages.size()));
+    }
+
+    /**
+     * Runs the driver processing a single page at a time and asserting that
+     * the status reported between each call is sane. And that the profile
+     * returned after completion is sane.
+     */
+    public void testProfileAndStatusOneIterationAtATime() {
+        DriverContext driverContext = driverContext();
+        List<Page> inPages = randomList(2, 100, DriverTests::randomPage);
+        List<Page> outPages = new ArrayList<>();
+
+        long startEpoch = randomNonNegativeLong();
+        long startNanos = randomLong();
+        long waitTime = randomLongBetween(1000, 100000);
+        long tickTime = randomLongBetween(1, 10000);
+
+        Driver driver = new Driver(
+            "unset",
+            startEpoch,
+            startNanos,
+            driverContext,
+            () -> "unset",
+            new CannedSourceOperator(inPages.iterator()),
+            List.of(),
+            new TestResultPageSinkOperator(outPages::add),
+            TimeValue.timeValueDays(10),
+            () -> {}
+        );
+
+        NowSupplier nowSupplier = new NowSupplier(startNanos, waitTime, tickTime);
+        for (int i = 0; i < inPages.size(); i++) {
+            logger.info("status {} {}", i, driver.status());
+            assertThat(driver.status().status(), equalTo(i == 0 ? DriverStatus.Status.QUEUED : DriverStatus.Status.WAITING));
+            assertThat(driver.status().started(), equalTo(startEpoch));
+            assertThat(driver.status().iterations(), equalTo((long) i));
+            assertThat(driver.status().cpuNanos(), equalTo(tickTime * i));
+            driver.run(TimeValue.timeValueSeconds(Long.MAX_VALUE), 1, nowSupplier);
+        }
+
+        logger.info("status {}", driver.status());
+        assertThat(driver.status().status(), equalTo(DriverStatus.Status.DONE));
+        assertThat(driver.status().started(), equalTo(startEpoch));
+        assertThat(driver.status().iterations(), equalTo((long) inPages.size()));
+        assertThat(driver.status().cpuNanos(), equalTo(tickTime * inPages.size()));
+
+        logger.info("profile {}", driver.profile());
+        assertThat(driver.profile().tookNanos(), equalTo(waitTime + tickTime * (nowSupplier.callCount - 1)));
+        assertThat(driver.profile().cpuNanos(), equalTo(tickTime * inPages.size()));
+        assertThat(driver.profile().iterations(), equalTo((long) inPages.size()));
+    }
+
+    /**
+     * Runs the driver processing a single page at a time via a synthetic timeout
+     * and asserting that the status reported between each call is sane. And that
+     * the profile returned after completion is sane.
+     */
+    public void testProfileAndStatusTimeout() {
+        DriverContext driverContext = driverContext();
+        List<Page> inPages = randomList(2, 100, DriverTests::randomPage);
+        List<Page> outPages = new ArrayList<>();
+
+        long startEpoch = randomNonNegativeLong();
+        long startNanos = randomLong();
+        long waitTime = randomLongBetween(1000, 100000);
+        long tickTime = randomLongBetween(1, 10000);
+
+        Driver driver = new Driver(
+            "unset",
+            startEpoch,
+            startNanos,
+            driverContext,
+            () -> "unset",
+            new CannedSourceOperator(inPages.iterator()),
+            List.of(),
+            new TestResultPageSinkOperator(outPages::add),
+            TimeValue.timeValueNanos(tickTime),
+            () -> {}
+        );
+
+        NowSupplier nowSupplier = new NowSupplier(startNanos, waitTime, tickTime);
+        for (int i = 0; i < inPages.size(); i++) {
+            logger.info("status {} {}", i, driver.status());
+            assertThat(driver.status().status(), equalTo(i == 0 ? DriverStatus.Status.QUEUED : DriverStatus.Status.WAITING));
+            assertThat(driver.status().started(), equalTo(startEpoch));
+            assertThat(driver.status().iterations(), equalTo((long) i));
+            assertThat(driver.status().cpuNanos(), equalTo(tickTime * i));
+            driver.run(TimeValue.timeValueNanos(tickTime), Integer.MAX_VALUE, nowSupplier);
+        }
+
+        logger.info("status {}", driver.status());
+        assertThat(driver.status().status(), equalTo(DriverStatus.Status.DONE));
+        assertThat(driver.status().started(), equalTo(startEpoch));
+        assertThat(driver.status().iterations(), equalTo((long) inPages.size()));
+        assertThat(driver.status().cpuNanos(), equalTo(tickTime * inPages.size()));
+
+        logger.info("profile {}", driver.profile());
+        assertThat(driver.profile().tookNanos(), equalTo(waitTime + tickTime * (nowSupplier.callCount - 1)));
+        assertThat(driver.profile().cpuNanos(), equalTo(tickTime * inPages.size()));
+        assertThat(driver.profile().iterations(), equalTo((long) inPages.size()));
+    }
+
+    class NowSupplier implements LongSupplier {
+        private final long startNanos;
+        private final long waitTime;
+        private final long tickTime;
+
+        private int callCount;
+
+        NowSupplier(long startNanos, long waitTime, long tickTime) {
+            this.startNanos = startNanos;
+            this.waitTime = waitTime;
+            this.tickTime = tickTime;
+        }
+
+        @Override
+        public long getAsLong() {
+            return startNanos + waitTime + tickTime * callCount++;
+        }
+    }
 
     public void testThreadContext() throws Exception {
         DriverContext driverContext = driverContext();

+ 60 - 0
x-pack/plugin/esql/compute/src/test/java/org/elasticsearch/compute/operator/HashAggregationOperatorStatusTests.java

@@ -0,0 +1,60 @@
+/*
+ * Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
+ * or more contributor license agreements. Licensed under the Elastic License
+ * 2.0; you may not use this file except in compliance with the Elastic License
+ * 2.0.
+ */
+
+package org.elasticsearch.compute.operator;
+
+import org.elasticsearch.common.Strings;
+import org.elasticsearch.common.io.stream.Writeable;
+import org.elasticsearch.test.AbstractWireSerializingTestCase;
+import org.elasticsearch.test.ESTestCase;
+
+import static org.hamcrest.Matchers.equalTo;
+
+public class HashAggregationOperatorStatusTests extends AbstractWireSerializingTestCase<HashAggregationOperator.Status> {
+    public static HashAggregationOperator.Status simple() {
+        return new HashAggregationOperator.Status(500012, 200012, 123);
+    }
+
+    public static String simpleToJson() {
+        return """
+            {
+              "hash_nanos" : 500012,
+              "hash_time" : "500micros",
+              "aggregation_nanos" : 200012,
+              "aggregation_time" : "200micros",
+              "pages_processed" : 123
+            }""";
+    }
+
+    public void testToXContent() {
+        assertThat(Strings.toString(simple(), true, true), equalTo(simpleToJson()));
+    }
+
+    @Override
+    protected Writeable.Reader<HashAggregationOperator.Status> instanceReader() {
+        return HashAggregationOperator.Status::new;
+    }
+
+    @Override
+    public HashAggregationOperator.Status createTestInstance() {
+        return new HashAggregationOperator.Status(randomNonNegativeLong(), randomNonNegativeLong(), randomNonNegativeInt());
+    }
+
+    @Override
+    protected HashAggregationOperator.Status mutateInstance(HashAggregationOperator.Status instance) {
+        long hashNanos = instance.hashNanos();
+        long aggregationNanos = instance.aggregationNanos();
+        int pagesProcessed = instance.pagesProcessed();
+        switch (between(0, 2)) {
+            case 0 -> hashNanos = randomValueOtherThan(hashNanos, ESTestCase::randomNonNegativeLong);
+            case 1 -> aggregationNanos = randomValueOtherThan(aggregationNanos, ESTestCase::randomNonNegativeLong);
+            case 2 -> pagesProcessed = randomValueOtherThan(pagesProcessed, ESTestCase::randomNonNegativeInt);
+            default -> throw new UnsupportedOperationException();
+        }
+        return new HashAggregationOperator.Status(hashNanos, aggregationNanos, pagesProcessed);
+    }
+}

+ 2 - 0
x-pack/plugin/esql/compute/src/test/java/org/elasticsearch/compute/operator/OperatorTestCase.java

@@ -284,6 +284,8 @@ public abstract class OperatorTestCase extends AnyOperatorTestCase {
             drivers.add(
                 new Driver(
                     "dummy-session",
+                    0,
+                    0,
                     new DriverContext(BigArrays.NON_RECYCLING_INSTANCE, TestBlockFactory.getNonBreakingInstance()),
                     () -> "dummy-driver",
                     new SequenceLongBlockSourceOperator(

+ 4 - 0
x-pack/plugin/esql/compute/src/test/java/org/elasticsearch/compute/operator/exchange/ExchangeServiceTests.java

@@ -275,6 +275,8 @@ public class ExchangeServiceTests extends ESTestCase {
             DriverContext dc = driverContext();
             Driver d = new Driver(
                 "test-session:1",
+                0,
+                0,
                 dc,
                 () -> description,
                 seqNoGenerator.get(dc),
@@ -291,6 +293,8 @@ public class ExchangeServiceTests extends ESTestCase {
             DriverContext dc = driverContext();
             Driver d = new Driver(
                 "test-session:2",
+                0,
+                0,
                 dc,
                 () -> description,
                 sourceOperator,

+ 1 - 1
x-pack/plugin/esql/qa/server/multi-node/src/yamlRestTest/java/org/elasticsearch/xpack/esql/qa/multi_node/EsqlClientYamlIT.java

@@ -5,7 +5,7 @@
  * 2.0.
  */
 
-package org.elasticsearch.xpack.esql.qa.mixed;
+package org.elasticsearch.xpack.esql.qa.multi_node;
 
 import com.carrotsearch.randomizedtesting.annotations.ParametersFactory;
 

+ 2 - 0
x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/enrich/EnrichLookupService.java

@@ -332,6 +332,8 @@ public class EnrichLookupService {
             OutputOperator outputOperator = new OutputOperator(List.of(), Function.identity(), result::set);
             Driver driver = new Driver(
                 "enrich-lookup:" + sessionId,
+                System.currentTimeMillis(),
+                System.nanoTime(),
                 driverContext,
                 () -> lookupDescription(sessionId, shardId, matchType, matchField, extractFields, inputPage.getPositionCount()),
                 queryOperator,

+ 2 - 0
x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/planner/LocalExecutionPlanner.java

@@ -716,6 +716,8 @@ public class LocalExecutionPlanner {
                 success = true;
                 return new Driver(
                     sessionId,
+                    System.currentTimeMillis(),
+                    System.nanoTime(),
                     driverContext,
                     physicalOperation::describe,
                     source,

+ 1 - 1
x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/plugin/ComputeService.java

@@ -426,7 +426,7 @@ public class ComputeService {
         }
         ActionListener<Void> listenerCollectingStatus = listener.map(ignored -> {
             if (context.configuration.profile()) {
-                return drivers.stream().map(d -> new DriverProfile(d.status().completedOperators())).toList();
+                return drivers.stream().map(Driver::profile).toList();
             }
             return null;
         });

+ 4 - 0
x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/plugin/EsqlPlugin.java

@@ -25,7 +25,9 @@ import org.elasticsearch.compute.data.BlockFactory;
 import org.elasticsearch.compute.lucene.LuceneOperator;
 import org.elasticsearch.compute.lucene.ValuesSourceReaderOperator;
 import org.elasticsearch.compute.operator.AbstractPageMappingOperator;
+import org.elasticsearch.compute.operator.AggregationOperator;
 import org.elasticsearch.compute.operator.DriverStatus;
+import org.elasticsearch.compute.operator.HashAggregationOperator;
 import org.elasticsearch.compute.operator.LimitOperator;
 import org.elasticsearch.compute.operator.MvExpandOperator;
 import org.elasticsearch.compute.operator.exchange.ExchangeService;
@@ -163,8 +165,10 @@ public class EsqlPlugin extends Plugin implements ActionPlugin {
             List.of(
                 DriverStatus.ENTRY,
                 AbstractPageMappingOperator.Status.ENTRY,
+                AggregationOperator.Status.ENTRY,
                 ExchangeSinkOperator.Status.ENTRY,
                 ExchangeSourceOperator.Status.ENTRY,
+                HashAggregationOperator.Status.ENTRY,
                 LimitOperator.Status.ENTRY,
                 LuceneOperator.Status.ENTRY,
                 TopNOperatorStatus.ENTRY,

+ 9 - 2
x-pack/plugin/esql/src/test/java/org/elasticsearch/xpack/esql/action/EsqlQueryResponseProfileTests.java

@@ -47,12 +47,19 @@ public class EsqlQueryResponseProfileTests extends AbstractWireSerializingTestCa
     }
 
     private DriverProfile randomDriverProfile() {
-        return new DriverProfile(randomList(10, this::randomOperatorStatus));
+        return new DriverProfile(
+            randomNonNegativeLong(),
+            randomNonNegativeLong(),
+            randomNonNegativeLong(),
+            randomList(10, this::randomOperatorStatus)
+        );
     }
 
     private DriverStatus.OperatorStatus randomOperatorStatus() {
         String name = randomAlphaOfLength(4);
-        Operator.Status status = randomBoolean() ? null : new AbstractPageMappingOperator.Status(between(0, Integer.MAX_VALUE));
+        Operator.Status status = randomBoolean()
+            ? null
+            : new AbstractPageMappingOperator.Status(randomNonNegativeLong(), between(0, Integer.MAX_VALUE));
         return new DriverStatus.OperatorStatus(name, status);
     }
 }

+ 43 - 4
x-pack/plugin/esql/src/test/java/org/elasticsearch/xpack/esql/action/EsqlQueryResponseTests.java

@@ -458,15 +458,54 @@ public class EsqlQueryResponseTests extends AbstractChunkedSerializingTestCase<E
                 List.of(new ColumnInfo("foo", "integer")),
                 List.of(new Page(blockFactory.newIntArrayVector(new int[] { 40, 80 }, 2).asBlock())),
                 new EsqlQueryResponse.Profile(
-                    List.of(new DriverProfile(List.of(new DriverStatus.OperatorStatus("asdf", new AbstractPageMappingOperator.Status(10)))))
+                    List.of(
+                        new DriverProfile(
+                            20021,
+                            20000,
+                            12,
+                            List.of(new DriverStatus.OperatorStatus("asdf", new AbstractPageMappingOperator.Status(10021, 10)))
+                        )
+                    )
                 ),
                 false,
                 false
             );
         ) {
-            assertThat(Strings.toString(response), equalTo("""
-                {"columns":[{"name":"foo","type":"integer"}],"values":[[40],[80]],"profile":{"drivers":[""" + """
-                {"operators":[{"operator":"asdf","status":{"pages_processed":10}}]}]}}"""));
+            assertThat(Strings.toString(response, true, false), equalTo("""
+                {
+                  "columns" : [
+                    {
+                      "name" : "foo",
+                      "type" : "integer"
+                    }
+                  ],
+                  "values" : [
+                    [
+                      40
+                    ],
+                    [
+                      80
+                    ]
+                  ],
+                  "profile" : {
+                    "drivers" : [
+                      {
+                        "took_nanos" : 20021,
+                        "cpu_nanos" : 20000,
+                        "iterations" : 12,
+                        "operators" : [
+                          {
+                            "operator" : "asdf",
+                            "status" : {
+                              "process_nanos" : 10021,
+                              "pages_processed" : 10
+                            }
+                          }
+                        ]
+                      }
+                    ]
+                  }
+                }"""));
         }
     }
 

+ 143 - 0
x-pack/plugin/src/yamlRestTest/resources/rest-api-spec/test/esql/120_profile.yml

@@ -0,0 +1,143 @@
+---
+setup:
+  - skip:
+      version: " - 8.11.99"
+      reason: "profile option added in 8.12"
+      features: warnings
+  - do:
+      indices.create:
+        index:  test
+        body:
+          settings:
+            number_of_shards: 1
+          mappings:
+            properties:
+              data:
+                type: long
+              data_d:
+                type: double
+              count:
+                type: long
+              count_d:
+                type: double
+              time:
+                type: long
+              color:
+                type: keyword
+              text:
+                type: text
+
+  - do:
+      cluster.health: # older versions of ESQL don't wait for the nodes to become available.
+        wait_for_no_initializing_shards: true
+        wait_for_events: languid
+
+  - do:
+      bulk:
+        index: "test"
+        refresh: true
+        body:
+          - { "index": { } }
+          - { "data": 1, "count": 40, "data_d": 1, "count_d": 40, "time": 1674835275187, "color": "red", "text": "rr red" }
+          - { "index": { } }
+          - { "data": 2, "count": 42, "data_d": 2, "count_d": 42, "time": 1674835275188, "color": "blue", "text": "bb blue" }
+          - { "index": { } }
+          - { "data": 1, "count": 44, "data_d": 1, "count_d": 44, "time": 1674835275189, "color": "green", "text": "gg green" }
+          - { "index": { } }
+          - { "data": 2, "count": 46, "data_d": 2, "count_d": 46, "time": 1674835275190, "color": "red", "text": "rr red" }
+          - { "index": { } }
+          - { "data": 1, "count": 40, "data_d": 1, "count_d": 40, "time": 1674835275191, "color": "red", "text": "rr red" }
+          - { "index": { } }
+          - { "data": 2, "count": 42, "data_d": 2, "count_d": 42, "time": 1674835275192, "color": "blue", "text": "bb blue" }
+          - { "index": { } }
+          - { "data": 1, "count": 44, "data_d": 1, "count_d": 44, "time": 1674835275193, "color": "green", "text": "gg green" }
+          - { "index": { } }
+          - { "data": 2, "count": 46, "data_d": 2, "count_d": 46, "time": 1674835275194, "color": "red", "text": "rr red" }
+          - { "index": { } }
+          - { "data": 1, "count": 40, "data_d": 1, "count_d": 40, "time": 1674835275195, "color": "red", "text": "rr red" }
+          - { "index": { } }
+          - { "data": 2, "count": 42, "data_d": 2, "count_d": 42, "time": 1674835275196, "color": "blue", "text": "bb blue" }
+          - { "index": { } }
+          - { "data": 1, "count": 44, "data_d": 1, "count_d": 44, "time": 1674835275197, "color": "green", "text": "gg green" }
+          - { "index": { } }
+          - { "data": 2, "count": 46, "data_d": 2, "count_d": 46, "time": 1674835275198, "color": "red", "text": "rr red" }
+          - { "index": { } }
+          - { "data": 1, "count": 40, "data_d": 1, "count_d": 40, "time": 1674835275199, "color": "red", "text": "rr red" }
+          - { "index": { } }
+          - { "data": 2, "count": 42, "data_d": 2, "count_d": 42, "time": 1674835275200, "color": "blue", "text": "bb blue" }
+          - { "index": { } }
+          - { "data": 1, "count": 44, "data_d": 1, "count_d": 44, "time": 1674835275201, "color": "green", "text": "gg green" }
+          - { "index": { } }
+          - { "data": 2, "count": 46, "data_d": 2, "count_d": 46, "time": 1674835275202, "color": "red", "text": "rr red" }
+          - { "index": { } }
+          - { "data": 1, "count": 40, "data_d": 1, "count_d": 40, "time": 1674835275203, "color": "red", "text": "rr red" }
+          - { "index": { } }
+          - { "data": 2, "count": 42, "data_d": 2, "count_d": 42, "time": 1674835275204, "color": "blue", "text": "bb blue" }
+          - { "index": { } }
+          - { "data": 1, "count": 44, "data_d": 1, "count_d": 44, "time": 1674835275205, "color": "green", "text": "gg green" }
+          - { "index": { } }
+          - { "data": 2, "count": 46, "data_d": 2, "count_d": 46, "time": 1674835275206, "color": "red", "text": "rr red" }
+          - { "index": { } }
+          - { "data": 1, "count": 40, "data_d": 1, "count_d": 40, "time": 1674835275207, "color": "red", "text": "rr red" }
+          - { "index": { } }
+          - { "data": 2, "count": 42, "data_d": 2, "count_d": 42, "time": 1674835275208, "color": "blue", "text": "bb blue" }
+          - { "index": { } }
+          - { "data": 1, "count": 44, "data_d": 1, "count_d": 44, "time": 1674835275209, "color": "green", "text": "gg green" }
+          - { "index": { } }
+          - { "data": 2, "count": 46, "data_d": 2, "count_d": 46, "time": 1674835275210, "color": "red", "text": "rr red" }
+          - { "index": { } }
+          - { "data": 1, "count": 40, "data_d": 1, "count_d": 40, "time": 1674835275211, "color": "red", "text": "rr red" }
+          - { "index": { } }
+          - { "data": 2, "count": 42, "data_d": 2, "count_d": 42, "time": 1674835275212, "color": "blue", "text": "bb blue" }
+          - { "index": { } }
+          - { "data": 1, "count": 44, "data_d": 1, "count_d": 44, "time": 1674835275213, "color": "green", "text": "gg green" }
+          - { "index": { } }
+          - { "data": 2, "count": 46, "data_d": 2, "count_d": 46, "time": 1674835275214, "color": "red", "text": "rr red" }
+          - { "index": { } }
+          - { "data": 1, "count": 40, "data_d": 1, "count_d": 40, "time": 1674835275215, "color": "red", "text": "rr red" }
+          - { "index": { } }
+          - { "data": 2, "count": 42, "data_d": 2, "count_d": 42, "time": 1674835275216, "color": "blue", "text": "bb blue" }
+          - { "index": { } }
+          - { "data": 1, "count": 44, "data_d": 1, "count_d": 44, "time": 1674835275217, "color": "green", "text": "gg green" }
+          - { "index": { } }
+          - { "data": 2, "count": 46, "data_d": 2, "count_d": 46, "time": 1674835275218, "color": "red", "text": "rr red" }
+          - { "index": { } }
+          - { "data": 1, "count": 40, "data_d": 1, "count_d": 40, "time": 1674835275219, "color": "red", "text": "rr red" }
+          - { "index": { } }
+          - { "data": 2, "count": 42, "data_d": 2, "count_d": 42, "time": 1674835275220, "color": "blue", "text": "bb blue" }
+          - { "index": { } }
+          - { "data": 1, "count": 44, "data_d": 1, "count_d": 44, "time": 1674835275221, "color": "green", "text": "gg green" }
+          - { "index": { } }
+          - { "data": 2, "count": 46, "data_d": 2, "count_d": 46, "time": 1674835275222, "color": "red", "text": "rr red" }
+          - { "index": { } }
+          - { "data": 1, "count": 40, "data_d": 1, "count_d": 40, "time": 1674835275223, "color": "red", "text": "rr red" }
+          - { "index": { } }
+          - { "data": 2, "count": 42, "data_d": 2, "count_d": 42, "time": 1674835275224, "color": "blue", "text": "bb blue" }
+          - { "index": { } }
+          - { "data": 1, "count": 44, "data_d": 1, "count_d": 44, "time": 1674835275225, "color": "green", "text": "gg green" }
+          - { "index": { } }
+          - { "data": 2, "count": 46, "data_d": 2, "count_d": 46, "time": 1674835275226, "color": "red", "text": "rr red" }
+
+---
+avg 8.14 or after:
+  - skip:
+      features: ["node_selector"]
+
+  - do:
+      node_selector:
+        version: "8.13.99 - "
+      esql.query:
+        body:
+          query: 'FROM test | STATS AVG(data) | LIMIT 1'
+          columnar: true
+          profile: true
+
+  - match: {columns.0.name: "AVG(data)"}
+  - match: {columns.0.type: "double"}
+  - match: {values.0.0: 1.5}
+  - match: {profile.drivers.0.operators.0.operator: /ExchangeSourceOperator|LuceneSourceOperator.+/}
+  - gte: {profile.drivers.0.took_nanos: 0}
+  - gte: {profile.drivers.0.cpu_nanos: 0}
+  - gte: {profile.drivers.1.took_nanos: 0}
+  - gte: {profile.drivers.1.cpu_nanos: 0}
+# It's hard to assert much about these because they don't come back in any particular order.