Browse Source

ESQL: Add `documents_found` and `values_loaded` (#125631) (#130029)

This adds `documents_found` and `values_loaded` to the ESQL response:
```json
{
  "took" : 194,
  "is_partial" : false,
  "documents_found" : 100000,
  "values_loaded" : 200000,
  "columns" : [
    { "name" : "a", "type" : "long" },
    { "name" : "b", "type" : "long" }
  ],
  "values" : [[10, 1]]
}
```

These are cheap enough to collect that we can do it for every query and
return it with every response. It's small, but it still gives you a
reasonable sense of how much work Elasticsearch had to go through to
perform the query.

I've also added these two fields to the driver profile and task status:
```json
    "drivers" : [
      {
        "description" : "data",
        "cluster_name" : "runTask",
        "node_name" : "runTask-0",
        "start_millis" : 1742923173077,
        "stop_millis" : 1742923173087,
        "took_nanos" : 9557014,
        "cpu_nanos" : 9091340,
        "documents_found" : 5,   <---- THESE
        "values_loaded" : 15,    <---- THESE
        "iterations" : 6,
...
```

These are at a high level and should be easy to reason about. We'd like to
extract this into a "show me how difficult this running query is" API one
day. But today, just plumbing it into the debugging output is good.

Any `Operator` can claim to "find documents" or "load values" by overriding
a method on its `Operator.Status` implementation:
```java
/**
 * The number of documents found by this operator. Most operators
 * don't find documents and will return {@code 0} here.
 */
default long documentsFound() {
    return 0;
}

/**
 * The number of values loaded by this operator. Most operators
 * don't load values and will return {@code 0} here.
 */
default long valuesLoaded() {
    return 0;
}
```

In this PR all of the `LuceneOperator`s declare that each `position` they
emit is a "document found" and the `ValuesSourceReaderOperator`
says each value it makes is a "value loaded". That's pretty much
true. The `LuceneCountOperator` and `LuceneMinMaxOperator` sort of pretend
that the count/min/max that they emit is a "document" - but that's good
enough to give you a sense of what's going on. It's *like* a document.
Nik Everett 3 months ago
parent
commit
0acda3a659
57 changed files with 1168 additions and 359 deletions
  1. 5 0
      docs/changelog/125631.yaml
  2. 2 0
      docs/reference/esql/esql-rest.asciidoc
  3. 5 0
      docs/reference/esql/functions/description/knn.asciidoc
  4. 21 0
      docs/reference/esql/functions/examples/knn.asciidoc
  5. 13 0
      docs/reference/esql/functions/functionNamedParams/knn.asciidoc
  6. 9 0
      docs/reference/esql/functions/functionNamedParams/to_ip.asciidoc
  7. 18 0
      docs/reference/esql/functions/layout/knn.asciidoc
  8. 12 0
      docs/reference/esql/functions/parameters/knn.asciidoc
  9. 1 0
      docs/reference/esql/functions/signature/knn.svg
  10. 9 0
      docs/reference/esql/functions/types/knn.asciidoc
  11. 15 1
      docs/reference/esql/multivalued-fields.asciidoc
  12. 1 0
      server/src/main/java/org/elasticsearch/TransportVersions.java
  13. 1 1
      server/src/main/java/org/elasticsearch/common/Strings.java
  14. 7 2
      test/framework/src/main/java/org/elasticsearch/test/rest/ESRestTestCase.java
  15. 5 1
      x-pack/plugin/esql/compute/src/main/java/org/elasticsearch/compute/data/CompositeBlock.java
  16. 5 0
      x-pack/plugin/esql/compute/src/main/java/org/elasticsearch/compute/lucene/LuceneOperator.java
  17. 30 7
      x-pack/plugin/esql/compute/src/main/java/org/elasticsearch/compute/lucene/ValuesSourceReaderOperator.java
  18. 3 3
      x-pack/plugin/esql/compute/src/main/java/org/elasticsearch/compute/operator/Driver.java
  19. 117 0
      x-pack/plugin/esql/compute/src/main/java/org/elasticsearch/compute/operator/DriverCompletionInfo.java
  20. 6 4
      x-pack/plugin/esql/compute/src/main/java/org/elasticsearch/compute/operator/DriverProfile.java
  21. 23 64
      x-pack/plugin/esql/compute/src/main/java/org/elasticsearch/compute/operator/DriverStatus.java
  22. 17 1
      x-pack/plugin/esql/compute/src/main/java/org/elasticsearch/compute/operator/Operator.java
  23. 76 0
      x-pack/plugin/esql/compute/src/main/java/org/elasticsearch/compute/operator/OperatorStatus.java
  24. 52 8
      x-pack/plugin/esql/compute/src/test/java/org/elasticsearch/compute/data/CompositeBlockTests.java
  25. 7 3
      x-pack/plugin/esql/compute/src/test/java/org/elasticsearch/compute/lucene/ValuesSourceReaderOperatorStatusTests.java
  26. 4 2
      x-pack/plugin/esql/compute/src/test/java/org/elasticsearch/compute/operator/DriverProfileTests.java
  27. 8 6
      x-pack/plugin/esql/compute/src/test/java/org/elasticsearch/compute/operator/DriverStatusTests.java
  28. 5 1
      x-pack/plugin/esql/qa/server/multi-clusters/src/javaRestTest/java/org/elasticsearch/xpack/esql/ccq/MultiClustersIT.java
  29. 5 2
      x-pack/plugin/esql/qa/server/single-node/src/javaRestTest/java/org/elasticsearch/xpack/esql/qa/single_node/RestEsqlIT.java
  30. 1 2
      x-pack/plugin/esql/qa/server/single-node/src/javaRestTest/java/org/elasticsearch/xpack/esql/qa/single_node/StoredFieldsSequentialIT.java
  31. 11 3
      x-pack/plugin/esql/qa/server/src/main/java/org/elasticsearch/xpack/esql/qa/rest/RestEsqlTestCase.java
  32. 2 2
      x-pack/plugin/esql/src/internalClusterTest/java/org/elasticsearch/xpack/esql/action/EnrichIT.java
  33. 16 1
      x-pack/plugin/esql/src/internalClusterTest/java/org/elasticsearch/xpack/esql/action/EsqlActionTaskIT.java
  34. 6 0
      x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/action/EsqlCapabilities.java
  35. 45 3
      x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/action/EsqlQueryResponse.java
  36. 2 1
      x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/action/EsqlQueryTask.java
  37. 7 7
      x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/plugin/ClusterComputeHandler.java
  38. 7 14
      x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/plugin/ComputeListener.java
  39. 21 18
      x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/plugin/ComputeResponse.java
  40. 26 16
      x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/plugin/ComputeService.java
  41. 8 8
      x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/plugin/DataNodeComputeHandler.java
  42. 28 17
      x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/plugin/DataNodeComputeResponse.java
  43. 9 9
      x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/plugin/DataNodeRequestSender.java
  44. 17 2
      x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/plugin/TransportEsqlQueryAction.java
  45. 2 1
      x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/session/EsqlCCSUtils.java
  46. 9 7
      x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/session/EsqlSession.java
  47. 8 6
      x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/session/Result.java
  48. 5 1
      x-pack/plugin/esql/src/test/java/org/elasticsearch/xpack/esql/CsvTests.java
  49. 3 3
      x-pack/plugin/esql/src/test/java/org/elasticsearch/xpack/esql/action/EsqlQueryResponseProfileTests.java
  50. 286 78
      x-pack/plugin/esql/src/test/java/org/elasticsearch/xpack/esql/action/EsqlQueryResponseTests.java
  51. 13 4
      x-pack/plugin/esql/src/test/java/org/elasticsearch/xpack/esql/formatter/TextFormatTests.java
  52. 6 0
      x-pack/plugin/esql/src/test/java/org/elasticsearch/xpack/esql/formatter/TextFormatterTests.java
  53. 33 19
      x-pack/plugin/esql/src/test/java/org/elasticsearch/xpack/esql/plugin/ComputeListenerTests.java
  54. 31 24
      x-pack/plugin/esql/src/test/java/org/elasticsearch/xpack/esql/plugin/DataNodeRequestSenderTests.java
  55. 2 1
      x-pack/plugin/esql/src/test/java/org/elasticsearch/xpack/esql/querylog/EsqlQueryLogTests.java
  56. 56 2
      x-pack/plugin/src/yamlRestTest/resources/rest-api-spec/test/esql/10_basic.yml
  57. 26 4
      x-pack/plugin/src/yamlRestTest/resources/rest-api-spec/test/esql/120_profile.yml

+ 5 - 0
docs/changelog/125631.yaml

@@ -0,0 +1,5 @@
+pr: 125631
+summary: Add `documents_found` and `values_loaded`
+area: ES|QL
+type: enhancement
+issues: []

+ 2 - 0
docs/reference/esql/esql-rest.asciidoc

@@ -194,6 +194,8 @@ Which returns:
 {
   "took": 28,
   "is_partial": false,
+  "documents_found": 5,
+  "values_loaded": 20,
   "columns": [
     {"name": "author", "type": "text"},
     {"name": "name", "type": "text"},

+ 5 - 0
docs/reference/esql/functions/description/knn.asciidoc

@@ -0,0 +1,5 @@
+// This is generated by ESQL's AbstractFunctionTestCase. Do no edit it. See ../README.md for how to regenerate it.
+
+*Description*
+
+Finds the k nearest vectors to a query vector, as measured by a similarity metric. knn function finds nearest vectors through approximate search on indexed dense_vectors.

+ 21 - 0
docs/reference/esql/functions/examples/knn.asciidoc

@@ -0,0 +1,21 @@
+// This is generated by ESQL's AbstractFunctionTestCase. Do no edit it. See ../README.md for how to regenerate it.
+
+*Examples*
+
+[source.merge.styled,esql]
+----
+include::{esql-specs}/knn-function.csv-spec[tag=knn-function]
+----
+[%header.monospaced.styled,format=dsv,separator=|]
+|===
+include::{esql-specs}/knn-function.csv-spec[tag=knn-function-result]
+|===
+[source.merge.styled,esql]
+----
+include::{esql-specs}/knn-function.csv-spec[tag=knn-function-options]
+----
+[%header.monospaced.styled,format=dsv,separator=|]
+|===
+include::{esql-specs}/knn-function.csv-spec[tag=knn-function-options-result]
+|===
+

+ 13 - 0
docs/reference/esql/functions/functionNamedParams/knn.asciidoc

@@ -0,0 +1,13 @@
+// This is generated by ESQL's AbstractFunctionTestCase. Do no edit it. See ../README.md for how to regenerate it.
+
+*Supported function named parameters*
+
+[%header.monospaced.styled,format=dsv,separator=|]
+|===
+name | types | description
+num_candidates | [integer] | The number of nearest neighbor candidates to consider per shard while doing knn search. Cannot exceed 10,000. Increasing num_candidates tends to improve the accuracy of the final results. Defaults to 1.5 * k
+boost | [float] | Floating point number used to decrease or increase the relevance scores of the query.Defaults to 1.0.
+k | [integer] | The number of nearest neighbors to return from each shard. Elasticsearch collects k results from each shard, then merges them to find the global top results. This value must be less than or equal to num_candidates. Defaults to 10.
+rescore_oversample | [double] | Applies the specified oversampling for rescoring quantized vectors. See [oversampling and rescoring quantized vectors](docs-content://solutions/search/vector/knn.md#dense-vector-knn-search-rescoring) for details.
+similarity | [double] | The minimum similarity required for a document to be considered a match. The similarity value calculated relates to the raw similarity used, not the document score.
+|===

+ 9 - 0
docs/reference/esql/functions/functionNamedParams/to_ip.asciidoc

@@ -0,0 +1,9 @@
+// This is generated by ESQL's AbstractFunctionTestCase. Do no edit it. See ../README.md for how to regenerate it.
+
+*Supported function named parameters*
+
+[%header.monospaced.styled,format=dsv,separator=|]
+|===
+name | types | description
+leading_zeros | [keyword] | What to do with leading 0s in IPv4 addresses.
+|===

+ 18 - 0
docs/reference/esql/functions/layout/knn.asciidoc

@@ -0,0 +1,18 @@
+// This is generated by ESQL's AbstractFunctionTestCase. Do no edit it. See ../README.md for how to regenerate it.
+
+[discrete]
+[[esql-knn]]
+=== `KNN`
+
+preview::["Do not use on production environments. This functionality is in technical preview and may be changed or removed in a future release. Elastic will work to fix any issues, but features in technical preview are not subject to the support SLA of official GA features."]
+
+*Syntax*
+
+[.text-center]
+image::esql/functions/signature/knn.svg[Embedded,opts=inline]
+
+include::../parameters/knn.asciidoc[]
+include::../description/knn.asciidoc[]
+include::../types/knn.asciidoc[]
+include::../functionNamedParams/knn.asciidoc[]
+include::../examples/knn.asciidoc[]

+ 12 - 0
docs/reference/esql/functions/parameters/knn.asciidoc

@@ -0,0 +1,12 @@
+// This is generated by ESQL's AbstractFunctionTestCase. Do no edit it. See ../README.md for how to regenerate it.
+
+*Parameters*
+
+`field`::
+Field that the query will target.
+
+`query`::
+Vector value to find top nearest neighbours for.
+
+`options`::
+(Optional) kNN additional options as <<esql-function-named-params,function named parameters>>. See <<query-dsl-knn-query,knn query>> for more information.

+ 1 - 0
docs/reference/esql/functions/signature/knn.svg

@@ -0,0 +1 @@
+<svg version="1.1" xmlns:xlink="http://www.w3.org/1999/xlink" xmlns="http://www.w3.org/2000/svg" width="528" height="46" viewbox="0 0 528 46"><defs><style type="text/css">#guide .c{fill:none;stroke:#222222;}#guide .k{fill:#000000;font-family:Roboto Mono,Sans-serif;font-size:20px;}#guide .s{fill:#e4f4ff;stroke:#222222;}#guide .syn{fill:#8D8D8D;font-family:Roboto Mono,Sans-serif;font-size:20px;}</style></defs><path class="c" d="M0 31h5m56 0h10m32 0h10m80 0h10m32 0h10m80 0h10m32 0h10m104 0h10m32 0h5"/><rect class="s" x="5" y="5" width="56" height="36"/><text class="k" x="15" y="31">KNN</text><rect class="s" x="71" y="5" width="32" height="36" rx="7"/><text class="syn" x="81" y="31">(</text><rect class="s" x="113" y="5" width="80" height="36" rx="7"/><text class="k" x="123" y="31">field</text><rect class="s" x="203" y="5" width="32" height="36" rx="7"/><text class="syn" x="213" y="31">,</text><rect class="s" x="245" y="5" width="80" height="36" rx="7"/><text class="k" x="255" y="31">query</text><rect class="s" x="335" y="5" width="32" height="36" rx="7"/><text class="syn" x="345" y="31">,</text><rect class="s" x="377" y="5" width="104" height="36" rx="7"/><text class="k" x="387" y="31">options</text><rect class="s" x="491" y="5" width="32" height="36" rx="7"/><text class="syn" x="501" y="31">)</text></svg>

+ 9 - 0
docs/reference/esql/functions/types/knn.asciidoc

@@ -0,0 +1,9 @@
+// This is generated by ESQL's AbstractFunctionTestCase. Do no edit it. See ../README.md for how to regenerate it.
+
+*Supported types*
+
+[%header.monospaced.styled,format=dsv,separator=|]
+|===
+field | query | options | result
+boolean
+|===

+ 15 - 1
docs/reference/esql/multivalued-fields.asciidoc

@@ -28,6 +28,8 @@ Multivalued fields come back as a JSON array:
 {
   "took": 28,
   "is_partial": false,
+  "documents_found": 2,
+  "values_loaded": 5,
   "columns": [
     { "name": "a", "type": "long"},
     { "name": "b", "type": "long"}
@@ -80,6 +82,8 @@ And {esql} sees that removal:
 {
   "took": 28,
   "is_partial": false,
+  "documents_found": 2,
+  "values_loaded": 5,
   "columns": [
     { "name": "a", "type": "long"},
     { "name": "b", "type": "keyword"}
@@ -125,6 +129,8 @@ And {esql} also sees that:
 {
   "took": 28,
   "is_partial": false,
+  "documents_found": 2,
+  "values_loaded": 7,
   "columns": [
     { "name": "a", "type": "long"},
     { "name": "b", "type": "long"}
@@ -169,6 +175,8 @@ POST /_query
 {
   "took": 28,
   "is_partial": false,
+  "documents_found": 2,
+  "values_loaded": 7,
   "columns": [
     { "name": "a", "type": "long"},
     { "name": "b", "type": "keyword"}
@@ -203,6 +211,8 @@ POST /_query
 {
   "took": 28,
   "is_partial": false,
+  "documents_found": 1,
+  "values_loaded": 2,
   "columns": [
     { "name": "a", "type": "long"},
   ],
@@ -247,6 +257,8 @@ POST /_query
 {
   "took": 28,
   "is_partial": false,
+  "documents_found": 2,
+  "values_loaded": 5,
   "columns": [
     { "name": "a",   "type": "long"},
     { "name": "b",   "type": "long"},
@@ -271,7 +283,7 @@ Work around this limitation by converting the field to single value with one of:
 * <<esql-mv_min>>
 * <<esql-mv_sum>>
 
-[source,console,esql-multivalued-fields-mv-into-null]
+[source,console,esql-multivalued-fields-mv-min]
 ----
 POST /_query
 {
@@ -285,6 +297,8 @@ POST /_query
 {
   "took": 28,
   "is_partial": false,
+  "documents_found": 2,
+  "values_loaded": 5,
   "columns": [
     { "name": "a",   "type": "long"},
     { "name": "b",   "type": "long"},

+ 1 - 0
server/src/main/java/org/elasticsearch/TransportVersions.java

@@ -253,6 +253,7 @@ public class TransportVersions {
     public static final TransportVersion SPARSE_VECTOR_FIELD_PRUNING_OPTIONS_8_19 = def(8_841_0_58);
     public static final TransportVersion ML_INFERENCE_ELASTIC_DENSE_TEXT_EMBEDDINGS_ADDED_8_19 = def(8_841_0_59);
     public static final TransportVersion ML_INFERENCE_COHERE_API_VERSION_8_19 = def(8_841_0_60);
+    public static final TransportVersion ESQL_DOCUMENTS_FOUND_AND_VALUES_LOADED_8_19 = def(8_841_0_61);
 
     /*
      * STOP! READ THIS FIRST! No, really,

+ 1 - 1
server/src/main/java/org/elasticsearch/common/Strings.java

@@ -818,7 +818,7 @@ public class Strings {
      * Allows to configure the params.
      * Allows to control whether the outputted json needs to be pretty printed and human readable.
      */
-    private static String toString(ToXContent toXContent, ToXContent.Params params, boolean pretty, boolean human) {
+    public static String toString(ToXContent toXContent, ToXContent.Params params, boolean pretty, boolean human) {
         try {
             XContentBuilder builder = createBuilder(pretty, human);
             if (toXContent.isFragment()) {

+ 7 - 2
test/framework/src/main/java/org/elasticsearch/test/rest/ESRestTestCase.java

@@ -2704,8 +2704,13 @@ public abstract class ESRestTestCase extends ESTestCase {
             .entry("drivers", instanceOf(List.class));
     }
 
-    protected static MapMatcher getResultMatcher(boolean includeMetadata, boolean includePartial) {
+    protected static MapMatcher getResultMatcher(boolean includeMetadata, boolean includePartial, boolean includeDocumentsFound) {
         MapMatcher mapMatcher = matchesMap();
+        if (includeDocumentsFound) {
+            // Older versions may not return documents_found and values_loaded.
+            mapMatcher = mapMatcher.entry("documents_found", greaterThanOrEqualTo(0));
+            mapMatcher = mapMatcher.entry("values_loaded", greaterThanOrEqualTo(0));
+        }
         if (includeMetadata) {
             mapMatcher = mapMatcher.entry("took", greaterThanOrEqualTo(0));
         }
@@ -2720,7 +2725,7 @@ public abstract class ESRestTestCase extends ESTestCase {
      * Create empty result matcher from result, taking into account all metadata items.
      */
     protected static MapMatcher getResultMatcher(Map<String, Object> result) {
-        return getResultMatcher(result.containsKey("took"), result.containsKey("is_partial"));
+        return getResultMatcher(result.containsKey("took"), result.containsKey("is_partial"), result.containsKey("documents_found"));
     }
 
     /**

+ 5 - 1
x-pack/plugin/esql/compute/src/main/java/org/elasticsearch/compute/data/CompositeBlock.java

@@ -83,7 +83,11 @@ public final class CompositeBlock extends AbstractNonThreadSafeRefCounted implem
 
     @Override
     public int getTotalValueCount() {
-        throw new UnsupportedOperationException("Composite block");
+        int totalValueCount = 0;
+        for (Block b : blocks) {
+            totalValueCount += b.getTotalValueCount();
+        }
+        return totalValueCount;
     }
 
     @Override

+ 5 - 0
x-pack/plugin/esql/compute/src/main/java/org/elasticsearch/compute/lucene/LuceneOperator.java

@@ -437,6 +437,11 @@ public abstract class LuceneOperator extends SourceOperator {
             return partitioningStrategies;
         }
 
+        @Override
+        public long documentsFound() {
+            return rowsEmitted;
+        }
+
         @Override
         public XContentBuilder toXContent(XContentBuilder builder, Params params) throws IOException {
             builder.startObject();

+ 30 - 7
x-pack/plugin/esql/compute/src/main/java/org/elasticsearch/compute/lucene/ValuesSourceReaderOperator.java

@@ -46,6 +46,8 @@ import java.util.TreeMap;
 import java.util.function.IntFunction;
 import java.util.function.Supplier;
 
+import static org.elasticsearch.TransportVersions.ESQL_DOCUMENTS_FOUND_AND_VALUES_LOADED_8_19;
+
 /**
  * Operator that extracts doc_values from a Lucene index out of pages that have been produced by {@link LuceneSourceOperator}
  * and outputs them to a new column.
@@ -112,6 +114,7 @@ public class ValuesSourceReaderOperator extends AbstractPageMappingOperator {
     private final BlockFactory blockFactory;
 
     private final Map<String, Integer> readersBuilt = new TreeMap<>();
+    private long valuesLoaded;
 
     int lastShard = -1;
     int lastSegment = -1;
@@ -158,6 +161,9 @@ public class ValuesSourceReaderOperator extends AbstractPageMappingOperator {
                 }
             }
             success = true;
+            for (Block b : blocks) {
+                valuesLoaded += b.getTotalValueCount();
+            }
             return page.appendBlocks(blocks);
         } catch (IOException e) {
             throw new UncheckedIOException(e);
@@ -548,7 +554,7 @@ public class ValuesSourceReaderOperator extends AbstractPageMappingOperator {
 
     @Override
     protected Status status(long processNanos, int pagesProcessed, long rowsReceived, long rowsEmitted) {
-        return new Status(new TreeMap<>(readersBuilt), processNanos, pagesProcessed, rowsReceived, rowsEmitted);
+        return new Status(new TreeMap<>(readersBuilt), processNanos, pagesProcessed, rowsReceived, rowsEmitted, valuesLoaded);
     }
 
     /**
@@ -593,21 +599,34 @@ public class ValuesSourceReaderOperator extends AbstractPageMappingOperator {
         );
 
         private final Map<String, Integer> readersBuilt;
-
-        Status(Map<String, Integer> readersBuilt, long processNanos, int pagesProcessed, long rowsReceived, long rowsEmitted) {
+        private final long valuesLoaded;
+
+        Status(
+            Map<String, Integer> readersBuilt,
+            long processNanos,
+            int pagesProcessed,
+            long rowsReceived,
+            long rowsEmitted,
+            long valuesLoaded
+        ) {
             super(processNanos, pagesProcessed, rowsReceived, rowsEmitted);
             this.readersBuilt = readersBuilt;
+            this.valuesLoaded = valuesLoaded;
         }
 
         Status(StreamInput in) throws IOException {
             super(in);
             readersBuilt = in.readOrderedMap(StreamInput::readString, StreamInput::readVInt);
+            valuesLoaded = in.getTransportVersion().onOrAfter(ESQL_DOCUMENTS_FOUND_AND_VALUES_LOADED_8_19) ? in.readVLong() : 0;
         }
 
         @Override
         public void writeTo(StreamOutput out) throws IOException {
             super.writeTo(out);
             out.writeMap(readersBuilt, StreamOutput::writeVInt);
+            if (out.getTransportVersion().onOrAfter(ESQL_DOCUMENTS_FOUND_AND_VALUES_LOADED_8_19)) {
+                out.writeVLong(valuesLoaded);
+            }
         }
 
         @Override
@@ -619,6 +638,11 @@ public class ValuesSourceReaderOperator extends AbstractPageMappingOperator {
             return readersBuilt;
         }
 
+        @Override
+        public long valuesLoaded() {
+            return valuesLoaded;
+        }
+
         @Override
         public XContentBuilder toXContent(XContentBuilder builder, Params params) throws IOException {
             builder.startObject();
@@ -627,6 +651,7 @@ public class ValuesSourceReaderOperator extends AbstractPageMappingOperator {
                 builder.field(e.getKey(), e.getValue());
             }
             builder.endObject();
+            builder.field("values_loaded", valuesLoaded);
             innerToXContent(builder);
             return builder.endObject();
         }
@@ -635,12 +660,12 @@ public class ValuesSourceReaderOperator extends AbstractPageMappingOperator {
         public boolean equals(Object o) {
             if (super.equals(o) == false) return false;
             Status status = (Status) o;
-            return readersBuilt.equals(status.readersBuilt);
+            return readersBuilt.equals(status.readersBuilt) && valuesLoaded == status.valuesLoaded;
         }
 
         @Override
         public int hashCode() {
-            return Objects.hash(super.hashCode(), readersBuilt);
+            return Objects.hash(super.hashCode(), readersBuilt, valuesLoaded);
         }
 
         @Override
@@ -750,6 +775,4 @@ public class ValuesSourceReaderOperator extends AbstractPageMappingOperator {
             return factory.newAggregateMetricDoubleBlockBuilder(count);
         }
     }
-
-    // TODO tests that mix source loaded fields and doc values in the same block
 }

+ 3 - 3
x-pack/plugin/esql/compute/src/main/java/org/elasticsearch/compute/operator/Driver.java

@@ -77,7 +77,7 @@ public class Driver implements Releasable, Describable {
     private final DriverContext driverContext;
     private final Supplier<String> description;
     private final List<Operator> activeOperators;
-    private final List<DriverStatus.OperatorStatus> statusOfCompletedOperators = new ArrayList<>();
+    private final List<OperatorStatus> statusOfCompletedOperators = new ArrayList<>();
     private final Releasable releasable;
     private final long statusNanos;
 
@@ -343,7 +343,7 @@ public class Driver implements Releasable, Describable {
                 Iterator<Operator> itr = finishedOperators.iterator();
                 while (itr.hasNext()) {
                     Operator op = itr.next();
-                    statusOfCompletedOperators.add(new DriverStatus.OperatorStatus(op.toString(), op.status()));
+                    statusOfCompletedOperators.add(new OperatorStatus(op.toString(), op.status()));
                     op.close();
                     itr.remove();
                 }
@@ -570,7 +570,7 @@ public class Driver implements Releasable, Describable {
                 prev.iterations() + extraIterations,
                 status,
                 statusOfCompletedOperators,
-                activeOperators.stream().map(op -> new DriverStatus.OperatorStatus(op.toString(), op.status())).toList(),
+                activeOperators.stream().map(op -> new OperatorStatus(op.toString(), op.status())).toList(),
                 sleeps
             );
         });

+ 117 - 0
x-pack/plugin/esql/compute/src/main/java/org/elasticsearch/compute/operator/DriverCompletionInfo.java

@@ -0,0 +1,117 @@
+/*
+ * Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
+ * or more contributor license agreements. Licensed under the Elastic License
+ * 2.0; you may not use this file except in compliance with the Elastic License
+ * 2.0.
+ */
+
+package org.elasticsearch.compute.operator;
+
+import org.elasticsearch.common.io.stream.StreamInput;
+import org.elasticsearch.common.io.stream.StreamOutput;
+import org.elasticsearch.common.io.stream.Writeable;
+
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.List;
+import java.util.concurrent.atomic.AtomicLong;
+
+/**
+ * Information returned when one of more {@link Driver}s is completed.
+ * @param documentsFound The number of documents found by all lucene queries performed by these drivers.
+ * @param valuesLoaded The number of values loaded from lucene for all drivers. This is
+ *                     <strong>roughly</strong> the number of documents times the number of
+ *                     fields per document. Except {@code null} values don't count.
+ *                     And multivalued fields count as many times as there are values.
+ * @param collectedProfiles {@link DriverProfile}s from each driver. These are fairly cheap to build but
+ *                          not free so this will be empty if the {@code profile} option was not set in
+ *                          the request.
+ */
+public record DriverCompletionInfo(long documentsFound, long valuesLoaded, List<DriverProfile> collectedProfiles) implements Writeable {
+
+    /**
+     * Completion info we use when we didn't properly complete any drivers.
+     * Usually this is returned with an error, but it's also used when receiving
+     * responses from very old nodes.
+     */
+    public static final DriverCompletionInfo EMPTY = new DriverCompletionInfo(0, 0, List.of());
+
+    /**
+     * Build a {@link DriverCompletionInfo} for many drivers including their profile output.
+     */
+    public static DriverCompletionInfo includingProfiles(List<Driver> drivers) {
+        long documentsFound = 0;
+        long valuesLoaded = 0;
+        List<DriverProfile> collectedProfiles = new ArrayList<>(drivers.size());
+        for (Driver d : drivers) {
+            DriverProfile p = d.profile();
+            for (OperatorStatus o : p.operators()) {
+                documentsFound += o.documentsFound();
+                valuesLoaded += o.valuesLoaded();
+            }
+            collectedProfiles.add(p);
+        }
+        return new DriverCompletionInfo(documentsFound, valuesLoaded, collectedProfiles);
+    }
+
+    /**
+     * Build a {@link DriverCompletionInfo} for many drivers excluding their profile output.
+     */
+    public static DriverCompletionInfo excludingProfiles(List<Driver> drivers) {
+        long documentsFound = 0;
+        long valuesLoaded = 0;
+        for (Driver d : drivers) {
+            DriverStatus s = d.status();
+            assert s.status() == DriverStatus.Status.DONE;
+            for (OperatorStatus o : s.completedOperators()) {
+                documentsFound += o.documentsFound();
+                valuesLoaded += o.valuesLoaded();
+            }
+        }
+        return new DriverCompletionInfo(documentsFound, valuesLoaded, List.of());
+    }
+
+    public DriverCompletionInfo(StreamInput in) throws IOException {
+        this(in.readVLong(), in.readVLong(), in.readCollectionAsImmutableList(DriverProfile::new));
+    }
+
+    @Override
+    public void writeTo(StreamOutput out) throws IOException {
+        out.writeVLong(documentsFound);
+        out.writeVLong(valuesLoaded);
+        out.writeCollection(collectedProfiles, (o, v) -> v.writeTo(o));
+    }
+
+    public static class Accumulator {
+        private long documentsFound;
+        private long valuesLoaded;
+        private final List<DriverProfile> collectedProfiles = new ArrayList<>();
+
+        public void accumulate(DriverCompletionInfo info) {
+            this.documentsFound += info.documentsFound;
+            this.valuesLoaded += info.valuesLoaded;
+            this.collectedProfiles.addAll(info.collectedProfiles);
+        }
+
+        public DriverCompletionInfo finish() {
+            return new DriverCompletionInfo(documentsFound, valuesLoaded, collectedProfiles);
+        }
+    }
+
+    public static class AtomicAccumulator {
+        private final AtomicLong documentsFound = new AtomicLong();
+        private final AtomicLong valuesLoaded = new AtomicLong();
+        private final List<DriverProfile> collectedProfiles = Collections.synchronizedList(new ArrayList<>());
+
+        public void accumulate(DriverCompletionInfo info) {
+            this.documentsFound.addAndGet(info.documentsFound);
+            this.valuesLoaded.addAndGet(info.valuesLoaded);
+            this.collectedProfiles.addAll(info.collectedProfiles);
+        }
+
+        public DriverCompletionInfo finish() {
+            return new DriverCompletionInfo(documentsFound.get(), valuesLoaded.get(), collectedProfiles);
+        }
+    }
+}

+ 6 - 4
x-pack/plugin/esql/compute/src/main/java/org/elasticsearch/compute/operator/DriverProfile.java

@@ -63,7 +63,7 @@ public class DriverProfile implements Writeable, ChunkedToXContentObject {
     /**
      * Status of each {@link Operator} in the driver when it finished.
      */
-    private final List<DriverStatus.OperatorStatus> operators;
+    private final List<OperatorStatus> operators;
 
     private final DriverSleeps sleeps;
 
@@ -74,7 +74,7 @@ public class DriverProfile implements Writeable, ChunkedToXContentObject {
         long tookNanos,
         long cpuNanos,
         long iterations,
-        List<DriverStatus.OperatorStatus> operators,
+        List<OperatorStatus> operators,
         DriverSleeps sleeps
     ) {
         this.taskDescription = taskDescription;
@@ -107,7 +107,7 @@ public class DriverProfile implements Writeable, ChunkedToXContentObject {
             this.cpuNanos = 0;
             this.iterations = 0;
         }
-        this.operators = in.readCollectionAsImmutableList(DriverStatus.OperatorStatus::new);
+        this.operators = in.readCollectionAsImmutableList(OperatorStatus::readFrom);
         this.sleeps = DriverSleeps.read(in);
     }
 
@@ -176,7 +176,7 @@ public class DriverProfile implements Writeable, ChunkedToXContentObject {
     /**
      * Status of each {@link Operator} in the driver when it finished.
      */
-    public List<DriverStatus.OperatorStatus> operators() {
+    public List<OperatorStatus> operators() {
         return operators;
     }
 
@@ -202,6 +202,8 @@ public class DriverProfile implements Writeable, ChunkedToXContentObject {
                 if (b.humanReadable()) {
                     b.field("cpu_time", TimeValue.timeValueNanos(cpuNanos));
                 }
+                b.field("documents_found", operators.stream().mapToLong(OperatorStatus::documentsFound).sum());
+                b.field("values_loaded", operators.stream().mapToLong(OperatorStatus::valuesLoaded).sum());
                 b.field("iterations", iterations);
                 return b;
             });

+ 23 - 64
x-pack/plugin/esql/compute/src/main/java/org/elasticsearch/compute/operator/DriverStatus.java

@@ -12,14 +12,11 @@ import org.elasticsearch.common.Strings;
 import org.elasticsearch.common.io.stream.NamedWriteableRegistry;
 import org.elasticsearch.common.io.stream.StreamInput;
 import org.elasticsearch.common.io.stream.StreamOutput;
-import org.elasticsearch.common.io.stream.VersionedNamedWriteable;
 import org.elasticsearch.common.io.stream.Writeable;
-import org.elasticsearch.core.Nullable;
 import org.elasticsearch.core.TimeValue;
 import org.elasticsearch.index.mapper.DateFieldMapper;
 import org.elasticsearch.tasks.Task;
 import org.elasticsearch.xcontent.ToXContentFragment;
-import org.elasticsearch.xcontent.ToXContentObject;
 import org.elasticsearch.xcontent.XContentBuilder;
 
 import java.io.IOException;
@@ -121,11 +118,11 @@ public class DriverStatus implements Task.Status {
         this.iterations = in.getTransportVersion().onOrAfter(TransportVersions.V_8_14_0) ? in.readVLong() : 0;
         this.status = Status.read(in);
         if (in.getTransportVersion().onOrAfter(TransportVersions.V_8_12_0)) {
-            this.completedOperators = in.readCollectionAsImmutableList(OperatorStatus::new);
+            this.completedOperators = in.readCollectionAsImmutableList(OperatorStatus::readFrom);
         } else {
             this.completedOperators = List.of();
         }
-        this.activeOperators = in.readCollectionAsImmutableList(OperatorStatus::new);
+        this.activeOperators = in.readCollectionAsImmutableList(OperatorStatus::readFrom);
         this.sleeps = DriverSleeps.read(in);
     }
 
@@ -241,6 +238,8 @@ public class DriverStatus implements Task.Status {
         if (builder.humanReadable()) {
             builder.field("cpu_time", TimeValue.timeValueNanos(cpuNanos));
         }
+        builder.field("documents_found", documentsFound());
+        builder.field("values_loaded", valuesLoaded());
         builder.field("iterations", iterations);
         builder.field("status", status, params);
         builder.startArray("completed_operators");
@@ -296,71 +295,31 @@ public class DriverStatus implements Task.Status {
     }
 
     /**
-     * Status of an {@link Operator}.
+     * The number of documents found by this driver.
      */
-    public static class OperatorStatus implements Writeable, ToXContentObject {
-        /**
-         * String representation of the {@link Operator}. Literally just the
-         * {@link Object#toString()} of it.
-         */
-        private final String operator;
-        /**
-         * Status as reported by the {@link Operator}.
-         */
-        @Nullable
-        private final Operator.Status status;
-
-        public OperatorStatus(String operator, Operator.Status status) {
-            this.operator = operator;
-            this.status = status;
+    public long documentsFound() {
+        long documentsFound = 0;
+        for (OperatorStatus s : completedOperators) {
+            documentsFound += s.documentsFound();
         }
-
-        OperatorStatus(StreamInput in) throws IOException {
-            operator = in.readString();
-            status = in.readOptionalNamedWriteable(Operator.Status.class);
-        }
-
-        @Override
-        public void writeTo(StreamOutput out) throws IOException {
-            out.writeString(operator);
-            out.writeOptionalNamedWriteable(status != null && VersionedNamedWriteable.shouldSerialize(out, status) ? status : null);
-        }
-
-        public String operator() {
-            return operator;
-        }
-
-        public Operator.Status status() {
-            return status;
-        }
-
-        @Override
-        public XContentBuilder toXContent(XContentBuilder builder, Params params) throws IOException {
-            builder.startObject();
-            builder.field("operator", operator);
-            if (status != null) {
-                builder.field("status", status);
-            }
-            return builder.endObject();
-        }
-
-        @Override
-        public boolean equals(Object o) {
-            if (this == o) return true;
-            if (o == null || getClass() != o.getClass()) return false;
-            OperatorStatus that = (OperatorStatus) o;
-            return operator.equals(that.operator) && Objects.equals(status, that.status);
+        for (OperatorStatus s : activeOperators) {
+            documentsFound += s.documentsFound();
         }
+        return documentsFound;
+    }
 
-        @Override
-        public int hashCode() {
-            return Objects.hash(operator, status);
+    /**
+     * The number of values loaded by this driver.
+     */
+    public long valuesLoaded() {
+        long valuesLoaded = 0;
+        for (OperatorStatus s : completedOperators) {
+            valuesLoaded += s.valuesLoaded();
         }
-
-        @Override
-        public String toString() {
-            return Strings.toString(this);
+        for (OperatorStatus s : activeOperators) {
+            valuesLoaded += s.valuesLoaded();
         }
+        return valuesLoaded;
     }
 
     public enum Status implements Writeable, ToXContentFragment {

+ 17 - 1
x-pack/plugin/esql/compute/src/main/java/org/elasticsearch/compute/operator/Operator.java

@@ -105,5 +105,21 @@ public interface Operator extends Releasable {
     /**
      * Status of an {@link Operator} to be returned by the tasks API.
      */
-    interface Status extends ToXContentObject, VersionedNamedWriteable {}
+    interface Status extends ToXContentObject, VersionedNamedWriteable {
+        /**
+         * The number of documents found by this operator. Most operators
+         * don't find documents and will return {@code 0} here.
+         */
+        default long documentsFound() {
+            return 0;
+        }
+
+        /**
+         * The number of values loaded by this operator. Most operators
+         * don't load values and will return {@code 0} here.
+         */
+        default long valuesLoaded() {
+            return 0;
+        }
+    }
 }

+ 76 - 0
x-pack/plugin/esql/compute/src/main/java/org/elasticsearch/compute/operator/OperatorStatus.java

@@ -0,0 +1,76 @@
+/*
+ * Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
+ * or more contributor license agreements. Licensed under the Elastic License
+ * 2.0; you may not use this file except in compliance with the Elastic License
+ * 2.0.
+ */
+
+package org.elasticsearch.compute.operator;
+
+import org.elasticsearch.common.Strings;
+import org.elasticsearch.common.io.stream.StreamInput;
+import org.elasticsearch.common.io.stream.StreamOutput;
+import org.elasticsearch.common.io.stream.Writeable;
+import org.elasticsearch.core.Nullable;
+import org.elasticsearch.xcontent.ToXContentObject;
+import org.elasticsearch.xcontent.XContentBuilder;
+
+import java.io.IOException;
+
+/**
+ * Status of an {@link Operator}.
+ *
+ * @param operator String representation of the {@link Operator}.
+ * @param status Status as reported by the {@link Operator}.
+ */
+public record OperatorStatus(String operator, @Nullable Operator.Status status) implements Writeable, ToXContentObject {
+
+    public static OperatorStatus readFrom(StreamInput in) throws IOException {
+        return new OperatorStatus(in.readString(), in.readOptionalNamedWriteable(Operator.Status.class));
+    }
+
+    @Override
+    public void writeTo(StreamOutput out) throws IOException {
+        out.writeString(operator);
+        out.writeOptionalNamedWriteable(
+            status != null && out.getTransportVersion().onOrAfter(status.getMinimalSupportedVersion()) ? status : null
+        );
+    }
+
+    @Override
+    public XContentBuilder toXContent(XContentBuilder builder, Params params) throws IOException {
+        builder.startObject();
+        builder.field("operator", operator);
+        if (status != null) {
+            builder.field("status", status);
+        }
+        return builder.endObject();
+    }
+
+    @Override
+    public String toString() {
+        return Strings.toString(this);
+    }
+
+    /**
+     * The number of documents found by this operator. Most operators
+     * don't find documents and will return {@code 0} here.
+     */
+    public long documentsFound() {
+        if (status == null) {
+            return 0;
+        }
+        return status.documentsFound();
+    }
+
+    /**
+     * The number of values loaded by this operator. Most operators
+     * don't load values and will return {@code 0} here.
+     */
+    public long valuesLoaded() {
+        if (status == null) {
+            return 0;
+        }
+        return status.valuesLoaded();
+    }
+}

+ 52 - 8
x-pack/plugin/esql/compute/src/test/java/org/elasticsearch/compute/data/CompositeBlockTests.java

@@ -12,6 +12,7 @@ import org.elasticsearch.compute.test.RandomBlock;
 
 import java.util.Arrays;
 import java.util.List;
+import java.util.function.Supplier;
 
 import static org.hamcrest.Matchers.equalTo;
 
@@ -23,19 +24,29 @@ public class CompositeBlockTests extends ComputeTestCase {
         )
         .toList();
 
-    public static CompositeBlock randomCompositeBlock(BlockFactory blockFactory, int numBlocks, int positionCount) {
+    public static CompositeBlock randomCompositeBlock(
+        BlockFactory blockFactory,
+        Supplier<ElementType> randomElementType,
+        boolean nullAllowed,
+        int numBlocks,
+        int positionCount,
+        int minValuesPerPosition,
+        int maxValuesPerPosition,
+        int minDupsPerPosition,
+        int maxDupsPerPosition
+    ) {
         Block[] blocks = new Block[numBlocks];
         for (int b = 0; b < numBlocks; b++) {
-            ElementType elementType = randomFrom(supportedSubElementTypes);
+            ElementType elementType = randomElementType.get();
             blocks[b] = RandomBlock.randomBlock(
                 blockFactory,
                 elementType,
                 positionCount,
-                elementType == ElementType.NULL || randomBoolean(),
-                0,
-                between(1, 2),
-                0,
-                between(1, 2)
+                nullAllowed && (elementType == ElementType.NULL || randomBoolean()),
+                minValuesPerPosition,
+                maxValuesPerPosition,
+                minDupsPerPosition,
+                maxDupsPerPosition
             ).block();
         }
         return new CompositeBlock(blocks);
@@ -45,7 +56,19 @@ public class CompositeBlockTests extends ComputeTestCase {
         final BlockFactory blockFactory = blockFactory();
         int numBlocks = randomIntBetween(1, 1000);
         int positionCount = randomIntBetween(1, 1000);
-        try (CompositeBlock origComposite = randomCompositeBlock(blockFactory, numBlocks, positionCount)) {
+        try (
+            CompositeBlock origComposite = randomCompositeBlock(
+                blockFactory,
+                () -> randomFrom(supportedSubElementTypes),
+                true,
+                numBlocks,
+                positionCount,
+                0,
+                between(1, 2),
+                0,
+                between(1, 2)
+            )
+        ) {
             int[] selected = new int[randomIntBetween(0, positionCount * 3)];
             for (int i = 0; i < selected.length; i++) {
                 selected[i] = randomIntBetween(0, positionCount - 1);
@@ -61,4 +84,25 @@ public class CompositeBlockTests extends ComputeTestCase {
             }
         }
     }
+
+    public void testTotalValueCount() {
+        final BlockFactory blockFactory = blockFactory();
+        int numBlocks = randomIntBetween(1, 1000);
+        int positionCount = randomIntBetween(1, 1000);
+        try (
+            CompositeBlock composite = randomCompositeBlock(
+                blockFactory,
+                () -> randomValueOtherThan(ElementType.NULL, () -> randomFrom(supportedSubElementTypes)),
+                false,
+                numBlocks,
+                positionCount,
+                1,
+                1,
+                0,
+                0
+            )
+        ) {
+            assertThat(composite.getTotalValueCount(), equalTo(numBlocks * positionCount));
+        }
+    }
 }

+ 7 - 3
x-pack/plugin/esql/compute/src/test/java/org/elasticsearch/compute/lucene/ValuesSourceReaderOperatorStatusTests.java

@@ -20,7 +20,7 @@ import static org.hamcrest.Matchers.equalTo;
 
 public class ValuesSourceReaderOperatorStatusTests extends AbstractWireSerializingTestCase<ValuesSourceReaderOperator.Status> {
     public static ValuesSourceReaderOperator.Status simple() {
-        return new ValuesSourceReaderOperator.Status(Map.of("ReaderType", 3), 1022323, 123, 111, 222);
+        return new ValuesSourceReaderOperator.Status(Map.of("ReaderType", 3), 1022323, 123, 111, 222, 1000);
     }
 
     public static String simpleToJson() {
@@ -29,6 +29,7 @@ public class ValuesSourceReaderOperatorStatusTests extends AbstractWireSerializi
               "readers_built" : {
                 "ReaderType" : 3
               },
+              "values_loaded" : 1000,
               "process_nanos" : 1022323,
               "process_time" : "1ms",
               "pages_processed" : 123,
@@ -53,6 +54,7 @@ public class ValuesSourceReaderOperatorStatusTests extends AbstractWireSerializi
             randomNonNegativeLong(),
             randomNonNegativeInt(),
             randomNonNegativeLong(),
+            randomNonNegativeLong(),
             randomNonNegativeLong()
         );
     }
@@ -73,14 +75,16 @@ public class ValuesSourceReaderOperatorStatusTests extends AbstractWireSerializi
         int pagesProcessed = instance.pagesProcessed();
         long rowsReceived = instance.rowsReceived();
         long rowsEmitted = instance.rowsEmitted();
-        switch (between(0, 4)) {
+        long valuesLoaded = instance.valuesLoaded();
+        switch (between(0, 5)) {
             case 0 -> readersBuilt = randomValueOtherThan(readersBuilt, this::randomReadersBuilt);
             case 1 -> processNanos = randomValueOtherThan(processNanos, ESTestCase::randomNonNegativeLong);
             case 2 -> pagesProcessed = randomValueOtherThan(pagesProcessed, ESTestCase::randomNonNegativeInt);
             case 3 -> rowsReceived = randomValueOtherThan(rowsReceived, ESTestCase::randomNonNegativeLong);
             case 4 -> rowsEmitted = randomValueOtherThan(rowsEmitted, ESTestCase::randomNonNegativeLong);
+            case 5 -> valuesLoaded = randomValueOtherThan(valuesLoaded, ESTestCase::randomNonNegativeLong);
             default -> throw new UnsupportedOperationException();
         }
-        return new ValuesSourceReaderOperator.Status(readersBuilt, processNanos, pagesProcessed, rowsReceived, rowsEmitted);
+        return new ValuesSourceReaderOperator.Status(readersBuilt, processNanos, pagesProcessed, rowsReceived, rowsEmitted, valuesLoaded);
     }
 }

+ 4 - 2
x-pack/plugin/esql/compute/src/test/java/org/elasticsearch/compute/operator/DriverProfileTests.java

@@ -34,8 +34,8 @@ public class DriverProfileTests extends AbstractWireSerializingTestCase<DriverPr
             10000,
             12,
             List.of(
-                new DriverStatus.OperatorStatus("LuceneSource", LuceneSourceOperatorStatusTests.simple()),
-                new DriverStatus.OperatorStatus("ValuesSourceReader", ValuesSourceReaderOperatorStatusTests.simple())
+                new OperatorStatus("LuceneSource", LuceneSourceOperatorStatusTests.simple()),
+                new OperatorStatus("ValuesSourceReader", ValuesSourceReaderOperatorStatusTests.simple())
             ),
             new DriverSleeps(
                 Map.of("driver time", 1L),
@@ -54,6 +54,8 @@ public class DriverProfileTests extends AbstractWireSerializingTestCase<DriverPr
               "took_time" : "10micros",
               "cpu_nanos" : 10000,
               "cpu_time" : "10micros",
+              "documents_found" : 222,
+              "values_loaded" : 1000,
               "iterations" : 12,
               "operators" : [
                 {

+ 8 - 6
x-pack/plugin/esql/compute/src/test/java/org/elasticsearch/compute/operator/DriverStatusTests.java

@@ -39,10 +39,10 @@ public class DriverStatusTests extends AbstractWireSerializingTestCase<DriverSta
             55L,
             DriverStatus.Status.RUNNING,
             List.of(
-                new DriverStatus.OperatorStatus("LuceneSource", LuceneSourceOperatorStatusTests.simple()),
-                new DriverStatus.OperatorStatus("ValuesSourceReader", ValuesSourceReaderOperatorStatusTests.simple())
+                new OperatorStatus("LuceneSource", LuceneSourceOperatorStatusTests.simple()),
+                new OperatorStatus("ValuesSourceReader", ValuesSourceReaderOperatorStatusTests.simple())
             ),
-            List.of(new DriverStatus.OperatorStatus("ExchangeSink", ExchangeSinkOperatorStatusTests.simple())),
+            List.of(new OperatorStatus("ExchangeSink", ExchangeSinkOperatorStatusTests.simple())),
             new DriverSleeps(
                 Map.of("driver time", 1L),
                 List.of(new DriverSleeps.Sleep("driver time", 1, 1)),
@@ -57,6 +57,8 @@ public class DriverStatusTests extends AbstractWireSerializingTestCase<DriverSta
               "last_updated" : "1973-11-29T09:27:23.214Z",
               "cpu_nanos" : 123213,
               "cpu_time" : "123.2micros",
+              "documents_found" : 222,
+              "values_loaded" : 1000,
               "iterations" : 55,
               "status" : "running",
               "completed_operators" : [
@@ -140,18 +142,18 @@ public class DriverStatusTests extends AbstractWireSerializingTestCase<DriverSta
         return randomFrom(DriverStatus.Status.values());
     }
 
-    static List<DriverStatus.OperatorStatus> randomOperatorStatuses() {
+    static List<OperatorStatus> randomOperatorStatuses() {
         return randomList(0, 5, DriverStatusTests::randomOperatorStatus);
     }
 
-    private static DriverStatus.OperatorStatus randomOperatorStatus() {
+    private static OperatorStatus randomOperatorStatus() {
         Supplier<Operator.Status> status = randomFrom(
             new LuceneSourceOperatorStatusTests()::createTestInstance,
             new ValuesSourceReaderOperatorStatusTests()::createTestInstance,
             new ExchangeSinkOperatorStatusTests()::createTestInstance,
             () -> null
         );
-        return new DriverStatus.OperatorStatus(randomAlphaOfLength(3), status.get());
+        return new OperatorStatus(randomAlphaOfLength(3), status.get());
     }
 
     @Override

+ 5 - 1
x-pack/plugin/esql/qa/server/multi-clusters/src/javaRestTest/java/org/elasticsearch/xpack/esql/ccq/MultiClustersIT.java

@@ -159,7 +159,11 @@ public class MultiClustersIT extends ESRestTestCase {
     }
 
     private <C, V> void assertResultMap(boolean includeCCSMetadata, Map<String, Object> result, C columns, V values, boolean remoteOnly) {
-        MapMatcher mapMatcher = getResultMatcher(ccsMetadataAvailable(), result.containsKey("is_partial"));
+        MapMatcher mapMatcher = getResultMatcher(
+            ccsMetadataAvailable(),
+            result.containsKey("is_partial"),
+            result.containsKey("documents_found")
+        );
         if (includeCCSMetadata) {
             mapMatcher = mapMatcher.entry("_clusters", any(Map.class));
         }

+ 5 - 2
x-pack/plugin/esql/qa/server/single-node/src/javaRestTest/java/org/elasticsearch/xpack/esql/qa/single_node/RestEsqlIT.java

@@ -668,7 +668,9 @@ public class RestEsqlIT extends RestEsqlTestCase {
             .entry("cpu_nanos", greaterThan(0L))
             .entry("took_nanos", greaterThan(0L))
             .entry("operators", instanceOf(List.class))
-            .entry("sleeps", matchesMap().extraOk());
+            .entry("sleeps", matchesMap().extraOk())
+            .entry("documents_found", greaterThanOrEqualTo(0))
+            .entry("values_loaded", greaterThanOrEqualTo(0));
     }
 
     /**
@@ -698,7 +700,8 @@ public class RestEsqlIT extends RestEsqlTestCase {
                 .entry("processing_nanos", greaterThan(0))
                 .entry("processed_queries", List.of("*:*"))
                 .entry("partitioning_strategies", matchesMap().entry("rest-esql-test:0", "SHARD"));
-            case "ValuesSourceReaderOperator" -> basicProfile().entry("readers_built", matchesMap().extraOk());
+            case "ValuesSourceReaderOperator" -> basicProfile().entry("values_loaded", greaterThanOrEqualTo(0))
+                .entry("readers_built", matchesMap().extraOk());
             case "AggregationOperator" -> matchesMap().entry("pages_processed", greaterThan(0))
                 .entry("rows_received", greaterThan(0))
                 .entry("rows_emitted", greaterThan(0))

+ 1 - 2
x-pack/plugin/esql/qa/server/single-node/src/javaRestTest/java/org/elasticsearch/xpack/esql/qa/single_node/StoredFieldsSequentialIT.java

@@ -108,8 +108,7 @@ public class StoredFieldsSequentialIT extends ESRestTestCase {
         Map<String, Object> result = runEsql(builder, new AssertWarnings.NoWarnings(), RestEsqlTestCase.Mode.SYNC);
         assertMap(
             result,
-            matchesMap()
-                // .entry("documents_found", documentsFound) Backport incoming maybe
+            matchesMap().entry("documents_found", documentsFound)
                 .entry(
                     "profile",
                     matchesMap().entry("drivers", instanceOf(List.class))

+ 11 - 3
x-pack/plugin/esql/qa/server/src/main/java/org/elasticsearch/xpack/esql/qa/rest/RestEsqlTestCase.java

@@ -59,6 +59,7 @@ import static java.util.Collections.emptySet;
 import static java.util.Map.entry;
 import static org.elasticsearch.common.logging.LoggerMessageFormat.format;
 import static org.elasticsearch.test.ListMatcher.matchesList;
+import static org.elasticsearch.test.MapMatcher.assertMap;
 import static org.elasticsearch.test.MapMatcher.matchesMap;
 import static org.elasticsearch.xpack.esql.EsqlTestUtils.as;
 import static org.elasticsearch.xpack.esql.qa.rest.RestEsqlTestCase.Mode.ASYNC;
@@ -270,12 +271,19 @@ public abstract class RestEsqlTestCase extends ESRestTestCase {
 
     public void testGetAnswer() throws IOException {
         Map<String, Object> answer = runEsql(requestObjectBuilder().query("row a = 1, b = 2"));
-        assertEquals(4, answer.size());
+        assertEquals(6, answer.size());
         assertThat(((Integer) answer.get("took")).intValue(), greaterThanOrEqualTo(0));
         Map<String, String> colA = Map.of("name", "a", "type", "integer");
         Map<String, String> colB = Map.of("name", "b", "type", "integer");
-        assertEquals(List.of(colA, colB), answer.get("columns"));
-        assertEquals(List.of(List.of(1, 2)), answer.get("values"));
+        assertMap(
+            answer,
+            matchesMap().entry("took", greaterThanOrEqualTo(0))
+                .entry("is_partial", any(Boolean.class))
+                .entry("documents_found", 0)
+                .entry("values_loaded", 0)
+                .entry("columns", List.of(colA, colB))
+                .entry("values", List.of(List.of(1, 2)))
+        );
     }
 
     public void testUseUnknownIndex() throws IOException {

+ 2 - 2
x-pack/plugin/esql/src/internalClusterTest/java/org/elasticsearch/xpack/esql/action/EnrichIT.java

@@ -20,7 +20,7 @@ import org.elasticsearch.common.unit.ByteSizeValue;
 import org.elasticsearch.common.util.concurrent.EsRejectedExecutionException;
 import org.elasticsearch.compute.data.BlockFactory;
 import org.elasticsearch.compute.operator.DriverProfile;
-import org.elasticsearch.compute.operator.DriverStatus;
+import org.elasticsearch.compute.operator.OperatorStatus;
 import org.elasticsearch.compute.operator.exchange.ExchangeService;
 import org.elasticsearch.core.TimeValue;
 import org.elasticsearch.indices.breaker.HierarchyCircuitBreakerService;
@@ -350,7 +350,7 @@ public class EnrichIT extends AbstractEsqlIntegTestCase {
             assertNotNull(profile);
             List<DriverProfile> drivers = profile.drivers();
             assertThat(drivers.size(), greaterThanOrEqualTo(2));
-            List<DriverStatus.OperatorStatus> enrichOperators = drivers.stream()
+            List<OperatorStatus> enrichOperators = drivers.stream()
                 .flatMap(d -> d.operators().stream())
                 .filter(status -> status.operator().startsWith("EnrichOperator"))
                 .toList();

+ 16 - 1
x-pack/plugin/esql/src/internalClusterTest/java/org/elasticsearch/xpack/esql/action/EsqlActionTaskIT.java

@@ -22,6 +22,7 @@ import org.elasticsearch.compute.lucene.LuceneSourceOperator;
 import org.elasticsearch.compute.lucene.ValuesSourceReaderOperator;
 import org.elasticsearch.compute.operator.DriverStatus;
 import org.elasticsearch.compute.operator.DriverTaskRunner;
+import org.elasticsearch.compute.operator.OperatorStatus;
 import org.elasticsearch.compute.operator.exchange.ExchangeService;
 import org.elasticsearch.compute.operator.exchange.ExchangeSinkHandler;
 import org.elasticsearch.compute.operator.exchange.ExchangeSinkOperator;
@@ -104,7 +105,7 @@ public class EsqlActionTaskIT extends AbstractPausableIntegTestCase {
                 DriverStatus status = (DriverStatus) task.status();
                 assertThat(status.sessionId(), not(emptyOrNullString()));
                 String taskDescription = status.taskDescription();
-                for (DriverStatus.OperatorStatus o : status.activeOperators()) {
+                for (OperatorStatus o : status.activeOperators()) {
                     logger.info("status {}", o);
                     if (o.operator().startsWith("LuceneSourceOperator[maxPageSize = " + pageSize())) {
                         assertThat(taskDescription, equalTo("data"));
@@ -134,6 +135,7 @@ public class EsqlActionTaskIT extends AbstractPausableIntegTestCase {
                             matchesMap().entry("pause_me:column_at_a_time:ScriptLongs", greaterThanOrEqualTo(1))
                         );
                         assertThat(oStatus.pagesProcessed(), greaterThanOrEqualTo(1));
+                        assertThat(oStatus.valuesLoaded(), greaterThanOrEqualTo(1L));
                         valuesSourceReaders++;
                         continue;
                     }
@@ -180,6 +182,19 @@ public class EsqlActionTaskIT extends AbstractPausableIntegTestCase {
                 \\_ProjectOperator[projection = [0]]
                 \\_LimitOperator[limit = 1000]
                 \\_OutputOperator[columns = [sum(pause_me)]]"""));
+
+            for (TaskInfo task : dataTasks(foundTasks)) {
+                assertThat(((DriverStatus) task.status()).documentsFound(), greaterThan(0L));
+                assertThat(((DriverStatus) task.status()).valuesLoaded(), greaterThan(0L));
+            }
+            for (TaskInfo task : nodeReduceTasks(foundTasks)) {
+                assertThat(((DriverStatus) task.status()).documentsFound(), equalTo(0L));
+                assertThat(((DriverStatus) task.status()).valuesLoaded(), equalTo(0L));
+            }
+            for (TaskInfo task : coordinatorTasks(foundTasks)) {
+                assertThat(((DriverStatus) task.status()).documentsFound(), equalTo(0L));
+                assertThat(((DriverStatus) task.status()).valuesLoaded(), equalTo(0L));
+            }
         } finally {
             scriptPermits.release(numberOfDocs());
             try (EsqlQueryResponse esqlResponse = response.get()) {

+ 6 - 0
x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/action/EsqlCapabilities.java

@@ -864,6 +864,12 @@ public class EsqlCapabilities {
          */
         REPORT_ORIGINAL_TYPES,
 
+        /**
+         * Are the {@code documents_found} and {@code values_loaded} fields available
+         * in the response and profile?
+         */
+        DOCUMENTS_FOUND_AND_VALUES_LOADED,
+
         /**
          * When creating constant null blocks in {@link org.elasticsearch.compute.lucene.ValuesSourceReaderOperator}, we also handed off
          * the ownership of that block - but didn't account for the fact that the caller might close it, leading to double releases

+ 45 - 3
x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/action/EsqlQueryResponse.java

@@ -34,6 +34,8 @@ import java.util.List;
 import java.util.Objects;
 import java.util.Optional;
 
+import static org.elasticsearch.TransportVersions.ESQL_DOCUMENTS_FOUND_AND_VALUES_LOADED_8_19;
+
 public class EsqlQueryResponse extends org.elasticsearch.xpack.core.esql.action.EsqlQueryResponse
     implements
         ChunkedToXContentObject,
@@ -46,6 +48,8 @@ public class EsqlQueryResponse extends org.elasticsearch.xpack.core.esql.action.
 
     private final List<ColumnInfoImpl> columns;
     private final List<Page> pages;
+    private final long documentsFound;
+    private final long valuesLoaded;
     private final Profile profile;
     private final boolean columnar;
     private final String asyncExecutionId;
@@ -57,6 +61,8 @@ public class EsqlQueryResponse extends org.elasticsearch.xpack.core.esql.action.
     public EsqlQueryResponse(
         List<ColumnInfoImpl> columns,
         List<Page> pages,
+        long documentsFound,
+        long valuesLoaded,
         @Nullable Profile profile,
         boolean columnar,
         @Nullable String asyncExecutionId,
@@ -66,6 +72,8 @@ public class EsqlQueryResponse extends org.elasticsearch.xpack.core.esql.action.
     ) {
         this.columns = columns;
         this.pages = pages;
+        this.valuesLoaded = valuesLoaded;
+        this.documentsFound = documentsFound;
         this.profile = profile;
         this.columnar = columnar;
         this.asyncExecutionId = asyncExecutionId;
@@ -77,12 +85,14 @@ public class EsqlQueryResponse extends org.elasticsearch.xpack.core.esql.action.
     public EsqlQueryResponse(
         List<ColumnInfoImpl> columns,
         List<Page> pages,
+        long documentsFound,
+        long valuesLoaded,
         @Nullable Profile profile,
         boolean columnar,
         boolean isAsync,
         EsqlExecutionInfo executionInfo
     ) {
-        this(columns, pages, profile, columnar, null, false, isAsync, executionInfo);
+        this(columns, pages, documentsFound, valuesLoaded, profile, columnar, null, false, isAsync, executionInfo);
     }
 
     /**
@@ -108,6 +118,8 @@ public class EsqlQueryResponse extends org.elasticsearch.xpack.core.esql.action.
         }
         List<ColumnInfoImpl> columns = in.readCollectionAsList(ColumnInfoImpl::new);
         List<Page> pages = in.readCollectionAsList(Page::new);
+        long documentsFound = in.getTransportVersion().onOrAfter(ESQL_DOCUMENTS_FOUND_AND_VALUES_LOADED_8_19) ? in.readVLong() : 0;
+        long valuesLoaded = in.getTransportVersion().onOrAfter(ESQL_DOCUMENTS_FOUND_AND_VALUES_LOADED_8_19) ? in.readVLong() : 0;
         if (in.getTransportVersion().onOrAfter(TransportVersions.V_8_12_0)) {
             profile = in.readOptionalWriteable(Profile::new);
         }
@@ -116,7 +128,18 @@ public class EsqlQueryResponse extends org.elasticsearch.xpack.core.esql.action.
         if (in.getTransportVersion().onOrAfter(TransportVersions.V_8_16_0)) {
             executionInfo = in.readOptionalWriteable(EsqlExecutionInfo::new);
         }
-        return new EsqlQueryResponse(columns, pages, profile, columnar, asyncExecutionId, isRunning, isAsync, executionInfo);
+        return new EsqlQueryResponse(
+            columns,
+            pages,
+            documentsFound,
+            valuesLoaded,
+            profile,
+            columnar,
+            asyncExecutionId,
+            isRunning,
+            isAsync,
+            executionInfo
+        );
     }
 
     @Override
@@ -128,6 +151,10 @@ public class EsqlQueryResponse extends org.elasticsearch.xpack.core.esql.action.
         }
         out.writeCollection(columns);
         out.writeCollection(pages);
+        if (out.getTransportVersion().onOrAfter(ESQL_DOCUMENTS_FOUND_AND_VALUES_LOADED_8_19)) {
+            out.writeVLong(documentsFound);
+            out.writeVLong(valuesLoaded);
+        }
         if (out.getTransportVersion().onOrAfter(TransportVersions.V_8_12_0)) {
             out.writeOptionalWriteable(profile);
         }
@@ -160,6 +187,14 @@ public class EsqlQueryResponse extends org.elasticsearch.xpack.core.esql.action.
         return ResponseValueUtils.valuesForColumn(columnIndex, columns.get(columnIndex).type(), pages);
     }
 
+    public long documentsFound() {
+        return documentsFound;
+    }
+
+    public long valuesLoaded() {
+        return valuesLoaded;
+    }
+
     public Profile profile() {
         return profile;
     }
@@ -200,6 +235,8 @@ public class EsqlQueryResponse extends org.elasticsearch.xpack.core.esql.action.
                 }
                 b.field("is_running", isRunning);
             }
+            b.field("documents_found", documentsFound);
+            b.field("values_loaded", valuesLoaded);
             if (executionInfo != null) {
                 long tookInMillis = executionInfo.overallTook() == null
                     ? executionInfo.tookSoFar().millis()
@@ -261,6 +298,8 @@ public class EsqlQueryResponse extends org.elasticsearch.xpack.core.esql.action.
             && Objects.equals(isRunning, that.isRunning)
             && columnar == that.columnar
             && Iterators.equals(values(), that.values(), (row1, row2) -> Iterators.equals(row1, row2, Objects::equals))
+            && documentsFound == that.documentsFound
+            && valuesLoaded == that.valuesLoaded
             && Objects.equals(profile, that.profile)
             && Objects.equals(executionInfo, that.executionInfo);
     }
@@ -271,8 +310,11 @@ public class EsqlQueryResponse extends org.elasticsearch.xpack.core.esql.action.
             asyncExecutionId,
             isRunning,
             columns,
-            Iterators.hashCode(values(), row -> Iterators.hashCode(row, Objects::hashCode)),
             columnar,
+            Iterators.hashCode(values(), row -> Iterators.hashCode(row, Objects::hashCode)),
+            documentsFound,
+            valuesLoaded,
+            profile,
             executionInfo
         );
     }

+ 2 - 1
x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/action/EsqlQueryTask.java

@@ -44,6 +44,7 @@ public class EsqlQueryTask extends StoredAsyncTask<EsqlQueryResponse> {
 
     @Override
     public EsqlQueryResponse getCurrentResult() {
-        return new EsqlQueryResponse(List.of(), List.of(), null, false, getExecutionId().getEncoded(), true, true, executionInfo);
+        // TODO it'd be nice to have the number of documents we've read from completed drivers here
+        return new EsqlQueryResponse(List.of(), List.of(), 0, 0, null, false, getExecutionId().getEncoded(), true, true, executionInfo);
     }
 }

+ 7 - 7
x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/plugin/ClusterComputeHandler.java

@@ -11,7 +11,7 @@ import org.elasticsearch.action.ActionListener;
 import org.elasticsearch.action.ActionListenerResponseHandler;
 import org.elasticsearch.action.OriginalIndices;
 import org.elasticsearch.action.support.ChannelActionListener;
-import org.elasticsearch.compute.operator.DriverProfile;
+import org.elasticsearch.compute.operator.DriverCompletionInfo;
 import org.elasticsearch.compute.operator.exchange.ExchangeService;
 import org.elasticsearch.compute.operator.exchange.ExchangeSourceHandler;
 import org.elasticsearch.core.Releasable;
@@ -75,7 +75,7 @@ final class ClusterComputeHandler implements TransportRequestHandler<ClusterComp
         RemoteCluster cluster,
         Runnable cancelQueryOnFailure,
         EsqlExecutionInfo executionInfo,
-        ActionListener<List<DriverProfile>> listener
+        ActionListener<DriverCompletionInfo> listener
     ) {
         var queryPragmas = configuration.pragmas();
         listener = ActionListener.runBefore(listener, exchangeSource.addEmptySink()::close);
@@ -93,7 +93,7 @@ final class ClusterComputeHandler implements TransportRequestHandler<ClusterComp
                     receivedResults ? EsqlExecutionInfo.Cluster.Status.PARTIAL : EsqlExecutionInfo.Cluster.Status.SKIPPED,
                     e
                 );
-                l.onResponse(List.of());
+                l.onResponse(DriverCompletionInfo.EMPTY);
             } else {
                 l.onFailure(e);
             }
@@ -121,15 +121,15 @@ final class ClusterComputeHandler implements TransportRequestHandler<ClusterComp
                     onGroupFailure = computeService.cancelQueryOnFailure(groupTask);
                     l = ActionListener.runAfter(l, () -> transportService.getTaskManager().unregister(groupTask));
                 }
-                try (var computeListener = new ComputeListener(transportService.getThreadPool(), onGroupFailure, l.map(profiles -> {
+                try (var computeListener = new ComputeListener(transportService.getThreadPool(), onGroupFailure, l.map(completionInfo -> {
                     updateExecutionInfo(executionInfo, clusterAlias, finalResponse.get());
-                    return profiles;
+                    return completionInfo;
                 }))) {
                     var remotePlan = new RemoteClusterPlan(plan, cluster.concreteIndices, cluster.originalIndices);
                     var clusterRequest = new ClusterComputeRequest(clusterAlias, childSessionId, configuration, remotePlan);
                     final ActionListener<ComputeResponse> clusterListener = computeListener.acquireCompute().map(r -> {
                         finalResponse.set(r);
-                        return r.getProfiles();
+                        return r.getCompletionInfo();
                     });
                     transportService.sendChildRequest(
                         cluster.connection,
@@ -290,7 +290,7 @@ final class ClusterComputeHandler implements TransportRequestHandler<ClusterComp
                     cancelQueryOnFailure,
                     computeListener.acquireCompute().map(r -> {
                         finalResponse.set(r);
-                        return r.getProfiles();
+                        return r.getCompletionInfo();
                     })
                 );
             }

+ 7 - 14
x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/plugin/ComputeListener.java

@@ -10,15 +10,11 @@ package org.elasticsearch.xpack.esql.plugin;
 import org.elasticsearch.action.ActionListener;
 import org.elasticsearch.action.support.RefCountingListener;
 import org.elasticsearch.compute.EsqlRefCountingListener;
-import org.elasticsearch.compute.operator.DriverProfile;
+import org.elasticsearch.compute.operator.DriverCompletionInfo;
 import org.elasticsearch.compute.operator.ResponseHeadersCollector;
 import org.elasticsearch.core.Releasable;
 import org.elasticsearch.threadpool.ThreadPool;
 
-import java.util.ArrayList;
-import java.util.Collections;
-import java.util.List;
-
 /**
  * A variant of {@link RefCountingListener} with the following differences:
  * 1. Automatically cancels sub tasks on failure (via runOnTaskFailure)
@@ -27,19 +23,18 @@ import java.util.List;
  * 4. Collects failures and returns the most appropriate exception to the caller.
  */
 final class ComputeListener implements Releasable {
+    private final DriverCompletionInfo.AtomicAccumulator completionInfoAccumulator = new DriverCompletionInfo.AtomicAccumulator();
     private final EsqlRefCountingListener refs;
-    private final List<DriverProfile> collectedProfiles;
     private final ResponseHeadersCollector responseHeaders;
     private final Runnable runOnFailure;
 
-    ComputeListener(ThreadPool threadPool, Runnable runOnFailure, ActionListener<List<DriverProfile>> delegate) {
+    ComputeListener(ThreadPool threadPool, Runnable runOnFailure, ActionListener<DriverCompletionInfo> delegate) {
         this.runOnFailure = runOnFailure;
         this.responseHeaders = new ResponseHeadersCollector(threadPool.getThreadContext());
-        this.collectedProfiles = Collections.synchronizedList(new ArrayList<>());
         // listener that executes after all the sub-listeners refs (created via acquireCompute) have completed
         this.refs = new EsqlRefCountingListener(delegate.delegateFailure((l, ignored) -> {
             responseHeaders.finish();
-            delegate.onResponse(collectedProfiles.stream().toList());
+            delegate.onResponse(completionInfoAccumulator.finish());
         }));
     }
 
@@ -60,13 +55,11 @@ final class ComputeListener implements Releasable {
     /**
      * Acquires a new listener that collects compute result. This listener will also collect warnings emitted during compute
      */
-    ActionListener<List<DriverProfile>> acquireCompute() {
+    ActionListener<DriverCompletionInfo> acquireCompute() {
         final ActionListener<Void> delegate = acquireAvoid();
-        return ActionListener.wrap(profiles -> {
+        return ActionListener.wrap(info -> {
             responseHeaders.collect();
-            if (profiles != null && profiles.isEmpty() == false) {
-                collectedProfiles.addAll(profiles);
-            }
+            completionInfoAccumulator.accumulate(info);
             delegate.onResponse(null);
         }, e -> {
             responseHeaders.collect();

+ 21 - 18
x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/plugin/ComputeResponse.java

@@ -11,6 +11,7 @@ import org.elasticsearch.TransportVersions;
 import org.elasticsearch.action.search.ShardSearchFailure;
 import org.elasticsearch.common.io.stream.StreamInput;
 import org.elasticsearch.common.io.stream.StreamOutput;
+import org.elasticsearch.compute.operator.DriverCompletionInfo;
 import org.elasticsearch.compute.operator.DriverProfile;
 import org.elasticsearch.core.TimeValue;
 import org.elasticsearch.transport.TransportResponse;
@@ -18,11 +19,13 @@ import org.elasticsearch.transport.TransportResponse;
 import java.io.IOException;
 import java.util.List;
 
+import static org.elasticsearch.TransportVersions.ESQL_DOCUMENTS_FOUND_AND_VALUES_LOADED_8_19;
+
 /**
  * The compute result of {@link DataNodeRequest} or {@link ClusterComputeRequest}
  */
 final class ComputeResponse extends TransportResponse {
-    private final List<DriverProfile> profiles;
+    private final DriverCompletionInfo completionInfo;
 
     // for use with ClusterComputeRequests (cross-cluster searches)
     private final TimeValue took;  // overall took time for a specific cluster in a cross-cluster search
@@ -32,12 +35,12 @@ final class ComputeResponse extends TransportResponse {
     public final int failedShards;
     public final List<ShardSearchFailure> failures;
 
-    ComputeResponse(List<DriverProfile> profiles) {
-        this(profiles, null, null, null, null, null, List.of());
+    ComputeResponse(DriverCompletionInfo completionInfo) {
+        this(completionInfo, null, null, null, null, null, List.of());
     }
 
     ComputeResponse(
-        List<DriverProfile> profiles,
+        DriverCompletionInfo completionInfo,
         TimeValue took,
         Integer totalShards,
         Integer successfulShards,
@@ -45,7 +48,7 @@ final class ComputeResponse extends TransportResponse {
         Integer failedShards,
         List<ShardSearchFailure> failures
     ) {
-        this.profiles = profiles;
+        this.completionInfo = completionInfo;
         this.took = took;
         this.totalShards = totalShards == null ? 0 : totalShards.intValue();
         this.successfulShards = successfulShards == null ? 0 : successfulShards.intValue();
@@ -56,14 +59,16 @@ final class ComputeResponse extends TransportResponse {
 
     ComputeResponse(StreamInput in) throws IOException {
         super(in);
-        if (in.getTransportVersion().onOrAfter(TransportVersions.V_8_12_0)) {
+        if (in.getTransportVersion().onOrAfter(ESQL_DOCUMENTS_FOUND_AND_VALUES_LOADED_8_19)) {
+            completionInfo = new DriverCompletionInfo(in);
+        } else if (in.getTransportVersion().onOrAfter(TransportVersions.V_8_12_0)) {
             if (in.readBoolean()) {
-                profiles = in.readCollectionAsImmutableList(DriverProfile::new);
+                completionInfo = new DriverCompletionInfo(0, 0, in.readCollectionAsImmutableList(DriverProfile::new));
             } else {
-                profiles = null;
+                completionInfo = DriverCompletionInfo.EMPTY;
             }
         } else {
-            profiles = null;
+            completionInfo = DriverCompletionInfo.EMPTY;
         }
         if (in.getTransportVersion().onOrAfter(TransportVersions.V_8_16_0)) {
             this.took = in.readOptionalTimeValue();
@@ -87,13 +92,11 @@ final class ComputeResponse extends TransportResponse {
 
     @Override
     public void writeTo(StreamOutput out) throws IOException {
-        if (out.getTransportVersion().onOrAfter(TransportVersions.V_8_12_0)) {
-            if (profiles == null) {
-                out.writeBoolean(false);
-            } else {
-                out.writeBoolean(true);
-                out.writeCollection(profiles);
-            }
+        if (out.getTransportVersion().onOrAfter(ESQL_DOCUMENTS_FOUND_AND_VALUES_LOADED_8_19)) {
+            completionInfo.writeTo(out);
+        } else if (out.getTransportVersion().onOrAfter(TransportVersions.V_8_12_0)) {
+            out.writeBoolean(true);
+            out.writeCollection(completionInfo.collectedProfiles());
         }
         if (out.getTransportVersion().onOrAfter(TransportVersions.V_8_16_0)) {
             out.writeOptionalTimeValue(took);
@@ -107,8 +110,8 @@ final class ComputeResponse extends TransportResponse {
         }
     }
 
-    public List<DriverProfile> getProfiles() {
-        return profiles;
+    public DriverCompletionInfo getCompletionInfo() {
+        return completionInfo;
     }
 
     public TimeValue getTook() {

+ 26 - 16
x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/plugin/ComputeService.java

@@ -19,7 +19,7 @@ import org.elasticsearch.compute.data.BlockFactory;
 import org.elasticsearch.compute.data.Page;
 import org.elasticsearch.compute.lucene.DataPartitioning;
 import org.elasticsearch.compute.operator.Driver;
-import org.elasticsearch.compute.operator.DriverProfile;
+import org.elasticsearch.compute.operator.DriverCompletionInfo;
 import org.elasticsearch.compute.operator.DriverTaskRunner;
 import org.elasticsearch.compute.operator.FailureCollector;
 import org.elasticsearch.compute.operator.exchange.ExchangeService;
@@ -184,10 +184,14 @@ public class ComputeService {
             );
             updateShardCountForCoordinatorOnlyQuery(execInfo);
             try (
-                var computeListener = new ComputeListener(transportService.getThreadPool(), cancelQueryOnFailure, listener.map(profiles -> {
-                    updateExecutionInfoAfterCoordinatorOnlyQuery(execInfo);
-                    return new Result(physicalPlan.output(), collectedPages, profiles, execInfo);
-                }))
+                var computeListener = new ComputeListener(
+                    transportService.getThreadPool(),
+                    cancelQueryOnFailure,
+                    listener.map(completionInfo -> {
+                        updateExecutionInfoAfterCoordinatorOnlyQuery(execInfo);
+                        return new Result(physicalPlan.output(), collectedPages, completionInfo, execInfo);
+                    })
+                )
             ) {
                 runCompute(rootTask, computeContext, coordinatorPlan, computeListener.acquireCompute());
                 return;
@@ -216,10 +220,16 @@ public class ComputeService {
         );
         listener = ActionListener.runBefore(listener, () -> exchangeService.removeExchangeSourceHandler(sessionId));
         exchangeService.addExchangeSourceHandler(sessionId, exchangeSource);
-        try (var computeListener = new ComputeListener(transportService.getThreadPool(), cancelQueryOnFailure, listener.map(profiles -> {
-            execInfo.markEndQuery();  // TODO: revisit this time recording model as part of INLINESTATS improvements
-            return new Result(outputAttributes, collectedPages, profiles, execInfo);
-        }))) {
+        try (
+            var computeListener = new ComputeListener(
+                transportService.getThreadPool(),
+                cancelQueryOnFailure,
+                listener.map(completionInfo -> {
+                    execInfo.markEndQuery();  // TODO: revisit this time recording model as part of INLINESTATS improvements
+                    return new Result(outputAttributes, collectedPages, completionInfo, execInfo);
+                })
+            )
+        ) {
             try (Releasable ignored = exchangeSource.addEmptySink()) {
                 // run compute on the coordinator
                 final AtomicBoolean localClusterWasInterrupted = new AtomicBoolean();
@@ -227,7 +237,7 @@ public class ComputeService {
                     var localListener = new ComputeListener(
                         transportService.getThreadPool(),
                         cancelQueryOnFailure,
-                        computeListener.acquireCompute().delegateFailure((l, profiles) -> {
+                        computeListener.acquireCompute().delegateFailure((l, completionInfo) -> {
                             if (execInfo.clusterInfo.containsKey(LOCAL_CLUSTER)) {
                                 execInfo.swapCluster(LOCAL_CLUSTER, (k, v) -> {
                                     var tookTime = execInfo.tookSoFar();
@@ -242,7 +252,7 @@ public class ComputeService {
                                     return builder.build();
                                 });
                             }
-                            l.onResponse(profiles);
+                            l.onResponse(completionInfo);
                         })
                     )
                 ) {
@@ -285,7 +295,7 @@ public class ComputeService {
                                         .setFailures(r.failures)
                                         .build()
                                 );
-                                dataNodesListener.onResponse(r.getProfiles());
+                                dataNodesListener.onResponse(r.getCompletionInfo());
                             }, e -> {
                                 if (configuration.allowPartialResults() && EsqlCCSUtils.canAllowPartial(e)) {
                                     execInfo.swapCluster(
@@ -294,7 +304,7 @@ public class ComputeService {
                                             EsqlExecutionInfo.Cluster.Status.PARTIAL
                                         ).setFailures(List.of(new ShardSearchFailure(e))).build()
                                     );
-                                    dataNodesListener.onResponse(List.of());
+                                    dataNodesListener.onResponse(DriverCompletionInfo.EMPTY);
                                 } else {
                                     dataNodesListener.onFailure(e);
                                 }
@@ -373,7 +383,7 @@ public class ComputeService {
         }
     }
 
-    void runCompute(CancellableTask task, ComputeContext context, PhysicalPlan plan, ActionListener<List<DriverProfile>> listener) {
+    void runCompute(CancellableTask task, ComputeContext context, PhysicalPlan plan, ActionListener<DriverCompletionInfo> listener) {
         listener = ActionListener.runBefore(listener, () -> Releasables.close(context.searchContexts()));
         List<EsPhysicalOperationProviders.ShardContext> contexts = new ArrayList<>(context.searchContexts().size());
         for (int i = 0; i < context.searchContexts().size(); i++) {
@@ -437,9 +447,9 @@ public class ComputeService {
         }
         ActionListener<Void> listenerCollectingStatus = listener.map(ignored -> {
             if (context.configuration().profile()) {
-                return drivers.stream().map(Driver::profile).toList();
+                return DriverCompletionInfo.includingProfiles(drivers);
             } else {
-                return List.of();
+                return DriverCompletionInfo.excludingProfiles(drivers);
             }
         });
         listenerCollectingStatus = ActionListener.releaseAfter(listenerCollectingStatus, () -> Releasables.close(drivers));

+ 8 - 8
x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/plugin/DataNodeComputeHandler.java

@@ -17,7 +17,7 @@ import org.elasticsearch.action.support.ChannelActionListener;
 import org.elasticsearch.action.support.RefCountingRunnable;
 import org.elasticsearch.cluster.node.DiscoveryNode;
 import org.elasticsearch.cluster.service.ClusterService;
-import org.elasticsearch.compute.operator.DriverProfile;
+import org.elasticsearch.compute.operator.DriverCompletionInfo;
 import org.elasticsearch.compute.operator.exchange.ExchangeService;
 import org.elasticsearch.compute.operator.exchange.ExchangeSink;
 import org.elasticsearch.compute.operator.exchange.ExchangeSinkHandler;
@@ -191,7 +191,7 @@ final class DataNodeComputeHandler implements TransportRequestHandler<DataNodeRe
                                 TransportRequestOptions.EMPTY,
                                 new ActionListenerResponseHandler<>(computeListener.acquireCompute().map(r -> {
                                     nodeResponseRef.set(r);
-                                    return r.profiles();
+                                    return r.completionInfo();
                                 }), DataNodeComputeResponse::new, esqlExecutor)
                             );
                             final var remoteSink = exchangeService.newRemoteSink(groupTask, childSessionId, transportService, connection);
@@ -256,15 +256,15 @@ final class DataNodeComputeHandler implements TransportRequestHandler<DataNodeRe
             final int endBatchIndex = Math.min(startBatchIndex + maxConcurrentShards, request.shardIds().size());
             final AtomicInteger pagesProduced = new AtomicInteger();
             List<ShardId> shardIds = request.shardIds().subList(startBatchIndex, endBatchIndex);
-            ActionListener<List<DriverProfile>> batchListener = new ActionListener<>() {
-                final ActionListener<List<DriverProfile>> ref = computeListener.acquireCompute();
+            ActionListener<DriverCompletionInfo> batchListener = new ActionListener<>() {
+                final ActionListener<DriverCompletionInfo> ref = computeListener.acquireCompute();
 
                 @Override
-                public void onResponse(List<DriverProfile> result) {
+                public void onResponse(DriverCompletionInfo info) {
                     try {
                         onBatchCompleted(endBatchIndex);
                     } finally {
-                        ref.onResponse(result);
+                        ref.onResponse(info);
                     }
                 }
 
@@ -274,7 +274,7 @@ final class DataNodeComputeHandler implements TransportRequestHandler<DataNodeRe
                         for (ShardId shardId : shardIds) {
                             addShardLevelFailure(shardId, e);
                         }
-                        onResponse(List.of());
+                        onResponse(DriverCompletionInfo.EMPTY);
                     } else {
                         // TODO: add these to fatal failures so we can continue processing other shards.
                         try {
@@ -288,7 +288,7 @@ final class DataNodeComputeHandler implements TransportRequestHandler<DataNodeRe
             acquireSearchContexts(clusterAlias, shardIds, configuration, request.aliasFilters(), ActionListener.wrap(searchContexts -> {
                 assert ThreadPool.assertCurrentThreadPool(ThreadPool.Names.SEARCH, ESQL_WORKER_THREAD_POOL_NAME);
                 if (searchContexts.isEmpty()) {
-                    batchListener.onResponse(List.of());
+                    batchListener.onResponse(DriverCompletionInfo.EMPTY);
                     return;
                 }
                 var computeContext = new ComputeContext(

+ 28 - 17
x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/plugin/DataNodeComputeResponse.java

@@ -9,52 +9,63 @@ package org.elasticsearch.xpack.esql.plugin;
 
 import org.elasticsearch.common.io.stream.StreamInput;
 import org.elasticsearch.common.io.stream.StreamOutput;
+import org.elasticsearch.compute.operator.DriverCompletionInfo;
 import org.elasticsearch.compute.operator.DriverProfile;
 import org.elasticsearch.index.shard.ShardId;
 import org.elasticsearch.transport.TransportResponse;
 
 import java.io.IOException;
-import java.util.List;
 import java.util.Map;
-import java.util.Objects;
+
+import static org.elasticsearch.TransportVersions.ESQL_DOCUMENTS_FOUND_AND_VALUES_LOADED_8_19;
 
 /**
  * The compute result of {@link DataNodeRequest}
  */
 final class DataNodeComputeResponse extends TransportResponse {
-    private final List<DriverProfile> profiles;
+    private final DriverCompletionInfo completionInfo;
     private final Map<ShardId, Exception> shardLevelFailures;
 
-    DataNodeComputeResponse(List<DriverProfile> profiles, Map<ShardId, Exception> shardLevelFailures) {
-        this.profiles = profiles;
+    DataNodeComputeResponse(DriverCompletionInfo completionInfo, Map<ShardId, Exception> shardLevelFailures) {
+        this.completionInfo = completionInfo;
         this.shardLevelFailures = shardLevelFailures;
     }
 
     DataNodeComputeResponse(StreamInput in) throws IOException {
+        if (in.getTransportVersion().onOrAfter(ESQL_DOCUMENTS_FOUND_AND_VALUES_LOADED_8_19)) {
+            this.completionInfo = new DriverCompletionInfo(in);
+            this.shardLevelFailures = in.readMap(ShardId::new, StreamInput::readException);
+            return;
+        }
         if (DataNodeComputeHandler.supportShardLevelRetryFailure(in.getTransportVersion())) {
-            this.profiles = in.readCollectionAsImmutableList(DriverProfile::new);
+            this.completionInfo = new DriverCompletionInfo(0, 0, in.readCollectionAsImmutableList(DriverProfile::new));
             this.shardLevelFailures = in.readMap(ShardId::new, StreamInput::readException);
-        } else {
-            this.profiles = Objects.requireNonNullElse(new ComputeResponse(in).getProfiles(), List.of());
-            this.shardLevelFailures = Map.of();
+            return;
         }
+        this.completionInfo = new ComputeResponse(in).getCompletionInfo();
+        this.shardLevelFailures = Map.of();
     }
 
     @Override
     public void writeTo(StreamOutput out) throws IOException {
+        if (out.getTransportVersion().onOrAfter(ESQL_DOCUMENTS_FOUND_AND_VALUES_LOADED_8_19)) {
+            completionInfo.writeTo(out);
+            out.writeMap(shardLevelFailures, (o, v) -> v.writeTo(o), StreamOutput::writeException);
+            return;
+        }
         if (DataNodeComputeHandler.supportShardLevelRetryFailure(out.getTransportVersion())) {
-            out.writeCollection(profiles, (o, v) -> v.writeTo(o));
+            out.writeCollection(completionInfo.collectedProfiles(), (o, v) -> v.writeTo(o));
             out.writeMap(shardLevelFailures, (o, v) -> v.writeTo(o), StreamOutput::writeException);
-        } else {
-            if (shardLevelFailures.isEmpty() == false) {
-                throw new IllegalStateException("shard level failures are not supported in old versions");
-            }
-            new ComputeResponse(profiles).writeTo(out);
+            return;
+        }
+        if (shardLevelFailures.isEmpty() == false) {
+            throw new IllegalStateException("shard level failures are not supported in old versions");
         }
+        new ComputeResponse(completionInfo).writeTo(out);
     }
 
-    List<DriverProfile> profiles() {
-        return profiles;
+    public DriverCompletionInfo completionInfo() {
+        return completionInfo;
     }
 
     Map<ShardId, Exception> shardLevelFailures() {

+ 9 - 9
x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/plugin/DataNodeRequestSender.java

@@ -23,7 +23,7 @@ import org.elasticsearch.cluster.service.ClusterService;
 import org.elasticsearch.common.breaker.CircuitBreakingException;
 import org.elasticsearch.common.util.Maps;
 import org.elasticsearch.common.util.concurrent.ConcurrentCollections;
-import org.elasticsearch.compute.operator.DriverProfile;
+import org.elasticsearch.compute.operator.DriverCompletionInfo;
 import org.elasticsearch.compute.operator.FailureCollector;
 import org.elasticsearch.index.Index;
 import org.elasticsearch.index.query.QueryBuilder;
@@ -130,8 +130,8 @@ abstract class DataNodeRequestSender {
                     transportService.getThreadPool(),
                     runOnTaskFailure,
                     listener.map(
-                        profiles -> new ComputeResponse(
-                            profiles,
+                        completionInfo -> new ComputeResponse(
+                            completionInfo,
                             timeValueNanos(System.nanoTime() - startTimeInNanos),
                             targetShards.totalShards(),
                             targetShards.totalShards() - shardFailures.size() - skippedShards.get(),
@@ -263,15 +263,15 @@ abstract class DataNodeRequestSender {
     }
 
     private void sendOneNodeRequest(TargetShards targetShards, ComputeListener computeListener, NodeRequest request) {
-        final ActionListener<List<DriverProfile>> listener = computeListener.acquireCompute();
+        final ActionListener<DriverCompletionInfo> listener = computeListener.acquireCompute();
         sendRequest(request.node, request.shardIds, request.aliasFilters, new NodeListener() {
-            void onAfter(List<DriverProfile> profiles) {
+            void onAfter(DriverCompletionInfo info) {
                 nodePermits.get(request.node).release();
                 if (concurrentRequests != null) {
                     concurrentRequests.release();
                 }
                 trySendingRequestsForPendingShards(targetShards, computeListener);
-                listener.onResponse(profiles);
+                listener.onResponse(info);
             }
 
             @Override
@@ -287,7 +287,7 @@ abstract class DataNodeRequestSender {
                     trackShardLevelFailure(shardId, false, e.getValue());
                     pendingShardIds.add(shardId);
                 }
-                onAfter(response.profiles());
+                onAfter(response.completionInfo());
             }
 
             @Override
@@ -296,7 +296,7 @@ abstract class DataNodeRequestSender {
                     trackShardLevelFailure(shardId, receivedData, e);
                     pendingShardIds.add(shardId);
                 }
-                onAfter(List.of());
+                onAfter(DriverCompletionInfo.EMPTY);
             }
 
             @Override
@@ -305,7 +305,7 @@ abstract class DataNodeRequestSender {
                 if (rootTask.isCancelled()) {
                     onFailure(new TaskCancelledException("null"), true);
                 } else {
-                    onResponse(new DataNodeComputeResponse(List.of(), Map.of()));
+                    onResponse(new DataNodeComputeResponse(DriverCompletionInfo.EMPTY, Map.of()));
                 }
             }
         });

+ 17 - 2
x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/plugin/TransportEsqlQueryAction.java

@@ -333,7 +333,9 @@ public class TransportEsqlQueryAction extends HandledTransportAction<EsqlQueryRe
             }
             return new ColumnInfoImpl(c.name(), c.dataType().outputType(), originalTypes);
         }).toList();
-        EsqlQueryResponse.Profile profile = configuration.profile() ? new EsqlQueryResponse.Profile(result.profiles()) : null;
+        EsqlQueryResponse.Profile profile = configuration.profile()
+            ? new EsqlQueryResponse.Profile(result.completionInfo().collectedProfiles())
+            : null;
         threadPool.getThreadContext().addResponseHeader(AsyncExecutionId.ASYNC_EXECUTION_IS_RUNNING_HEADER, "?0");
         if (task instanceof EsqlQueryTask asyncTask && request.keepOnCompletion()) {
             String asyncExecutionId = asyncTask.getExecutionId().getEncoded();
@@ -341,6 +343,8 @@ public class TransportEsqlQueryAction extends HandledTransportAction<EsqlQueryRe
             return new EsqlQueryResponse(
                 columns,
                 result.pages(),
+                result.completionInfo().documentsFound(),
+                result.completionInfo().valuesLoaded(),
                 profile,
                 request.columnar(),
                 asyncExecutionId,
@@ -349,7 +353,16 @@ public class TransportEsqlQueryAction extends HandledTransportAction<EsqlQueryRe
                 result.executionInfo()
             );
         }
-        return new EsqlQueryResponse(columns, result.pages(), profile, request.columnar(), request.async(), result.executionInfo());
+        return new EsqlQueryResponse(
+            columns,
+            result.pages(),
+            result.completionInfo().documentsFound(),
+            result.completionInfo().valuesLoaded(),
+            profile,
+            request.columnar(),
+            request.async(),
+            result.executionInfo()
+        );
     }
 
     /**
@@ -401,6 +414,8 @@ public class TransportEsqlQueryAction extends HandledTransportAction<EsqlQueryRe
         return new EsqlQueryResponse(
             List.of(),
             List.of(),
+            0,
+            0,
             null,
             false,
             asyncExecutionId,

+ 2 - 1
x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/session/EsqlCCSUtils.java

@@ -16,6 +16,7 @@ import org.elasticsearch.action.search.ShardSearchFailure;
 import org.elasticsearch.action.support.IndicesOptions;
 import org.elasticsearch.common.Strings;
 import org.elasticsearch.common.util.set.Sets;
+import org.elasticsearch.compute.operator.DriverCompletionInfo;
 import org.elasticsearch.core.Nullable;
 import org.elasticsearch.index.IndexNotFoundException;
 import org.elasticsearch.index.query.QueryBuilder;
@@ -78,7 +79,7 @@ public class EsqlCCSUtils {
         public void onFailure(Exception e) {
             if (returnSuccessWithEmptyResult(executionInfo, e)) {
                 updateExecutionInfoToReturnEmptyResult(executionInfo, e);
-                listener.onResponse(new Result(Analyzer.NO_FIELDS, Collections.emptyList(), Collections.emptyList(), executionInfo));
+                listener.onResponse(new Result(Analyzer.NO_FIELDS, Collections.emptyList(), DriverCompletionInfo.EMPTY, executionInfo));
             } else {
                 listener.onFailure(e);
             }

+ 9 - 7
x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/session/EsqlSession.java

@@ -17,7 +17,7 @@ import org.elasticsearch.common.lucene.BytesRefs;
 import org.elasticsearch.common.regex.Regex;
 import org.elasticsearch.compute.data.Block;
 import org.elasticsearch.compute.data.Page;
-import org.elasticsearch.compute.operator.DriverProfile;
+import org.elasticsearch.compute.operator.DriverCompletionInfo;
 import org.elasticsearch.core.Releasables;
 import org.elasticsearch.index.IndexMode;
 import org.elasticsearch.index.query.QueryBuilder;
@@ -237,7 +237,7 @@ public class EsqlSession {
         // TODO: merge into one method
         if (subplans.size() > 0) {
             // code-path to execute subplans
-            executeSubPlan(new ArrayList<>(), physicalPlan, iterator, executionInfo, runner, listener);
+            executeSubPlan(new DriverCompletionInfo.Accumulator(), physicalPlan, iterator, executionInfo, runner, listener);
         } else {
             // execute main plan
             runner.run(physicalPlan, listener);
@@ -245,7 +245,7 @@ public class EsqlSession {
     }
 
     private void executeSubPlan(
-        List<DriverProfile> profileAccumulator,
+        DriverCompletionInfo.Accumulator completionInfoAccumulator,
         PhysicalPlan plan,
         Iterator<PlanTuple> subPlanIterator,
         EsqlExecutionInfo executionInfo,
@@ -256,7 +256,7 @@ public class EsqlSession {
 
         runner.run(tuple.physical, listener.delegateFailureAndWrap((next, result) -> {
             try {
-                profileAccumulator.addAll(result.profiles());
+                completionInfoAccumulator.accumulate(result.completionInfo());
                 LocalRelation resultWrapper = resultToPlan(tuple.logical, result);
 
                 // replace the original logical plan with the backing result
@@ -271,12 +271,14 @@ public class EsqlSession {
                 });
                 if (subPlanIterator.hasNext() == false) {
                     runner.run(newPlan, next.delegateFailureAndWrap((finalListener, finalResult) -> {
-                        profileAccumulator.addAll(finalResult.profiles());
-                        finalListener.onResponse(new Result(finalResult.schema(), finalResult.pages(), profileAccumulator, executionInfo));
+                        completionInfoAccumulator.accumulate(finalResult.completionInfo());
+                        finalListener.onResponse(
+                            new Result(finalResult.schema(), finalResult.pages(), completionInfoAccumulator.finish(), executionInfo)
+                        );
                     }));
                 } else {
                     // continue executing the subplans
-                    executeSubPlan(profileAccumulator, newPlan, subPlanIterator, executionInfo, runner, next);
+                    executeSubPlan(completionInfoAccumulator, newPlan, subPlanIterator, executionInfo, runner, next);
                 }
             } finally {
                 Releasables.closeExpectNoException(Releasables.wrap(Iterators.map(result.pages().iterator(), p -> p::releaseBlocks)));

+ 8 - 6
x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/session/Result.java

@@ -9,7 +9,7 @@ package org.elasticsearch.xpack.esql.session;
 
 import org.elasticsearch.compute.data.Block;
 import org.elasticsearch.compute.data.Page;
-import org.elasticsearch.compute.operator.DriverProfile;
+import org.elasticsearch.compute.operator.DriverCompletionInfo;
 import org.elasticsearch.core.Nullable;
 import org.elasticsearch.xpack.esql.action.EsqlExecutionInfo;
 import org.elasticsearch.xpack.esql.core.expression.Attribute;
@@ -23,10 +23,12 @@ import java.util.List;
  *               that was run. Each {@link Page} contains a {@link Block} of values for each
  *               attribute in this list.
  * @param pages Actual values produced by running the ESQL.
- * @param profiles {@link DriverProfile}s from all drivers that ran to produce the output. These
- *                 are quite cheap to build, so we build them for all ESQL runs, regardless of if
- *                 users have asked for them. But we only include them in the results if users ask
- *                 for them.
+ * @param completionInfo Information collected from drivers after they've been completed.
  * @param executionInfo Metadata about the execution of this query. Used for cross cluster queries.
  */
-public record Result(List<Attribute> schema, List<Page> pages, List<DriverProfile> profiles, @Nullable EsqlExecutionInfo executionInfo) {}
+public record Result(
+    List<Attribute> schema,
+    List<Page> pages,
+    DriverCompletionInfo completionInfo,
+    @Nullable EsqlExecutionInfo executionInfo
+) {}

+ 5 - 1
x-pack/plugin/esql/src/test/java/org/elasticsearch/xpack/esql/CsvTests.java

@@ -27,6 +27,7 @@ import org.elasticsearch.common.util.iterable.Iterables;
 import org.elasticsearch.compute.data.BlockFactory;
 import org.elasticsearch.compute.data.Page;
 import org.elasticsearch.compute.operator.Driver;
+import org.elasticsearch.compute.operator.DriverCompletionInfo;
 import org.elasticsearch.compute.operator.DriverRunner;
 import org.elasticsearch.compute.operator.exchange.ExchangeSinkHandler;
 import org.elasticsearch.compute.operator.exchange.ExchangeSourceHandler;
@@ -705,6 +706,9 @@ public class CsvTests extends ESTestCase {
             }
         };
         listener = ActionListener.releaseAfter(listener, () -> Releasables.close(drivers));
-        runner.runToCompletion(drivers, listener.map(ignore -> new Result(physicalPlan.output(), collectedPages, List.of(), null)));
+        runner.runToCompletion(
+            drivers,
+            listener.map(ignore -> new Result(physicalPlan.output(), collectedPages, DriverCompletionInfo.EMPTY, null))
+        );
     }
 }

+ 3 - 3
x-pack/plugin/esql/src/test/java/org/elasticsearch/xpack/esql/action/EsqlQueryResponseProfileTests.java

@@ -14,8 +14,8 @@ import org.elasticsearch.common.io.stream.Writeable;
 import org.elasticsearch.compute.operator.AbstractPageMappingOperator;
 import org.elasticsearch.compute.operator.DriverProfile;
 import org.elasticsearch.compute.operator.DriverSleeps;
-import org.elasticsearch.compute.operator.DriverStatus;
 import org.elasticsearch.compute.operator.Operator;
+import org.elasticsearch.compute.operator.OperatorStatus;
 import org.elasticsearch.test.AbstractWireSerializingTestCase;
 
 import java.util.List;
@@ -58,7 +58,7 @@ public class EsqlQueryResponseProfileTests extends AbstractWireSerializingTestCa
         );
     }
 
-    private DriverStatus.OperatorStatus randomOperatorStatus() {
+    private OperatorStatus randomOperatorStatus() {
         String name = randomAlphaOfLength(4);
         Operator.Status status = randomBoolean()
             ? null
@@ -68,6 +68,6 @@ public class EsqlQueryResponseProfileTests extends AbstractWireSerializingTestCa
                 randomNonNegativeLong(),
                 randomNonNegativeLong()
             );
-        return new DriverStatus.OperatorStatus(name, status);
+        return new OperatorStatus(name, status);
     }
 }

+ 286 - 78
x-pack/plugin/esql/src/test/java/org/elasticsearch/xpack/esql/action/EsqlQueryResponseTests.java

@@ -32,7 +32,7 @@ import org.elasticsearch.compute.data.Page;
 import org.elasticsearch.compute.operator.AbstractPageMappingOperator;
 import org.elasticsearch.compute.operator.DriverProfile;
 import org.elasticsearch.compute.operator.DriverSleeps;
-import org.elasticsearch.compute.operator.DriverStatus;
+import org.elasticsearch.compute.operator.OperatorStatus;
 import org.elasticsearch.compute.test.TestBlockFactory;
 import org.elasticsearch.core.Nullable;
 import org.elasticsearch.core.Releasables;
@@ -42,6 +42,7 @@ import org.elasticsearch.geo.ShapeTestUtils;
 import org.elasticsearch.index.mapper.BlockLoader;
 import org.elasticsearch.rest.action.RestActions;
 import org.elasticsearch.test.AbstractChunkedSerializingTestCase;
+import org.elasticsearch.test.ESTestCase;
 import org.elasticsearch.transport.RemoteClusterAware;
 import org.elasticsearch.xcontent.InstantiatingObjectParser;
 import org.elasticsearch.xcontent.ObjectParser;
@@ -130,7 +131,18 @@ public class EsqlQueryResponseTests extends AbstractChunkedSerializingTestCase<E
             id = randomAlphaOfLengthBetween(1, 16);
             isRunning = randomBoolean();
         }
-        return new EsqlQueryResponse(columns, values, profile, columnar, id, isRunning, async, createExecutionInfo());
+        return new EsqlQueryResponse(
+            columns,
+            values,
+            randomNonNegativeLong(),
+            randomNonNegativeLong(),
+            profile,
+            columnar,
+            id,
+            isRunning,
+            async,
+            createExecutionInfo()
+        );
     }
 
     EsqlExecutionInfo createExecutionInfo() {
@@ -265,58 +277,41 @@ public class EsqlQueryResponseTests extends AbstractChunkedSerializingTestCase<E
                 allNull = false;
             }
         }
-        return switch (allNull ? between(0, 2) : between(0, 3)) {
+        List<ColumnInfoImpl> columns = instance.columns();
+        List<Page> pages = deepCopyOfPages(instance);
+        long documentsFound = instance.documentsFound();
+        long valuesLoaded = instance.valuesLoaded();
+        EsqlQueryResponse.Profile profile = instance.profile();
+        boolean columnar = instance.columnar();
+        boolean isAsync = instance.isAsync();
+        EsqlExecutionInfo executionInfo = instance.getExecutionInfo();
+        switch (allNull ? between(0, 4) : between(0, 5)) {
             case 0 -> {
                 int mutCol = between(0, instance.columns().size() - 1);
-                List<ColumnInfoImpl> cols = new ArrayList<>(instance.columns());
+                columns = new ArrayList<>(instance.columns());
                 // keep the type the same so the values are still valid but change the name
-                cols.set(
-                    mutCol,
-                    new ColumnInfoImpl(cols.get(mutCol).name() + "mut", cols.get(mutCol).type(), cols.get(mutCol).originalTypes())
-                );
-                yield new EsqlQueryResponse(
-                    cols,
-                    deepCopyOfPages(instance),
-                    instance.profile(),
-                    instance.columnar(),
-                    instance.isAsync(),
-                    instance.getExecutionInfo()
-                );
+                ColumnInfoImpl mut = columns.get(mutCol);
+                columns.set(mutCol, new ColumnInfoImpl(mut.name() + "mut", mut.type(), mut.originalTypes()));
             }
-            case 1 -> new EsqlQueryResponse(
-                instance.columns(),
-                deepCopyOfPages(instance),
-                instance.profile(),
-                false == instance.columnar(),
-                instance.isAsync(),
-                instance.getExecutionInfo()
-            );
-            case 2 -> new EsqlQueryResponse(
-                instance.columns(),
-                deepCopyOfPages(instance),
-                randomValueOtherThan(instance.profile(), this::randomProfile),
-                instance.columnar(),
-                instance.isAsync(),
-                instance.getExecutionInfo()
-            );
-            case 3 -> {
+            case 1 -> documentsFound = randomValueOtherThan(documentsFound, ESTestCase::randomNonNegativeLong);
+            case 2 -> valuesLoaded = randomValueOtherThan(valuesLoaded, ESTestCase::randomNonNegativeLong);
+            case 3 -> columnar = false == columnar;
+            case 4 -> profile = randomValueOtherThan(profile, this::randomProfile);
+            case 5 -> {
+                assert allNull == false
+                    : "can't replace values while preserving types if all pages are null - the only valid values are null";
                 int noPages = instance.pages().size();
                 List<Page> differentPages = List.of();
                 do {
                     differentPages.forEach(p -> Releasables.closeExpectNoException(p::releaseBlocks));
                     differentPages = randomList(noPages, noPages, () -> randomPage(instance.columns()));
                 } while (differentPages.equals(instance.pages()));
-                yield new EsqlQueryResponse(
-                    instance.columns(),
-                    differentPages,
-                    instance.profile(),
-                    instance.columnar(),
-                    instance.isAsync(),
-                    instance.getExecutionInfo()
-                );
+                pages.forEach(Page::releaseBlocks);
+                pages = differentPages;
             }
             default -> throw new IllegalArgumentException();
-        };
+        }
+        return new EsqlQueryResponse(columns, pages, documentsFound, valuesLoaded, profile, columnar, isAsync, executionInfo);
     }
 
     private List<Page> deepCopyOfPages(EsqlQueryResponse response) {
@@ -368,6 +363,8 @@ public class EsqlQueryResponseTests extends AbstractChunkedSerializingTestCase<E
                 ObjectParser.ValueType.BOOLEAN_OR_NULL
             );
             parser.declareInt(constructorArg(), new ParseField("took"));
+            parser.declareLong(constructorArg(), new ParseField("documents_found"));
+            parser.declareLong(constructorArg(), new ParseField("values_loaded"));
             parser.declareObjectArray(constructorArg(), (p, c) -> ColumnInfoImpl.fromXContent(p), new ParseField("columns"));
             parser.declareField(constructorArg(), (p, c) -> p.list(), new ParseField("values"), ObjectParser.ValueType.OBJECT_ARRAY);
             parser.declareObject(optionalConstructorArg(), (p, c) -> parseClusters(p), new ParseField("_clusters"));
@@ -382,6 +379,8 @@ public class EsqlQueryResponseTests extends AbstractChunkedSerializingTestCase<E
             @Nullable String asyncExecutionId,
             Boolean isRunning,
             Integer took,
+            long documentsFound,
+            long valuesLoaded,
             List<ColumnInfoImpl> columns,
             List<List<Object>> values,
             EsqlExecutionInfo executionInfo
@@ -390,6 +389,8 @@ public class EsqlQueryResponseTests extends AbstractChunkedSerializingTestCase<E
             this.response = new EsqlQueryResponse(
                 columns,
                 List.of(valuesToPage(TestBlockFactory.getNonBreakingInstance(), columns, values)),
+                documentsFound,
+                valuesLoaded,
                 null,
                 false,
                 asyncExecutionId,
@@ -584,62 +585,154 @@ public class EsqlQueryResponseTests extends AbstractChunkedSerializingTestCase<E
         try (EsqlQueryResponse resp = randomResponse(true, null)) {
             int columnCount = resp.pages().get(0).getBlockCount();
             int bodySize = resp.pages().stream().mapToInt(p -> p.getPositionCount() * p.getBlockCount()).sum() + columnCount * 2;
-            assertChunkCount(resp, r -> 6 + clusterDetailsSize(resp.getExecutionInfo().clusterInfo.size()) + bodySize);
+            assertChunkCount(resp, r -> 8 + clusterDetailsSize(resp.getExecutionInfo().clusterInfo.size()) + bodySize);
         }
 
         try (EsqlQueryResponse resp = randomResponseAsync(true, null, true)) {
             int columnCount = resp.pages().get(0).getBlockCount();
             int bodySize = resp.pages().stream().mapToInt(p -> p.getPositionCount() * p.getBlockCount()).sum() + columnCount * 2;
-            assertChunkCount(resp, r -> 8 + clusterDetailsSize(resp.getExecutionInfo().clusterInfo.size()) + bodySize); // is_running
+            assertChunkCount(resp, r -> 10 + clusterDetailsSize(resp.getExecutionInfo().clusterInfo.size()) + bodySize); // is_running
         }
     }
 
     public void testChunkResponseSizeRows() {
         try (EsqlQueryResponse resp = randomResponse(false, null)) {
             int bodySize = resp.pages().stream().mapToInt(Page::getPositionCount).sum();
-            assertChunkCount(resp, r -> 6 + clusterDetailsSize(resp.getExecutionInfo().clusterInfo.size()) + bodySize);
+            assertChunkCount(resp, r -> 8 + clusterDetailsSize(resp.getExecutionInfo().clusterInfo.size()) + bodySize);
         }
         try (EsqlQueryResponse resp = randomResponseAsync(false, null, true)) {
             int bodySize = resp.pages().stream().mapToInt(Page::getPositionCount).sum();
-            assertChunkCount(resp, r -> 8 + clusterDetailsSize(resp.getExecutionInfo().clusterInfo.size()) + bodySize);
+            assertChunkCount(resp, r -> 10 + clusterDetailsSize(resp.getExecutionInfo().clusterInfo.size()) + bodySize);
         }
     }
 
     public void testSimpleXContentColumnar() {
         try (EsqlQueryResponse response = simple(true)) {
-            assertThat(Strings.toString(wrapAsToXContent(response)), equalTo("""
-                {"columns":[{"name":"foo","type":"integer"}],"values":[[40,80]]}"""));
+            assertThat(Strings.toString(wrapAsToXContent(response), true, false), equalTo("""
+                {
+                  "documents_found" : 3,
+                  "values_loaded" : 100,
+                  "columns" : [
+                    {
+                      "name" : "foo",
+                      "type" : "integer"
+                    }
+                  ],
+                  "values" : [
+                    [
+                      40,
+                      80
+                    ]
+                  ]
+                }"""));
         }
     }
 
     public void testSimpleXContentColumnarDropNulls() {
         try (EsqlQueryResponse response = simple(true)) {
             assertThat(
-                Strings.toString(wrapAsToXContent(response), new ToXContent.MapParams(Map.of(DROP_NULL_COLUMNS_OPTION, "true"))),
+                Strings.toString(
+                    wrapAsToXContent(response),
+                    new ToXContent.MapParams(Map.of(DROP_NULL_COLUMNS_OPTION, "true")),
+                    true,
+                    false
+                ),
                 equalTo("""
-                    {"all_columns":[{"name":"foo","type":"integer"}],"columns":[{"name":"foo","type":"integer"}],"values":[[40,80]]}""")
+                    {
+                      "documents_found" : 3,
+                      "values_loaded" : 100,
+                      "all_columns" : [
+                        {
+                          "name" : "foo",
+                          "type" : "integer"
+                        }
+                      ],
+                      "columns" : [
+                        {
+                          "name" : "foo",
+                          "type" : "integer"
+                        }
+                      ],
+                      "values" : [
+                        [
+                          40,
+                          80
+                        ]
+                      ]
+                    }""")
             );
         }
     }
 
     public void testSimpleXContentColumnarAsync() {
         try (EsqlQueryResponse response = simple(true, true)) {
-            assertThat(Strings.toString(wrapAsToXContent(response)), equalTo("""
-                {"is_running":false,"columns":[{"name":"foo","type":"integer"}],"values":[[40,80]]}"""));
+            assertThat(Strings.toString(wrapAsToXContent(response), true, false), equalTo("""
+                {
+                  "is_running" : false,
+                  "documents_found" : 3,
+                  "values_loaded" : 100,
+                  "columns" : [
+                    {
+                      "name" : "foo",
+                      "type" : "integer"
+                    }
+                  ],
+                  "values" : [
+                    [
+                      40,
+                      80
+                    ]
+                  ]
+                }"""));
         }
     }
 
     public void testSimpleXContentRows() {
         try (EsqlQueryResponse response = simple(false)) {
-            assertThat(Strings.toString(wrapAsToXContent(response)), equalTo("""
-                {"columns":[{"name":"foo","type":"integer"}],"values":[[40],[80]]}"""));
+            assertThat(Strings.toString(wrapAsToXContent(response), true, false), equalTo("""
+                {
+                  "documents_found" : 3,
+                  "values_loaded" : 100,
+                  "columns" : [
+                    {
+                      "name" : "foo",
+                      "type" : "integer"
+                    }
+                  ],
+                  "values" : [
+                    [
+                      40
+                    ],
+                    [
+                      80
+                    ]
+                  ]
+                }"""));
         }
     }
 
     public void testSimpleXContentRowsAsync() {
         try (EsqlQueryResponse response = simple(false, true)) {
-            assertThat(Strings.toString(wrapAsToXContent(response)), equalTo("""
-                {"is_running":false,"columns":[{"name":"foo","type":"integer"}],"values":[[40],[80]]}"""));
+            assertThat(Strings.toString(wrapAsToXContent(response), true, false), equalTo("""
+                {
+                  "is_running" : false,
+                  "documents_found" : 3,
+                  "values_loaded" : 100,
+                  "columns" : [
+                    {
+                      "name" : "foo",
+                      "type" : "integer"
+                    }
+                  ],
+                  "values" : [
+                    [
+                      40
+                    ],
+                    [
+                      80
+                    ]
+                  ]
+                }"""));
         }
     }
 
@@ -648,6 +741,8 @@ public class EsqlQueryResponseTests extends AbstractChunkedSerializingTestCase<E
             EsqlQueryResponse response = new EsqlQueryResponse(
                 List.of(new ColumnInfoImpl("foo", "integer", null)),
                 List.of(new Page(blockFactory.newIntArrayVector(new int[] { 40, 80 }, 2).asBlock())),
+                10,
+                99,
                 null,
                 false,
                 "id-123",
@@ -656,8 +751,27 @@ public class EsqlQueryResponseTests extends AbstractChunkedSerializingTestCase<E
                 null
             )
         ) {
-            assertThat(Strings.toString(response), equalTo("""
-                {"id":"id-123","is_running":true,"columns":[{"name":"foo","type":"integer"}],"values":[[40],[80]]}"""));
+            assertThat(Strings.toString(wrapAsToXContent(response), true, false), equalTo("""
+                {
+                  "id" : "id-123",
+                  "is_running" : true,
+                  "documents_found" : 10,
+                  "values_loaded" : 99,
+                  "columns" : [
+                    {
+                      "name" : "foo",
+                      "type" : "integer"
+                    }
+                  ],
+                  "values" : [
+                    [
+                      40
+                    ],
+                    [
+                      80
+                    ]
+                  ]
+                }"""));
         }
     }
 
@@ -666,6 +780,8 @@ public class EsqlQueryResponseTests extends AbstractChunkedSerializingTestCase<E
             EsqlQueryResponse response = new EsqlQueryResponse(
                 List.of(new ColumnInfoImpl("foo", "unsupported", List.of("foo", "bar"))),
                 List.of(new Page(blockFactory.newConstantNullBlock(2))),
+                1,
+                1,
                 null,
                 false,
                 null,
@@ -674,8 +790,29 @@ public class EsqlQueryResponseTests extends AbstractChunkedSerializingTestCase<E
                 null
             )
         ) {
-            assertThat(Strings.toString(response), equalTo("""
-                {"columns":[{"name":"foo","type":"unsupported","original_types":["foo","bar"]}],"values":[[null],[null]]}"""));
+            assertThat(Strings.toString(wrapAsToXContent(response), true, false), equalTo("""
+                {
+                  "documents_found" : 1,
+                  "values_loaded" : 1,
+                  "columns" : [
+                    {
+                      "name" : "foo",
+                      "type" : "unsupported",
+                      "original_types" : [
+                        "foo",
+                        "bar"
+                      ]
+                    }
+                  ],
+                  "values" : [
+                    [
+                      null
+                    ],
+                    [
+                      null
+                    ]
+                  ]
+                }"""));
         }
     }
 
@@ -684,6 +821,8 @@ public class EsqlQueryResponseTests extends AbstractChunkedSerializingTestCase<E
             EsqlQueryResponse response = new EsqlQueryResponse(
                 List.of(new ColumnInfoImpl("foo", "integer", null), new ColumnInfoImpl("all_null", "integer", null)),
                 List.of(new Page(blockFactory.newIntArrayVector(new int[] { 40, 80 }, 2).asBlock(), blockFactory.newConstantNullBlock(2))),
+                1,
+                3,
                 null,
                 false,
                 null,
@@ -693,11 +832,41 @@ public class EsqlQueryResponseTests extends AbstractChunkedSerializingTestCase<E
             )
         ) {
             assertThat(
-                Strings.toString(wrapAsToXContent(response), new ToXContent.MapParams(Map.of(DROP_NULL_COLUMNS_OPTION, "true"))),
-                equalTo("{" + """
-                    "all_columns":[{"name":"foo","type":"integer"},{"name":"all_null","type":"integer"}],""" + """
-                    "columns":[{"name":"foo","type":"integer"}],""" + """
-                    "values":[[40],[80]]}""")
+                Strings.toString(
+                    wrapAsToXContent(response),
+                    new ToXContent.MapParams(Map.of(DROP_NULL_COLUMNS_OPTION, "true")),
+                    true,
+                    false
+                ),
+                equalTo("""
+                    {
+                      "documents_found" : 1,
+                      "values_loaded" : 3,
+                      "all_columns" : [
+                        {
+                          "name" : "foo",
+                          "type" : "integer"
+                        },
+                        {
+                          "name" : "all_null",
+                          "type" : "integer"
+                        }
+                      ],
+                      "columns" : [
+                        {
+                          "name" : "foo",
+                          "type" : "integer"
+                        }
+                      ],
+                      "values" : [
+                        [
+                          40
+                        ],
+                        [
+                          80
+                        ]
+                      ]
+                    }""")
             );
         }
     }
@@ -714,6 +883,8 @@ public class EsqlQueryResponseTests extends AbstractChunkedSerializingTestCase<E
                 EsqlQueryResponse response = new EsqlQueryResponse(
                     List.of(new ColumnInfoImpl("foo", "integer", null), new ColumnInfoImpl("all_null", "integer", null)),
                     List.of(new Page(blockFactory.newIntArrayVector(new int[] { 40, 80 }, 2).asBlock(), b.build())),
+                    1,
+                    3,
                     null,
                     false,
                     null,
@@ -723,11 +894,41 @@ public class EsqlQueryResponseTests extends AbstractChunkedSerializingTestCase<E
                 )
             ) {
                 assertThat(
-                    Strings.toString(wrapAsToXContent(response), new ToXContent.MapParams(Map.of(DROP_NULL_COLUMNS_OPTION, "true"))),
-                    equalTo("{" + """
-                        "all_columns":[{"name":"foo","type":"integer"},{"name":"all_null","type":"integer"}],""" + """
-                        "columns":[{"name":"foo","type":"integer"}],""" + """
-                        "values":[[40],[80]]}""")
+                    Strings.toString(
+                        wrapAsToXContent(response),
+                        new ToXContent.MapParams(Map.of(DROP_NULL_COLUMNS_OPTION, "true")),
+                        true,
+                        false
+                    ),
+                    equalTo("""
+                        {
+                          "documents_found" : 1,
+                          "values_loaded" : 3,
+                          "all_columns" : [
+                            {
+                              "name" : "foo",
+                              "type" : "integer"
+                            },
+                            {
+                              "name" : "all_null",
+                              "type" : "integer"
+                            }
+                          ],
+                          "columns" : [
+                            {
+                              "name" : "foo",
+                              "type" : "integer"
+                            }
+                          ],
+                          "values" : [
+                            [
+                              40
+                            ],
+                            [
+                              80
+                            ]
+                          ]
+                        }""")
                 );
             }
         }
@@ -741,6 +942,8 @@ public class EsqlQueryResponseTests extends AbstractChunkedSerializingTestCase<E
         return new EsqlQueryResponse(
             List.of(new ColumnInfoImpl("foo", "integer", null)),
             List.of(new Page(blockFactory.newIntArrayVector(new int[] { 40, 80 }, 2).asBlock())),
+            3,
+            100,
             null,
             columnar,
             async,
@@ -753,6 +956,8 @@ public class EsqlQueryResponseTests extends AbstractChunkedSerializingTestCase<E
             EsqlQueryResponse response = new EsqlQueryResponse(
                 List.of(new ColumnInfoImpl("foo", "integer", null)),
                 List.of(new Page(blockFactory.newIntArrayVector(new int[] { 40, 80 }, 2).asBlock())),
+                10,
+                100,
                 new EsqlQueryResponse.Profile(
                     List.of(
                         new DriverProfile(
@@ -762,7 +967,7 @@ public class EsqlQueryResponseTests extends AbstractChunkedSerializingTestCase<E
                             20021,
                             20000,
                             12,
-                            List.of(new DriverStatus.OperatorStatus("asdf", new AbstractPageMappingOperator.Status(10021, 10, 111, 222))),
+                            List.of(new OperatorStatus("asdf", new AbstractPageMappingOperator.Status(10021, 10, 111, 222))),
                             DriverSleeps.empty()
                         )
                     )
@@ -772,8 +977,10 @@ public class EsqlQueryResponseTests extends AbstractChunkedSerializingTestCase<E
                 null
             );
         ) {
-            assertThat(Strings.toString(response, true, false), equalTo("""
+            assertThat(Strings.toString(wrapAsToXContent(response), true, false), equalTo("""
                 {
+                  "documents_found" : 10,
+                  "values_loaded" : 100,
                   "columns" : [
                     {
                       "name" : "foo",
@@ -796,6 +1003,8 @@ public class EsqlQueryResponseTests extends AbstractChunkedSerializingTestCase<E
                         "stop_millis" : 1723489819929,
                         "took_nanos" : 20021,
                         "cpu_nanos" : 20000,
+                        "documents_found" : 0,
+                        "values_loaded" : 0,
                         "iterations" : 12,
                         "operators" : [
                           {
@@ -833,7 +1042,7 @@ public class EsqlQueryResponseTests extends AbstractChunkedSerializingTestCase<E
         var longBlk2 = blockFactory.newLongArrayVector(new long[] { 300L, 400L, 500L }, 3).asBlock();
         var columnInfo = List.of(new ColumnInfoImpl("foo", "integer", null), new ColumnInfoImpl("bar", "long", null));
         var pages = List.of(new Page(intBlk1, longBlk1), new Page(intBlk2, longBlk2));
-        try (var response = new EsqlQueryResponse(columnInfo, pages, null, false, null, false, false, null)) {
+        try (var response = new EsqlQueryResponse(columnInfo, pages, 0, 0, null, false, null, false, false, null)) {
             assertThat(columnValues(response.column(0)), contains(10, 20, 30, 40, 50));
             assertThat(columnValues(response.column(1)), contains(100L, 200L, 300L, 400L, 500L));
             expectThrows(IllegalArgumentException.class, () -> response.column(-1));
@@ -845,7 +1054,7 @@ public class EsqlQueryResponseTests extends AbstractChunkedSerializingTestCase<E
         var intBlk1 = blockFactory.newIntArrayVector(new int[] { 10 }, 1).asBlock();
         var columnInfo = List.of(new ColumnInfoImpl("foo", "integer", null));
         var pages = List.of(new Page(intBlk1));
-        try (var response = new EsqlQueryResponse(columnInfo, pages, null, false, null, false, false, null)) {
+        try (var response = new EsqlQueryResponse(columnInfo, pages, 0, 0, null, false, null, false, false, null)) {
             expectThrows(IllegalArgumentException.class, () -> response.column(-1));
             expectThrows(IllegalArgumentException.class, () -> response.column(1));
         }
@@ -864,7 +1073,7 @@ public class EsqlQueryResponseTests extends AbstractChunkedSerializingTestCase<E
         }
         var columnInfo = List.of(new ColumnInfoImpl("foo", "integer", null));
         var pages = List.of(new Page(blk1), new Page(blk2), new Page(blk3));
-        try (var response = new EsqlQueryResponse(columnInfo, pages, null, false, null, false, false, null)) {
+        try (var response = new EsqlQueryResponse(columnInfo, pages, 0, 0, null, false, null, false, false, null)) {
             assertThat(columnValues(response.column(0)), contains(10, null, 30, null, null, 60, null, 80, 90, null));
             expectThrows(IllegalArgumentException.class, () -> response.column(-1));
             expectThrows(IllegalArgumentException.class, () -> response.column(2));
@@ -884,7 +1093,7 @@ public class EsqlQueryResponseTests extends AbstractChunkedSerializingTestCase<E
         }
         var columnInfo = List.of(new ColumnInfoImpl("foo", "integer", null));
         var pages = List.of(new Page(blk1), new Page(blk2), new Page(blk3));
-        try (var response = new EsqlQueryResponse(columnInfo, pages, null, false, null, false, false, null)) {
+        try (var response = new EsqlQueryResponse(columnInfo, pages, 0, 0, null, false, null, false, false, null)) {
             assertThat(columnValues(response.column(0)), contains(List.of(10, 20), null, List.of(40, 50), null, 70, 80, null));
             expectThrows(IllegalArgumentException.class, () -> response.column(-1));
             expectThrows(IllegalArgumentException.class, () -> response.column(2));
@@ -897,7 +1106,7 @@ public class EsqlQueryResponseTests extends AbstractChunkedSerializingTestCase<E
             List<ColumnInfoImpl> columns = randomList(numColumns, numColumns, this::randomColumnInfo);
             int noPages = randomIntBetween(1, 20);
             List<Page> pages = randomList(noPages, noPages, () -> randomPage(columns));
-            try (var resp = new EsqlQueryResponse(columns, pages, null, false, "", false, false, null)) {
+            try (var resp = new EsqlQueryResponse(columns, pages, 0, 0, null, false, "", false, false, null)) {
                 var rowValues = getValuesList(resp.rows());
                 var valValues = getValuesList(resp.values());
                 for (int i = 0; i < rowValues.size(); i++) {
@@ -1011,5 +1220,4 @@ public class EsqlQueryResponseTests extends AbstractChunkedSerializingTestCase<E
         }
         return new Page(results.stream().map(Block.Builder::build).toArray(Block[]::new));
     }
-
 }

+ 13 - 4
x-pack/plugin/esql/src/test/java/org/elasticsearch/xpack/esql/formatter/TextFormatTests.java

@@ -246,7 +246,7 @@ public class TextFormatTests extends ESTestCase {
     public void testPlainTextEmptyCursorWithoutColumns() {
         assertEquals(
             StringUtils.EMPTY,
-            getTextBodyContent(PLAIN_TEXT.format(req(), new EsqlQueryResponse(emptyList(), emptyList(), null, false, false, null)))
+            getTextBodyContent(PLAIN_TEXT.format(req(), new EsqlQueryResponse(emptyList(), emptyList(), 0, 0, null, false, false, null)))
         );
     }
 
@@ -269,7 +269,16 @@ public class TextFormatTests extends ESTestCase {
     }
 
     private static EsqlQueryResponse emptyData() {
-        return new EsqlQueryResponse(singletonList(new ColumnInfoImpl("name", "keyword", null)), emptyList(), null, false, false, null);
+        return new EsqlQueryResponse(
+            singletonList(new ColumnInfoImpl("name", "keyword", null)),
+            emptyList(),
+            0,
+            0,
+            null,
+            false,
+            false,
+            null
+        );
     }
 
     private static EsqlQueryResponse regularData() {
@@ -303,7 +312,7 @@ public class TextFormatTests extends ESTestCase {
             )
         );
 
-        return new EsqlQueryResponse(headers, values, null, false, false, null);
+        return new EsqlQueryResponse(headers, values, 0, 0, null, false, false, null);
     }
 
     private static EsqlQueryResponse escapedData() {
@@ -327,7 +336,7 @@ public class TextFormatTests extends ESTestCase {
             )
         );
 
-        return new EsqlQueryResponse(headers, values, null, false, false, null);
+        return new EsqlQueryResponse(headers, values, 0, 0, null, false, false, null);
     }
 
     private static RestRequest req() {

+ 6 - 0
x-pack/plugin/esql/src/test/java/org/elasticsearch/xpack/esql/formatter/TextFormatterTests.java

@@ -79,6 +79,8 @@ public class TextFormatterTests extends ESTestCase {
                 blockFactory.newConstantNullBlock(2)
             )
         ),
+        0,
+        0,
         null,
         randomBoolean(),
         randomBoolean(),
@@ -181,6 +183,8 @@ public class TextFormatterTests extends ESTestCase {
                     blockFactory.newConstantNullBlock(2)
                 )
             ),
+            0,
+            0,
             null,
             randomBoolean(),
             randomBoolean(),
@@ -222,6 +226,8 @@ public class TextFormatterTests extends ESTestCase {
                                     .build()
                             )
                         ),
+                        0,
+                        0,
                         null,
                         randomBoolean(),
                         randomBoolean(),

+ 33 - 19
x-pack/plugin/esql/src/test/java/org/elasticsearch/xpack/esql/plugin/ComputeListenerTests.java

@@ -15,6 +15,7 @@ import org.elasticsearch.action.support.PlainActionFuture;
 import org.elasticsearch.common.breaker.CircuitBreaker;
 import org.elasticsearch.common.breaker.CircuitBreakingException;
 import org.elasticsearch.common.util.concurrent.ConcurrentCollections;
+import org.elasticsearch.compute.operator.DriverCompletionInfo;
 import org.elasticsearch.compute.operator.DriverProfile;
 import org.elasticsearch.compute.operator.DriverSleeps;
 import org.elasticsearch.core.TimeValue;
@@ -36,6 +37,7 @@ import java.util.concurrent.CountDownLatch;
 import java.util.concurrent.ExecutionException;
 import java.util.concurrent.TimeUnit;
 import java.util.concurrent.atomic.AtomicInteger;
+import java.util.concurrent.atomic.AtomicLong;
 import java.util.stream.Collectors;
 import java.util.stream.IntStream;
 
@@ -58,7 +60,7 @@ public class ComputeListenerTests extends ESTestCase {
         terminate(threadPool);
     }
 
-    private List<DriverProfile> randomProfiles() {
+    private DriverCompletionInfo randomCompletionInfo() {
         int numProfiles = randomIntBetween(0, 2);
         List<DriverProfile> profiles = new ArrayList<>(numProfiles);
         for (int i = 0; i < numProfiles; i++) {
@@ -75,20 +77,22 @@ public class ComputeListenerTests extends ESTestCase {
                 )
             );
         }
-        return profiles;
+        return new DriverCompletionInfo(randomNonNegativeLong(), randomNonNegativeLong(), profiles);
     }
 
     public void testEmpty() {
-        PlainActionFuture<List<DriverProfile>> results = new PlainActionFuture<>();
+        PlainActionFuture<DriverCompletionInfo> results = new PlainActionFuture<>();
         try (var ignored = new ComputeListener(threadPool, () -> {}, results)) {
             assertFalse(results.isDone());
         }
         assertTrue(results.isDone());
-        assertThat(results.actionGet(10, TimeUnit.SECONDS), empty());
+        assertThat(results.actionGet(10, TimeUnit.SECONDS).collectedProfiles(), empty());
     }
 
     public void testCollectComputeResults() {
-        PlainActionFuture<List<DriverProfile>> future = new PlainActionFuture<>();
+        PlainActionFuture<DriverCompletionInfo> future = new PlainActionFuture<>();
+        long documentsFound = 0;
+        long valuesLoaded = 0;
         List<DriverProfile> allProfiles = new ArrayList<>();
         AtomicInteger onFailure = new AtomicInteger();
         try (var computeListener = new ComputeListener(threadPool, onFailure::incrementAndGet, future)) {
@@ -102,20 +106,24 @@ public class ComputeListenerTests extends ESTestCase {
                         threadPool.generic()
                     );
                 } else {
-                    var profiles = randomProfiles();
-                    allProfiles.addAll(profiles);
-                    ActionListener<List<DriverProfile>> subListener = computeListener.acquireCompute();
+                    var info = randomCompletionInfo();
+                    documentsFound += info.documentsFound();
+                    valuesLoaded += info.valuesLoaded();
+                    allProfiles.addAll(info.collectedProfiles());
+                    ActionListener<DriverCompletionInfo> subListener = computeListener.acquireCompute();
                     threadPool.schedule(
-                        ActionRunnable.wrap(subListener, l -> l.onResponse(profiles)),
+                        ActionRunnable.wrap(subListener, l -> l.onResponse(info)),
                         TimeValue.timeValueNanos(between(0, 100)),
                         threadPool.generic()
                     );
                 }
             }
         }
-        List<DriverProfile> profiles = future.actionGet(10, TimeUnit.SECONDS);
+        DriverCompletionInfo actual = future.actionGet(10, TimeUnit.SECONDS);
+        assertThat(actual.documentsFound(), equalTo(documentsFound));
+        assertThat(actual.valuesLoaded(), equalTo(valuesLoaded));
         assertThat(
-            profiles.stream().collect(Collectors.toMap(p -> p, p -> 1, Integer::sum)),
+            actual.collectedProfiles().stream().collect(Collectors.toMap(p -> p, p -> 1, Integer::sum)),
             equalTo(allProfiles.stream().collect(Collectors.toMap(p -> p, p -> 1, Integer::sum)))
         );
         assertThat(onFailure.get(), equalTo(0));
@@ -129,13 +137,13 @@ public class ComputeListenerTests extends ESTestCase {
             );
         int successTasks = between(1, 50);
         int failedTasks = between(1, 100);
-        PlainActionFuture<List<DriverProfile>> rootListener = new PlainActionFuture<>();
+        PlainActionFuture<DriverCompletionInfo> rootListener = new PlainActionFuture<>();
         final AtomicInteger onFailure = new AtomicInteger();
         try (var computeListener = new ComputeListener(threadPool, onFailure::incrementAndGet, rootListener)) {
             for (int i = 0; i < successTasks; i++) {
-                ActionListener<List<DriverProfile>> subListener = computeListener.acquireCompute();
+                ActionListener<DriverCompletionInfo> subListener = computeListener.acquireCompute();
                 threadPool.schedule(
-                    ActionRunnable.wrap(subListener, l -> l.onResponse(randomProfiles())),
+                    ActionRunnable.wrap(subListener, l -> l.onResponse(randomCompletionInfo())),
                     TimeValue.timeValueNanos(between(0, 100)),
                     threadPool.generic()
                 );
@@ -160,13 +168,17 @@ public class ComputeListenerTests extends ESTestCase {
     }
 
     public void testCollectWarnings() throws Exception {
+        AtomicLong documentsFound = new AtomicLong();
+        AtomicLong valuesLoaded = new AtomicLong();
         List<DriverProfile> allProfiles = new ArrayList<>();
         Map<String, Set<String>> allWarnings = new HashMap<>();
-        ActionListener<List<DriverProfile>> rootListener = new ActionListener<>() {
+        ActionListener<DriverCompletionInfo> rootListener = new ActionListener<>() {
             @Override
-            public void onResponse(List<DriverProfile> result) {
+            public void onResponse(DriverCompletionInfo result) {
+                assertThat(result.documentsFound(), equalTo(documentsFound.get()));
+                assertThat(result.valuesLoaded(), equalTo(valuesLoaded.get()));
                 assertThat(
-                    result.stream().collect(Collectors.toMap(p -> p, p -> 1, Integer::sum)),
+                    result.collectedProfiles().stream().collect(Collectors.toMap(p -> p, p -> 1, Integer::sum)),
                     equalTo(allProfiles.stream().collect(Collectors.toMap(p -> p, p -> 1, Integer::sum)))
                 );
                 Map<String, Set<String>> responseHeaders = threadPool.getThreadContext()
@@ -201,8 +213,10 @@ public class ComputeListenerTests extends ESTestCase {
                         threadPool.generic()
                     );
                 } else {
-                    var resp = randomProfiles();
-                    allProfiles.addAll(resp);
+                    var resp = randomCompletionInfo();
+                    documentsFound.addAndGet(resp.documentsFound());
+                    valuesLoaded.addAndGet(resp.valuesLoaded());
+                    allProfiles.addAll(resp.collectedProfiles());
                     int numWarnings = randomIntBetween(1, 5);
                     Map<String, String> warnings = new HashMap<>();
                     for (int i = 0; i < numWarnings; i++) {

+ 31 - 24
x-pack/plugin/esql/src/test/java/org/elasticsearch/xpack/esql/plugin/DataNodeRequestSenderTests.java

@@ -22,6 +22,7 @@ import org.elasticsearch.common.breaker.CircuitBreakingException;
 import org.elasticsearch.common.settings.Settings;
 import org.elasticsearch.common.util.concurrent.ConcurrentCollections;
 import org.elasticsearch.common.util.concurrent.EsExecutors;
+import org.elasticsearch.compute.operator.DriverCompletionInfo;
 import org.elasticsearch.compute.test.ComputeTestCase;
 import org.elasticsearch.core.TimeValue;
 import org.elasticsearch.index.Index;
@@ -123,7 +124,7 @@ public class DataNodeRequestSenderTests extends ComputeTestCase {
         Queue<NodeRequest> sent = ConcurrentCollections.newQueue();
         var future = sendRequests(randomBoolean(), -1, targetShards, (node, shardIds, aliasFilters, listener) -> {
             sent.add(new NodeRequest(node, shardIds, aliasFilters));
-            runWithDelay(() -> listener.onResponse(new DataNodeComputeResponse(List.of(), Map.of())));
+            runWithDelay(() -> listener.onResponse(new DataNodeComputeResponse(DriverCompletionInfo.EMPTY, Map.of())));
         });
         safeGet(future);
         assertThat(sent.size(), equalTo(2));
@@ -142,7 +143,7 @@ public class DataNodeRequestSenderTests extends ComputeTestCase {
             var targetShards = List.of(targetShard(shard1, node1), targetShard(shard3), targetShard(shard4, node2, node3));
             var future = sendRequests(true, -1, targetShards, (node, shardIds, aliasFilters, listener) -> {
                 assertThat(shard3, not(in(shardIds)));
-                runWithDelay(() -> listener.onResponse(new DataNodeComputeResponse(List.of(), Map.of())));
+                runWithDelay(() -> listener.onResponse(new DataNodeComputeResponse(DriverCompletionInfo.EMPTY, Map.of())));
             });
             ComputeResponse resp = safeGet(future);
             assertThat(resp.totalShards, equalTo(3));
@@ -173,7 +174,7 @@ public class DataNodeRequestSenderTests extends ComputeTestCase {
             if (node.equals(node4) && shardIds.contains(shard2)) {
                 failures.put(shard2, new IOException("test"));
             }
-            runWithDelay(() -> listener.onResponse(new DataNodeComputeResponse(List.of(), failures)));
+            runWithDelay(() -> listener.onResponse(new DataNodeComputeResponse(DriverCompletionInfo.EMPTY, failures)));
         });
         try {
             future.actionGet(1, TimeUnit.MINUTES);
@@ -202,7 +203,7 @@ public class DataNodeRequestSenderTests extends ComputeTestCase {
             if (shardIds.contains(shard5)) {
                 failures.put(shard5, new IOException("test failure for shard5"));
             }
-            runWithDelay(() -> listener.onResponse(new DataNodeComputeResponse(List.of(), failures)));
+            runWithDelay(() -> listener.onResponse(new DataNodeComputeResponse(DriverCompletionInfo.EMPTY, failures)));
         });
         var error = expectThrows(Exception.class, future::actionGet);
         assertNotNull(ExceptionsHelper.unwrap(error, IOException.class));
@@ -227,7 +228,7 @@ public class DataNodeRequestSenderTests extends ComputeTestCase {
             if (node1.equals(node) && failed.compareAndSet(false, true)) {
                 runWithDelay(() -> listener.onFailure(new IOException("test request level failure"), true));
             } else {
-                runWithDelay(() -> listener.onResponse(new DataNodeComputeResponse(List.of(), Map.of())));
+                runWithDelay(() -> listener.onResponse(new DataNodeComputeResponse(DriverCompletionInfo.EMPTY, Map.of())));
             }
         });
         Exception exception = expectThrows(Exception.class, future::actionGet);
@@ -247,7 +248,7 @@ public class DataNodeRequestSenderTests extends ComputeTestCase {
             if (node1.equals(node) && failed.compareAndSet(false, true)) {
                 runWithDelay(() -> listener.onFailure(new IOException("test request level failure"), true));
             } else {
-                runWithDelay(() -> listener.onResponse(new DataNodeComputeResponse(List.of(), Map.of())));
+                runWithDelay(() -> listener.onResponse(new DataNodeComputeResponse(DriverCompletionInfo.EMPTY, Map.of())));
             }
         });
         ComputeResponse resp = safeGet(future);
@@ -268,7 +269,7 @@ public class DataNodeRequestSenderTests extends ComputeTestCase {
             if (Objects.equals(node1, node)) {
                 runWithDelay(() -> listener.onFailure(new RuntimeException("test request level non fatal failure"), false));
             } else {
-                runWithDelay(() -> listener.onResponse(new DataNodeComputeResponse(List.of(), Map.of())));
+                runWithDelay(() -> listener.onResponse(new DataNodeComputeResponse(DriverCompletionInfo.EMPTY, Map.of())));
             }
         }));
         assertThat(response.totalShards, equalTo(1));
@@ -326,7 +327,7 @@ public class DataNodeRequestSenderTests extends ComputeTestCase {
             sent.add(new NodeRequest(node, shardIds, aliasFilters));
             runWithDelay(() -> {
                 concurrentRequests.decrementAndGet();
-                listener.onResponse(new DataNodeComputeResponse(List.of(), Map.of()));
+                listener.onResponse(new DataNodeComputeResponse(DriverCompletionInfo.EMPTY, Map.of()));
             });
         }));
         assertThat(sent.size(), equalTo(5));
@@ -349,7 +350,7 @@ public class DataNodeRequestSenderTests extends ComputeTestCase {
         var response = safeGet(sendRequests(randomBoolean(), 1, targetShards, (node, shardIds, aliasFilters, listener) -> {
             runWithDelay(() -> {
                 if (processed.incrementAndGet() == 1) {
-                    listener.onResponse(new DataNodeComputeResponse(List.of(), Map.of()));
+                    listener.onResponse(new DataNodeComputeResponse(DriverCompletionInfo.EMPTY, Map.of()));
                 } else {
                     listener.onSkip();
                 }
@@ -371,7 +372,7 @@ public class DataNodeRequestSenderTests extends ComputeTestCase {
                 if (Objects.equals(node.getId(), node1.getId()) && shardIds.equals(List.of(shard1))) {
                     listener.onFailure(new RuntimeException("test request level non fatal failure"), false);
                 } else if (Objects.equals(node.getId(), node3.getId()) && shardIds.equals(List.of(shard2))) {
-                    listener.onResponse(new DataNodeComputeResponse(List.of(), Map.of()));
+                    listener.onResponse(new DataNodeComputeResponse(DriverCompletionInfo.EMPTY, Map.of()));
                 } else if (Objects.equals(node.getId(), node2.getId()) && shardIds.equals(List.of(shard1))) {
                     listener.onSkip();
                 }
@@ -396,7 +397,7 @@ public class DataNodeRequestSenderTests extends ComputeTestCase {
         var sent = Collections.synchronizedList(new ArrayList<String>());
         safeGet(sendRequests(randomBoolean(), -1, targetShards, (node, shardIds, aliasFilters, listener) -> {
             sent.add(node.getId());
-            runWithDelay(() -> listener.onResponse(new DataNodeComputeResponse(List.of(), Map.of())));
+            runWithDelay(() -> listener.onResponse(new DataNodeComputeResponse(DriverCompletionInfo.EMPTY, Map.of())));
         }));
         assertThat(sent, equalTo(List.of("node-1", "node-2", "node-3", "node-4")));
     }
@@ -409,7 +410,7 @@ public class DataNodeRequestSenderTests extends ComputeTestCase {
         var sent = ConcurrentCollections.<NodeRequest>newQueue();
         safeGet(sendRequests(randomBoolean(), -1, targetShards, (node, shardIds, aliasFilters, listener) -> {
             sent.add(new NodeRequest(node, shardIds, aliasFilters));
-            runWithDelay(() -> listener.onResponse(new DataNodeComputeResponse(List.of(), Map.of())));
+            runWithDelay(() -> listener.onResponse(new DataNodeComputeResponse(DriverCompletionInfo.EMPTY, Map.of())));
         }));
         assertThat(groupRequests(sent, 1), equalTo(Map.of(node1, List.of(shard1))));
         assertThat(groupRequests(sent, 1), anyOf(equalTo(Map.of(node2, List.of(shard2))), equalTo(Map.of(warmNode2, List.of(shard2)))));
@@ -426,8 +427,8 @@ public class DataNodeRequestSenderTests extends ComputeTestCase {
                 (node, shardIds, aliasFilters, listener) -> runWithDelay(
                     () -> listener.onResponse(
                         Objects.equals(node, node4)
-                            ? new DataNodeComputeResponse(List.of(), Map.of())
-                            : new DataNodeComputeResponse(List.of(), Map.of(shard1, new ShardNotFoundException(shard1)))
+                            ? new DataNodeComputeResponse(DriverCompletionInfo.EMPTY, Map.of())
+                            : new DataNodeComputeResponse(DriverCompletionInfo.EMPTY, Map.of(shard1, new ShardNotFoundException(shard1)))
                     )
                 )
             )
@@ -451,10 +452,10 @@ public class DataNodeRequestSenderTests extends ComputeTestCase {
                     () -> listener.onResponse(
                         attempt.incrementAndGet() <= 6
                             ? new DataNodeComputeResponse(
-                                List.of(),
+                                DriverCompletionInfo.EMPTY,
                                 shardIds.stream().collect(toMap(Function.identity(), ShardNotFoundException::new))
                             )
-                            : new DataNodeComputeResponse(List.of(), Map.of())
+                            : new DataNodeComputeResponse(DriverCompletionInfo.EMPTY, Map.of())
                     )
                 )
             )
@@ -472,7 +473,9 @@ public class DataNodeRequestSenderTests extends ComputeTestCase {
             return Map.of(shard1, List.of(node2));
         },
             (node, shardIds, aliasFilters, listener) -> runWithDelay(
-                () -> listener.onResponse(new DataNodeComputeResponse(List.of(), Map.of(shard1, new ShardNotFoundException(shard1))))
+                () -> listener.onResponse(
+                    new DataNodeComputeResponse(DriverCompletionInfo.EMPTY, Map.of(shard1, new ShardNotFoundException(shard1)))
+                )
             )
         ));
         assertThat(response.totalShards, equalTo(1));
@@ -493,12 +496,16 @@ public class DataNodeRequestSenderTests extends ComputeTestCase {
             }, (node, shardIds, aliasFilters, listener) -> runWithDelay(() -> {
                 if (Objects.equals(node, node1)) {
                     // search is going to be retried from replica on node3 without shard resolution
-                    listener.onResponse(new DataNodeComputeResponse(List.of(), Map.of(shard1, new ShardNotFoundException(shard1))));
+                    listener.onResponse(
+                        new DataNodeComputeResponse(DriverCompletionInfo.EMPTY, Map.of(shard1, new ShardNotFoundException(shard1)))
+                    );
                 } else if (Objects.equals(node, node2)) {
                     // search is going to be retried after resolving new shard node since there are no replicas
-                    listener.onResponse(new DataNodeComputeResponse(List.of(), Map.of(shard2, new ShardNotFoundException(shard2))));
+                    listener.onResponse(
+                        new DataNodeComputeResponse(DriverCompletionInfo.EMPTY, Map.of(shard2, new ShardNotFoundException(shard2)))
+                    );
                 } else {
-                    listener.onResponse(new DataNodeComputeResponse(List.of(), Map.of()));
+                    listener.onResponse(new DataNodeComputeResponse(DriverCompletionInfo.EMPTY, Map.of()));
                 }
             }))
         );
@@ -519,8 +526,8 @@ public class DataNodeRequestSenderTests extends ComputeTestCase {
             (node, shardIds, aliasFilters, listener) -> runWithDelay(
                 () -> listener.onResponse(
                     Objects.equals(shardIds, List.of(shard2))
-                        ? new DataNodeComputeResponse(List.of(), Map.of())
-                        : new DataNodeComputeResponse(List.of(), Map.of(shard1, new ShardNotFoundException(shard1)))
+                        ? new DataNodeComputeResponse(DriverCompletionInfo.EMPTY, Map.of())
+                        : new DataNodeComputeResponse(DriverCompletionInfo.EMPTY, Map.of(shard1, new ShardNotFoundException(shard1)))
                 )
             )
 
@@ -538,8 +545,8 @@ public class DataNodeRequestSenderTests extends ComputeTestCase {
             (node, shardIds, aliasFilters, listener) -> runWithDelay(
                 () -> listener.onResponse(
                     Objects.equals(shardIds, List.of(shard2))
-                        ? new DataNodeComputeResponse(List.of(), Map.of())
-                        : new DataNodeComputeResponse(List.of(), Map.of(shard1, new ShardNotFoundException(shard1)))
+                        ? new DataNodeComputeResponse(DriverCompletionInfo.EMPTY, Map.of())
+                        : new DataNodeComputeResponse(DriverCompletionInfo.EMPTY, Map.of(shard1, new ShardNotFoundException(shard1)))
                 )
             )
         ));

+ 2 - 1
x-pack/plugin/esql/src/test/java/org/elasticsearch/xpack/esql/querylog/EsqlQueryLogTests.java

@@ -15,6 +15,7 @@ import org.elasticsearch.common.logging.ESLogMessage;
 import org.elasticsearch.common.logging.Loggers;
 import org.elasticsearch.common.settings.ClusterSettings;
 import org.elasticsearch.common.settings.Settings;
+import org.elasticsearch.compute.operator.DriverCompletionInfo;
 import org.elasticsearch.core.TimeValue;
 import org.elasticsearch.index.IndexSettings;
 import org.elasticsearch.index.SlowLogFieldProvider;
@@ -105,7 +106,7 @@ public class EsqlQueryLogTests extends ESTestCase {
 
         for (int i = 0; i < actualTook.length; i++) {
             EsqlExecutionInfo warnQuery = getEsqlExecutionInfo(actualTook[i], actualPlanningTook[i]);
-            queryLog.onQueryPhase(new Result(List.of(), List.of(), List.of(), warnQuery), query);
+            queryLog.onQueryPhase(new Result(List.of(), List.of(), DriverCompletionInfo.EMPTY, warnQuery), query);
             if (expectedLevel[i] != null) {
                 assertThat(appender.lastEvent(), is(not(nullValue())));
                 var msg = (ESLogMessage) appender.lastMessage();

+ 56 - 2
x-pack/plugin/src/yamlRestTest/resources/rest-api-spec/test/esql/10_basic.yml

@@ -169,7 +169,7 @@ setup:
   - match: {values.0.1: 40}
 
 ---
-"Basic ESQL query":
+basic:
   - do:
       esql.query:
         body:
@@ -181,12 +181,66 @@ setup:
   - match: {values.0: [1, 1]}
 
 ---
-"Test From Eval Sort Limit":
+basic with documents_found:
+  - requires:
+      test_runner_features: [capabilities, contains]
+      capabilities:
+        - method: POST
+          path: /_query
+          parameters: []
+          capabilities: [documents_found_and_values_loaded]
+      reason: "checks for documents_found and values_loaded"
+
+  - do:
+      esql.query:
+        body:
+          query: 'from test | keep data | sort data | limit 2'
+          columnar: true
+
+  - match: {documents_found: 10}   # two documents per shard
+  - match: {values_loaded: 10}     # one per document
+  - match: {columns.0.name: "data"}
+  - match: {columns.0.type: "long"}
+  - match: {values.0: [1, 1]}
+
+---
+FROM EVAL SORT LIMIT:
+  - do:
+      esql.query:
+        body:
+          query: 'from test | eval x = count + 7 | sort x | limit 1'
+
+  - match: {columns.0.name: "color"}
+  - match: {columns.1.name: "count"}
+  - match: {columns.2.name: "count_d"}
+  - match: {columns.3.name: "data"}
+  - match: {columns.4.name: "data_d"}
+  - match: {columns.5.name: "time"}
+  - match: {columns.6.name: "x"}
+  - match: {values.0.6: 47}
+  - length: {values: 1}
+
+---
+FROM EVAL SORT LIMIT with documents_found:
+  - requires:
+      test_runner_features: [capabilities, contains]
+      capabilities:
+        - method: POST
+          path: /_query
+          parameters: []
+          capabilities: [documents_found_and_values_loaded]
+      reason: "checks for documents_found and values_loaded"
+
   - do:
       esql.query:
         body:
           query: 'from test | eval x = count + 7 | sort x | limit 1'
 
+  - match: {documents_found: 40}
+  # We can't be sure quite how many values we'll load. It's at least
+  # one per document in the index. And one per top document. But we
+  # might load more values because we run in more threads.
+  - gte: {values_loaded: 45}
   - match: {columns.0.name: "color"}
   - match: {columns.1.name: "count"}
   - match: {columns.2.name: "count_d"}

+ 26 - 4
x-pack/plugin/src/yamlRestTest/resources/rest-api-spec/test/esql/120_profile.yml

@@ -1,9 +1,5 @@
 ---
 setup:
-  - requires:
-      cluster_features: ["gte_v8.12.0"]
-      reason: "profile option added in 8.12"
-      test_runner_features: warnings
   - do:
       indices.create:
         index:  test
@@ -140,3 +136,29 @@ avg 8.14 or after:
   - gte: {profile.drivers.1.took_nanos: 0}
   - gte: {profile.drivers.1.cpu_nanos: 0}
 # It's hard to assert much about these because they don't come back in any particular order.
+
+---
+documents found:
+  - requires:
+      test_runner_features: [capabilities, contains]
+      capabilities:
+        - method: POST
+          path: /_query
+          parameters: []
+          capabilities: [documents_found_and_values_loaded]
+      reason: "checks for documents_found and values_loaded"
+
+  - do:
+      esql.query:
+        body:
+          query: 'FROM test | LIMIT 1'
+          profile: true
+
+  - length: {profile.drivers: 3}
+  - match: {profile.drivers.0.operators.0.operator: /ExchangeSourceOperator|LuceneSourceOperator.+/}
+  - gte: {profile.drivers.0.documents_found: 0}
+  - gte: {profile.drivers.0.values_loaded: 0}
+  - gte: {profile.drivers.1.documents_found: 0}
+  - gte: {profile.drivers.1.values_loaded: 0}
+  - gte: {profile.drivers.2.documents_found: 0}
+  - gte: {profile.drivers.2.values_loaded: 0}