浏览代码

[ESQL] Add finish() elapsed time to aggregation profiling times (#113172) (#113378)

Closes https://github.com/elastic/elasticsearch/issues/112950?reload=1

Added the `finish()` time to the `AggregationOperator` profiling times,
as `finish()` here calls the aggregator evaluators, which may take a
considerable time.

Here, I added the time to the same count of nanos, but we could separate
them in 2 fields, as aggregator times differ a lot between adding input
and outputting the result. However, I believe that would require a new
transport version?

## Example times Example of how much time is lost in the sample query
`FROM test | STATS x=COUNT_DISTINCT(a)` **with the times separated in 2
variables**:

```JSON
{
    "operator": "AggregationOperator[aggregators=[Aggregator[aggregatorFunction=CountDistinctLongAggregatorFunction[channels=[1]], mode=INITIAL]]]",
    "status": {
        "aggregation_nanos": 571900,
        "aggregation_finish_nanos": 1484600,
        "pages_processed": 3
    }
}
```

Another, more obvious example, where there are no input pages

```JSON
{
    "operator": "AggregationOperator[aggregators=[Aggregator[aggregatorFunction=CountDistinctLongAggregatorFunction[channels=[1]], mode=INITIAL]]]",
    "status": {
        "aggregation_nanos": 0,
        "aggregation_finish_nanos": 48800,
        "pages_processed": 0
    }
}
```
Iván Cea Fontenla 1 年之前
父节点
当前提交
662bb70c03

+ 6 - 0
docs/changelog/113172.yaml

@@ -0,0 +1,6 @@
+pr: 113172
+summary: "[ESQL] Add finish() elapsed time to aggregation profiling times"
+area: ES|QL
+type: enhancement
+issues:
+ - 112950

+ 1 - 0
server/src/main/java/org/elasticsearch/TransportVersions.java

@@ -220,6 +220,7 @@ public class TransportVersions {
     public static final TransportVersion ML_INFERENCE_IBM_WATSONX_EMBEDDINGS_ADDED = def(8_744_00_0);
     public static final TransportVersion BULK_INCREMENTAL_STATE = def(8_745_00_0);
     public static final TransportVersion FAILURE_STORE_STATUS_IN_INDEX_RESPONSE = def(8_746_00_0);
+    public static final TransportVersion ESQL_AGGREGATION_OPERATOR_STATUS_FINISH_NANOS = def(8_747_00_0);
 
     /*
      * STOP! READ THIS FIRST! No, really,

+ 41 - 4
x-pack/plugin/esql/compute/src/main/java/org/elasticsearch/compute/operator/AggregationOperator.java

@@ -50,6 +50,10 @@ public class AggregationOperator implements Operator {
      * Nanoseconds this operator has spent running the aggregations.
      */
     private long aggregationNanos;
+    /**
+     * Nanoseconds this operator has spent running the aggregations final evaluation.
+     */
+    private long aggregationFinishNanos;
     /**
      * Count of pages this operator has processed.
      */
@@ -117,6 +121,7 @@ public class AggregationOperator implements Operator {
         if (finished) {
             return;
         }
+        long start = System.nanoTime();
         finished = true;
         Block[] blocks = null;
         boolean success = false;
@@ -136,6 +141,7 @@ public class AggregationOperator implements Operator {
             if (success == false && blocks != null) {
                 Releasables.closeExpectNoException(blocks);
             }
+            aggregationFinishNanos += System.nanoTime() - start;
         }
     }
 
@@ -175,7 +181,7 @@ public class AggregationOperator implements Operator {
 
     @Override
     public Operator.Status status() {
-        return new Status(aggregationNanos, pagesProcessed);
+        return new Status(aggregationNanos, aggregationFinishNanos, pagesProcessed);
     }
 
     public static class Status implements Operator.Status {
@@ -189,6 +195,11 @@ public class AggregationOperator implements Operator {
          * Nanoseconds this operator has spent running the aggregations.
          */
         private final long aggregationNanos;
+
+        /**
+         * Nanoseconds this operator has spent running the aggregations final evaluation.
+         */
+        private final Long aggregationFinishNanos;
         /**
          * Count of pages this operator has processed.
          */
@@ -197,21 +208,31 @@ public class AggregationOperator implements Operator {
         /**
          * Build.
          * @param aggregationNanos Nanoseconds this operator has spent running the aggregations.
+         * @param aggregationFinishNanos Nanoseconds this operator has spent running the aggregations.
          * @param pagesProcessed Count of pages this operator has processed.
          */
-        public Status(long aggregationNanos, int pagesProcessed) {
+        public Status(long aggregationNanos, long aggregationFinishNanos, int pagesProcessed) {
             this.aggregationNanos = aggregationNanos;
+            this.aggregationFinishNanos = aggregationFinishNanos;
             this.pagesProcessed = pagesProcessed;
         }
 
         protected Status(StreamInput in) throws IOException {
             aggregationNanos = in.readVLong();
+            if (in.getTransportVersion().onOrAfter(TransportVersions.ESQL_AGGREGATION_OPERATOR_STATUS_FINISH_NANOS)) {
+                aggregationFinishNanos = in.readOptionalVLong();
+            } else {
+                aggregationFinishNanos = null;
+            }
             pagesProcessed = in.readVInt();
         }
 
         @Override
         public void writeTo(StreamOutput out) throws IOException {
             out.writeVLong(aggregationNanos);
+            if (out.getTransportVersion().onOrAfter(TransportVersions.ESQL_AGGREGATION_OPERATOR_STATUS_FINISH_NANOS)) {
+                out.writeOptionalVLong(aggregationFinishNanos);
+            }
             out.writeVInt(pagesProcessed);
         }
 
@@ -227,6 +248,13 @@ public class AggregationOperator implements Operator {
             return aggregationNanos;
         }
 
+        /**
+         * Nanoseconds this operator has spent running the aggregations final evaluation.
+         */
+        public long aggregationFinishNanos() {
+            return aggregationFinishNanos;
+        }
+
         /**
          * Count of pages this operator has processed.
          */
@@ -241,6 +269,13 @@ public class AggregationOperator implements Operator {
             if (builder.humanReadable()) {
                 builder.field("aggregation_time", TimeValue.timeValueNanos(aggregationNanos));
             }
+            builder.field("aggregation_finish_nanos", aggregationFinishNanos);
+            if (builder.humanReadable()) {
+                builder.field(
+                    "aggregation_finish_time",
+                    aggregationFinishNanos == null ? null : TimeValue.timeValueNanos(aggregationFinishNanos)
+                );
+            }
             builder.field("pages_processed", pagesProcessed);
             return builder.endObject();
 
@@ -251,12 +286,14 @@ public class AggregationOperator implements Operator {
             if (this == o) return true;
             if (o == null || getClass() != o.getClass()) return false;
             Status status = (Status) o;
-            return aggregationNanos == status.aggregationNanos && pagesProcessed == status.pagesProcessed;
+            return aggregationNanos == status.aggregationNanos
+                && pagesProcessed == status.pagesProcessed
+                && Objects.equals(aggregationFinishNanos, status.aggregationFinishNanos);
         }
 
         @Override
         public int hashCode() {
-            return Objects.hash(aggregationNanos, pagesProcessed);
+            return Objects.hash(aggregationNanos, aggregationFinishNanos, pagesProcessed);
         }
 
         @Override

+ 9 - 5
x-pack/plugin/esql/compute/src/test/java/org/elasticsearch/compute/operator/AggregationOperatorStatusTests.java

@@ -16,7 +16,7 @@ import static org.hamcrest.Matchers.equalTo;
 
 public class AggregationOperatorStatusTests extends AbstractWireSerializingTestCase<AggregationOperator.Status> {
     public static AggregationOperator.Status simple() {
-        return new AggregationOperator.Status(200012, 123);
+        return new AggregationOperator.Status(200012, 400036, 123);
     }
 
     public static String simpleToJson() {
@@ -24,6 +24,8 @@ public class AggregationOperatorStatusTests extends AbstractWireSerializingTestC
             {
               "aggregation_nanos" : 200012,
               "aggregation_time" : "200micros",
+              "aggregation_finish_nanos" : 400036,
+              "aggregation_finish_time" : "400micros",
               "pages_processed" : 123
             }""";
     }
@@ -39,18 +41,20 @@ public class AggregationOperatorStatusTests extends AbstractWireSerializingTestC
 
     @Override
     public AggregationOperator.Status createTestInstance() {
-        return new AggregationOperator.Status(randomNonNegativeLong(), randomNonNegativeInt());
+        return new AggregationOperator.Status(randomNonNegativeLong(), randomNonNegativeLong(), randomNonNegativeInt());
     }
 
     @Override
     protected AggregationOperator.Status mutateInstance(AggregationOperator.Status instance) {
         long aggregationNanos = instance.aggregationNanos();
+        long aggregationFinishNanos = instance.aggregationFinishNanos();
         int pagesProcessed = instance.pagesProcessed();
-        switch (between(0, 1)) {
+        switch (between(0, 2)) {
             case 0 -> aggregationNanos = randomValueOtherThan(aggregationNanos, ESTestCase::randomNonNegativeLong);
-            case 1 -> pagesProcessed = randomValueOtherThan(pagesProcessed, ESTestCase::randomNonNegativeInt);
+            case 1 -> aggregationFinishNanos = randomValueOtherThan(aggregationFinishNanos, ESTestCase::randomNonNegativeLong);
+            case 2 -> pagesProcessed = randomValueOtherThan(pagesProcessed, ESTestCase::randomNonNegativeInt);
             default -> throw new UnsupportedOperationException();
         }
-        return new AggregationOperator.Status(aggregationNanos, pagesProcessed);
+        return new AggregationOperator.Status(aggregationNanos, aggregationFinishNanos, pagesProcessed);
     }
 }

+ 3 - 1
x-pack/plugin/esql/qa/server/single-node/src/javaRestTest/java/org/elasticsearch/xpack/esql/qa/single_node/RestEsqlIT.java

@@ -562,7 +562,9 @@ public class RestEsqlIT extends RestEsqlTestCase {
                 .entry("processing_nanos", greaterThan(0))
                 .entry("processed_queries", List.of("*:*"));
             case "ValuesSourceReaderOperator" -> basicProfile().entry("readers_built", matchesMap().extraOk());
-            case "AggregationOperator" -> matchesMap().entry("pages_processed", greaterThan(0)).entry("aggregation_nanos", greaterThan(0));
+            case "AggregationOperator" -> matchesMap().entry("pages_processed", greaterThan(0))
+                .entry("aggregation_nanos", greaterThan(0))
+                .entry("aggregation_finish_nanos", greaterThan(0));
             case "ExchangeSinkOperator" -> matchesMap().entry("pages_accepted", greaterThan(0));
             case "ExchangeSourceOperator" -> matchesMap().entry("pages_emitted", greaterThan(0)).entry("pages_waiting", 0);
             case "ProjectOperator", "EvalOperator" -> basicProfile();