Browse Source

Add status for enrich operator (#106036)

This PR adds a status for the enrich operator. This status
should help us determine how fast the enrich operator is.
Nhat Nguyen 1 year ago
parent
commit
e971c51a45

+ 5 - 0
docs/changelog/106036.yaml

@@ -0,0 +1,5 @@
+pr: 106036
+summary: Add status for enrich operator
+area: ES|QL
+type: enhancement
+issues: []

+ 1 - 0
server/src/main/java/org/elasticsearch/TransportVersions.java

@@ -137,6 +137,7 @@ public class TransportVersions {
     public static final TransportVersion ESQL_TIMINGS = def(8_597_00_0);
     public static final TransportVersion DATA_STREAM_AUTO_SHARDING_EVENT = def(8_598_00_0);
     public static final TransportVersion ADD_FAILURE_STORE_INDICES_OPTIONS = def(8_599_00_0);
+    public static final TransportVersion ESQL_ENRICH_OPERATOR_STATUS = def(8_600_00_0);
 
     /*
      * STOP! READ THIS FIRST! No, really,

+ 120 - 1
x-pack/plugin/esql/compute/src/main/java/org/elasticsearch/compute/operator/AsyncOperator.java

@@ -8,16 +8,27 @@
 package org.elasticsearch.compute.operator;
 
 import org.elasticsearch.ExceptionsHelper;
+import org.elasticsearch.TransportVersion;
+import org.elasticsearch.TransportVersions;
 import org.elasticsearch.action.ActionListener;
 import org.elasticsearch.action.support.SubscribableListener;
+import org.elasticsearch.common.Strings;
+import org.elasticsearch.common.io.stream.NamedWriteableRegistry;
+import org.elasticsearch.common.io.stream.StreamInput;
+import org.elasticsearch.common.io.stream.StreamOutput;
 import org.elasticsearch.common.util.concurrent.ConcurrentCollections;
 import org.elasticsearch.compute.data.Page;
+import org.elasticsearch.core.TimeValue;
 import org.elasticsearch.index.seqno.LocalCheckpointTracker;
 import org.elasticsearch.index.seqno.SequenceNumbers;
 import org.elasticsearch.tasks.TaskCancelledException;
+import org.elasticsearch.xcontent.XContentBuilder;
 
+import java.io.IOException;
 import java.util.Map;
+import java.util.Objects;
 import java.util.concurrent.atomic.AtomicReference;
+import java.util.concurrent.atomic.LongAdder;
 
 /**
  * {@link AsyncOperator} performs an external computation specified in {@link #performAsync(Page, ActionListener)}.
@@ -33,6 +44,7 @@ public abstract class AsyncOperator implements Operator {
     private final DriverContext driverContext;
 
     private final int maxOutstandingRequests;
+    private final LongAdder totalTimeInNanos = new LongAdder();
     private boolean finished = false;
     private volatile boolean closed = false;
 
@@ -81,7 +93,11 @@ public abstract class AsyncOperator implements Operator {
                 onFailure(e);
                 onSeqNoCompleted(seqNo);
             });
-            performAsync(input, ActionListener.runAfter(listener, driverContext::removeAsyncAction));
+            final long startNanos = System.nanoTime();
+            performAsync(input, ActionListener.runAfter(listener, () -> {
+                driverContext.removeAsyncAction();
+                totalTimeInNanos.add(System.nanoTime() - startNanos);
+            }));
             success = true;
         } finally {
             if (success == false) {
@@ -224,4 +240,107 @@ public abstract class AsyncOperator implements Operator {
             return blockedFuture;
         }
     }
+
+    /**
+     * Reports this operator's progress as an {@link Operator.Status}.
+     * <p>
+     * The two page figures are read from the sequence-number checkpoint and clamped so they are
+     * never negative; the accumulated request time is converted from nanoseconds to milliseconds.
+     * NOTE(review): the page figures are sequence numbers rather than counts; if numbering starts
+     * at zero they trail the real page counts by one. Confirm against the checkpoint's initial state.
+     */
+    @Override
+    public final Operator.Status status() {
+        final long received = Math.max(0L, checkpoint.getMaxSeqNo());
+        final long completed = Math.max(0L, checkpoint.getProcessedCheckpoint());
+        final long elapsedMillis = TimeValue.timeValueNanos(totalTimeInNanos.sum()).millis();
+        return status(received, completed, elapsedMillis);
+    }
+
+    protected Operator.Status status(long receivedPages, long completedPages, long totalTimeInMillis) {
+        return new Status(receivedPages, completedPages, totalTimeInMillis);
+    }
+
+    public static class Status implements Operator.Status {
+        public static final NamedWriteableRegistry.Entry ENTRY = new NamedWriteableRegistry.Entry(
+            Operator.Status.class,
+            "async_operator",
+            Status::new
+        );
+
+        final long receivedPages;
+        final long completedPages;
+        final long totalTimeInMillis;
+
+        protected Status(long receivedPages, long completedPages, long totalTimeInMillis) {
+            this.receivedPages = receivedPages;
+            this.completedPages = completedPages;
+            this.totalTimeInMillis = totalTimeInMillis;
+        }
+
+        protected Status(StreamInput in) throws IOException {
+            this.receivedPages = in.readVLong();
+            this.completedPages = in.readVLong();
+            this.totalTimeInMillis = in.readVLong();
+        }
+
+        @Override
+        public void writeTo(StreamOutput out) throws IOException {
+            out.writeVLong(receivedPages);
+            out.writeVLong(completedPages);
+            out.writeVLong(totalTimeInMillis);
+        }
+
+        public long receivedPages() {
+            return receivedPages;
+        }
+
+        public long completedPages() {
+            return completedPages;
+        }
+
+        public long totalTimeInMillis() {
+            return totalTimeInMillis;
+        }
+
+        @Override
+        public String getWriteableName() {
+            return ENTRY.name;
+        }
+
+        @Override
+        public XContentBuilder toXContent(XContentBuilder builder, Params params) throws IOException {
+            builder.startObject();
+            innerToXContent(builder);
+            return builder.endObject();
+        }
+
+        protected final XContentBuilder innerToXContent(XContentBuilder builder) throws IOException {
+            builder.field("received_pages", receivedPages);
+            builder.field("completed_pages", completedPages);
+            builder.field("total_time_in_millis", totalTimeInMillis);
+            if (totalTimeInMillis >= 0) {
+                builder.field("total_time", TimeValue.timeValueMillis(totalTimeInMillis));
+            }
+            return builder;
+        }
+
+        @Override
+        public boolean equals(Object o) {
+            if (this == o) return true;
+            if (o == null || getClass() != o.getClass()) return false;
+            Status status = (Status) o;
+            return receivedPages == status.receivedPages
+                && completedPages == status.completedPages
+                && totalTimeInMillis == status.totalTimeInMillis;
+        }
+
+        @Override
+        public int hashCode() {
+            return Objects.hash(receivedPages, completedPages, totalTimeInMillis);
+        }
+
+        @Override
+        public String toString() {
+            return Strings.toString(this);
+        }
+
+        @Override
+        public TransportVersion getMinimalSupportedVersion() {
+            return TransportVersions.ESQL_ENRICH_OPERATOR_STATUS;
+        }
+    }
 }

+ 1 - 1
x-pack/plugin/esql/compute/src/main/java/org/elasticsearch/compute/operator/DriverProfile.java

@@ -102,7 +102,7 @@ public class DriverProfile implements Writeable, ChunkedToXContentObject {
         return iterations;
     }
 
-    List<DriverStatus.OperatorStatus> operators() {
+    public List<DriverStatus.OperatorStatus> operators() {
         return operators;
     }
 

+ 69 - 0
x-pack/plugin/esql/compute/src/test/java/org/elasticsearch/compute/operator/AsyncOperatorStatusTests.java

@@ -0,0 +1,69 @@
+/*
+ * Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
+ * or more contributor license agreements. Licensed under the Elastic License
+ * 2.0; you may not use this file except in compliance with the Elastic License
+ * 2.0.
+ */
+
+package org.elasticsearch.compute.operator;
+
+import org.elasticsearch.common.Strings;
+import org.elasticsearch.common.io.stream.Writeable;
+import org.elasticsearch.core.TimeValue;
+import org.elasticsearch.test.AbstractWireSerializingTestCase;
+import org.elasticsearch.test.ESTestCase;
+
+import java.io.IOException;
+
+import static org.hamcrest.Matchers.equalTo;
+
+/**
+ * Wire-serialization and rendering tests for {@link AsyncOperator.Status}.
+ */
+public class AsyncOperatorStatusTests extends AbstractWireSerializingTestCase<AsyncOperator.Status> {
+    @Override
+    protected Writeable.Reader<AsyncOperator.Status> instanceReader() {
+        return AsyncOperator.Status::new;
+    }
+
+    @Override
+    protected AsyncOperator.Status createTestInstance() {
+        final long received = randomNonNegativeLong();
+        final long completed = randomNonNegativeLong();
+        final long millis = randomLongBetween(1, TimeValue.timeValueHours(1).millis());
+        return new AsyncOperator.Status(received, completed, millis);
+    }
+
+    /**
+     * Returns a copy of {@code in} that differs in exactly one randomly chosen field.
+     */
+    @Override
+    protected AsyncOperator.Status mutateInstance(AsyncOperator.Status in) throws IOException {
+        long received = in.receivedPages();
+        long completed = in.completedPages();
+        long millis = in.totalTimeInMillis();
+        switch (randomIntBetween(0, 2)) {
+            case 0 -> received = randomValueOtherThan(received, ESTestCase::randomNonNegativeLong);
+            case 1 -> completed = randomValueOtherThan(completed, ESTestCase::randomNonNegativeLong);
+            case 2 -> millis = randomValueOtherThan(millis, ESTestCase::randomNonNegativeLong);
+            default -> throw new AssertionError("unknown ");
+        }
+        return new AsyncOperator.Status(received, completed, millis);
+    }
+
+    /** Pins the pretty-printed, human-readable rendering of a status. */
+    public void testToXContent() {
+        final long tenSecondsInMillis = TimeValue.timeValueSeconds(10).millis();
+        var snapshot = new AsyncOperator.Status(100, 50, tenSecondsInMillis);
+        String rendered = Strings.toString(snapshot, true, true);
+        assertThat(rendered, equalTo("""
+            {
+              "received_pages" : 100,
+              "completed_pages" : 50,
+              "total_time_in_millis" : 10000,
+              "total_time" : "10s"
+            }"""));
+    }
+}

+ 3 - 0
x-pack/plugin/esql/src/internalClusterTest/java/org/elasticsearch/xpack/esql/action/CrossClustersEnrichIT.java

@@ -460,6 +460,9 @@ public class CrossClustersEnrichIT extends AbstractMultiClustersTestCase {
         EsqlQueryRequest request = new EsqlQueryRequest();
         request.query(query);
         request.pragmas(AbstractEsqlIntegTestCase.randomPragmas());
+        if (randomBoolean()) {
+            request.profile(true);
+        }
         return client(LOCAL_CLUSTER).execute(EsqlQueryAction.INSTANCE, request).actionGet(30, TimeUnit.SECONDS);
     }
 

+ 29 - 0
x-pack/plugin/esql/src/internalClusterTest/java/org/elasticsearch/xpack/esql/action/EnrichIT.java

@@ -18,6 +18,8 @@ import org.elasticsearch.common.settings.Setting;
 import org.elasticsearch.common.settings.Settings;
 import org.elasticsearch.common.unit.ByteSizeValue;
 import org.elasticsearch.compute.data.BlockFactory;
+import org.elasticsearch.compute.operator.DriverProfile;
+import org.elasticsearch.compute.operator.DriverStatus;
 import org.elasticsearch.compute.operator.exchange.ExchangeService;
 import org.elasticsearch.core.TimeValue;
 import org.elasticsearch.indices.breaker.HierarchyCircuitBreakerService;
@@ -58,12 +60,15 @@ import java.util.Set;
 import java.util.concurrent.TimeUnit;
 import java.util.function.Function;
 
+import static java.util.Collections.emptyList;
 import static org.elasticsearch.test.hamcrest.ElasticsearchAssertions.assertAcked;
 import static org.elasticsearch.transport.AbstractSimpleTransportTestCase.IGNORE_DESERIALIZATION_ERRORS_SETTING;
 import static org.hamcrest.Matchers.closeTo;
 import static org.hamcrest.Matchers.containsInAnyOrder;
 import static org.hamcrest.Matchers.equalTo;
+import static org.hamcrest.Matchers.greaterThanOrEqualTo;
 import static org.hamcrest.Matchers.hasSize;
+import static org.hamcrest.Matchers.not;
 
 public class EnrichIT extends AbstractEsqlIntegTestCase {
 
@@ -121,6 +126,9 @@ public class EnrichIT extends AbstractEsqlIntegTestCase {
         } else {
             client = client();
         }
+        if (request.profile() == false && randomBoolean()) {
+            request.profile(true);
+        }
         if (randomBoolean()) {
             setRequestCircuitBreakerLimit(ByteSizeValue.ofBytes(between(256, 4096)));
             try {
@@ -318,6 +326,27 @@ public class EnrichIT extends AbstractEsqlIntegTestCase {
         }
     }
 
+    /**
+     * Runs an enrich query with profiling enabled and checks that the returned profile holds
+     * at least two drivers and at least one operator whose name starts with {@code EnrichOperator}.
+     */
+    public void testProfile() {
+        EsqlQueryRequest profiled = new EsqlQueryRequest();
+        profiled.pragmas(randomPragmas());
+        profiled.query("from listens* | sort timestamp DESC | limit 1 | " + enrichSongCommand() + " | KEEP timestamp, artist");
+        profiled.profile(true);
+        try (var response = run(profiled)) {
+            Iterator<Object> firstRow = response.values().next();
+            assertThat(firstRow.next(), equalTo(7L));
+            assertThat(firstRow.next(), equalTo("Linkin Park"));
+            EsqlQueryResponse.Profile profile = response.profile();
+            assertNotNull(profile);
+            List<DriverProfile> driverProfiles = profile.drivers();
+            assertThat(driverProfiles.size(), greaterThanOrEqualTo(2));
+            List<DriverStatus.OperatorStatus> enrichStatuses = driverProfiles.stream()
+                .map(DriverProfile::operators)
+                .flatMap(List::stream)
+                .filter(op -> op.operator().startsWith("EnrichOperator"))
+                .toList();
+            assertThat(enrichStatuses, not(emptyList()));
+        }
+    }
+
     /**
      * Some enrich queries that could fail without the PushDownEnrich rule.
      */

+ 82 - 0
x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/enrich/EnrichLookupOperator.java

@@ -8,15 +8,21 @@
 package org.elasticsearch.xpack.esql.enrich;
 
 import org.elasticsearch.action.ActionListener;
+import org.elasticsearch.common.io.stream.NamedWriteableRegistry;
+import org.elasticsearch.common.io.stream.StreamInput;
+import org.elasticsearch.common.io.stream.StreamOutput;
 import org.elasticsearch.compute.data.Block;
 import org.elasticsearch.compute.data.Page;
 import org.elasticsearch.compute.operator.AsyncOperator;
 import org.elasticsearch.compute.operator.DriverContext;
 import org.elasticsearch.compute.operator.Operator;
 import org.elasticsearch.tasks.CancellableTask;
+import org.elasticsearch.xcontent.XContentBuilder;
 import org.elasticsearch.xpack.ql.expression.NamedExpression;
 
+import java.io.IOException;
 import java.util.List;
+import java.util.Objects;
 
 public final class EnrichLookupOperator extends AsyncOperator {
     private final EnrichLookupService enrichLookupService;
@@ -27,6 +33,7 @@ public final class EnrichLookupOperator extends AsyncOperator {
     private final String matchType;
     private final String matchField;
     private final List<NamedExpression> enrichFields;
+    private long totalTerms = 0L;
 
     public record Factory(
         String sessionId,
@@ -95,6 +102,7 @@ public final class EnrichLookupOperator extends AsyncOperator {
     @Override
     protected void performAsync(Page inputPage, ActionListener<Page> listener) {
         final Block inputBlock = inputPage.getBlock(inputChannel);
+        totalTerms += inputBlock.getTotalValueCount();
         enrichLookupService.lookupAsync(
             sessionId,
             parentTask,
@@ -107,9 +115,83 @@ public final class EnrichLookupOperator extends AsyncOperator {
         );
     }
 
+    /**
+     * Describes this operator by its enrich index, match field, enrich fields and input channel.
+     */
+    @Override
+    public String toString() {
+        final StringBuilder description = new StringBuilder("EnrichOperator[index=");
+        description.append(enrichIndex);
+        description.append(" match_field=").append(matchField);
+        description.append(" enrich_fields=").append(enrichFields);
+        description.append(" inputChannel=").append(inputChannel);
+        return description.append("]").toString();
+    }
+
     @Override
     protected void doClose() {
         // TODO: Maybe create a sub-task as the parent task of all the lookup tasks
         // then cancel it when this operator terminates early (e.g., have enough result).
     }
+
+    @Override
+    protected Operator.Status status(long receivedPages, long completedPages, long totalTimeInMillis) {
+        return new EnrichLookupOperator.Status(receivedPages, completedPages, totalTimeInMillis, totalTerms);
+    }
+
+    /**
+     * Status of the enrich lookup operator: the base async-operator figures plus the total
+     * number of terms. Registered as a named writeable under {@code enrich}.
+     */
+    public static class Status extends AsyncOperator.Status {
+        public static final NamedWriteableRegistry.Entry ENTRY = new NamedWriteableRegistry.Entry(
+            Operator.Status.class,
+            "enrich",
+            Status::new
+        );
+
+        final long totalTerms;
+
+        /** Creates a status from already-computed figures; {@code totalTerms} comes last. */
+        Status(long receivedPages, long completedPages, long totalTimeInMillis, long totalTerms) {
+            super(receivedPages, completedPages, totalTimeInMillis);
+            this.totalTerms = totalTerms;
+        }
+
+        /** Reads the base fields first, then the term count, mirroring {@link #writeTo(StreamOutput)}. */
+        Status(StreamInput in) throws IOException {
+            super(in);
+            this.totalTerms = in.readVLong();
+        }
+
+        @Override
+        public void writeTo(StreamOutput out) throws IOException {
+            super.writeTo(out);
+            out.writeVLong(totalTerms);
+        }
+
+        @Override
+        public String getWriteableName() {
+            return ENTRY.name;
+        }
+
+        /** Renders the shared fields followed by {@code total_terms}, wrapped in one object. */
+        @Override
+        public XContentBuilder toXContent(XContentBuilder builder, Params params) throws IOException {
+            builder.startObject();
+            innerToXContent(builder);
+            builder.field("total_terms", totalTerms);
+            builder.endObject();
+            return builder;
+        }
+
+        @Override
+        public boolean equals(Object o) {
+            if (o == this) {
+                return true;
+            }
+            // The parent check already rejects null and instances of a different class.
+            if (super.equals(o) == false) {
+                return false;
+            }
+            final Status other = (Status) o;
+            return other.totalTerms == totalTerms;
+        }
+
+        @Override
+        public int hashCode() {
+            return Objects.hash(super.hashCode(), totalTerms);
+        }
+    }
 }

+ 5 - 1
x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/plugin/EsqlPlugin.java

@@ -26,6 +26,7 @@ import org.elasticsearch.compute.lucene.LuceneOperator;
 import org.elasticsearch.compute.lucene.ValuesSourceReaderOperator;
 import org.elasticsearch.compute.operator.AbstractPageMappingOperator;
 import org.elasticsearch.compute.operator.AggregationOperator;
+import org.elasticsearch.compute.operator.AsyncOperator;
 import org.elasticsearch.compute.operator.DriverStatus;
 import org.elasticsearch.compute.operator.HashAggregationOperator;
 import org.elasticsearch.compute.operator.LimitOperator;
@@ -52,6 +53,7 @@ import org.elasticsearch.xpack.esql.action.RestEsqlAsyncQueryAction;
 import org.elasticsearch.xpack.esql.action.RestEsqlDeleteAsyncResultAction;
 import org.elasticsearch.xpack.esql.action.RestEsqlGetAsyncResultAction;
 import org.elasticsearch.xpack.esql.action.RestEsqlQueryAction;
+import org.elasticsearch.xpack.esql.enrich.EnrichLookupOperator;
 import org.elasticsearch.xpack.esql.execution.PlanExecutor;
 import org.elasticsearch.xpack.esql.querydsl.query.SingleValueQuery;
 import org.elasticsearch.xpack.esql.session.EsqlIndexResolver;
@@ -176,7 +178,9 @@ public class EsqlPlugin extends Plugin implements ActionPlugin {
                 TopNOperatorStatus.ENTRY,
                 MvExpandOperator.Status.ENTRY,
                 ValuesSourceReaderOperator.Status.ENTRY,
-                SingleValueQuery.ENTRY
+                SingleValueQuery.ENTRY,
+                AsyncOperator.Status.ENTRY,
+                EnrichLookupOperator.Status.ENTRY
             ).stream(),
             Block.getNamedWriteables().stream()
         ).toList();

+ 80 - 0
x-pack/plugin/esql/src/test/java/org/elasticsearch/xpack/esql/enrich/EnrichOperatorStatusTests.java

@@ -0,0 +1,80 @@
+/*
+ * Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
+ * or more contributor license agreements. Licensed under the Elastic License
+ * 2.0; you may not use this file except in compliance with the Elastic License
+ * 2.0.
+ */
+
+package org.elasticsearch.xpack.esql.enrich;
+
+import org.elasticsearch.common.Strings;
+import org.elasticsearch.common.io.stream.Writeable;
+import org.elasticsearch.core.TimeValue;
+import org.elasticsearch.test.AbstractWireSerializingTestCase;
+import org.elasticsearch.test.ESTestCase;
+
+import java.io.IOException;
+
+import static org.hamcrest.Matchers.equalTo;
+
+/**
+ * Wire-serialization and rendering tests for {@link EnrichLookupOperator.Status}.
+ * <p>
+ * The status constructor takes its arguments in the order
+ * {@code (receivedPages, completedPages, totalTimeInMillis, totalTerms)}; every construction
+ * below follows that order so that each value lands in the field it is named after.
+ */
+public class EnrichOperatorStatusTests extends AbstractWireSerializingTestCase<EnrichLookupOperator.Status> {
+    @Override
+    protected Writeable.Reader<EnrichLookupOperator.Status> instanceReader() {
+        return EnrichLookupOperator.Status::new;
+    }
+
+    /**
+     * Creates a random status. The time is bounded to one hour, matching the base status tests;
+     * the page figures and the term count are arbitrary non-negative longs.
+     */
+    @Override
+    protected EnrichLookupOperator.Status createTestInstance() {
+        final long receivedPages = randomNonNegativeLong();
+        final long completedPages = randomNonNegativeLong();
+        final long totalTimeInMillis = randomLongBetween(1, TimeValue.timeValueHours(1).millis());
+        final long totalTerms = randomNonNegativeLong();
+        return new EnrichLookupOperator.Status(receivedPages, completedPages, totalTimeInMillis, totalTerms);
+    }
+
+    /**
+     * Returns a copy of {@code in} that differs in exactly one randomly chosen field.
+     * Working on named locals keeps the untouched fields in their own constructor slots.
+     */
+    @Override
+    protected EnrichLookupOperator.Status mutateInstance(EnrichLookupOperator.Status in) throws IOException {
+        long receivedPages = in.receivedPages();
+        long completedPages = in.completedPages();
+        long totalTimeInMillis = in.totalTimeInMillis();
+        long totalTerms = in.totalTerms;
+        switch (randomIntBetween(0, 3)) {
+            case 0 -> receivedPages = randomValueOtherThan(receivedPages, ESTestCase::randomNonNegativeLong);
+            case 1 -> completedPages = randomValueOtherThan(completedPages, ESTestCase::randomNonNegativeLong);
+            case 2 -> totalTimeInMillis = randomValueOtherThan(totalTimeInMillis, ESTestCase::randomNonNegativeLong);
+            case 3 -> totalTerms = randomValueOtherThan(totalTerms, ESTestCase::randomNonNegativeLong);
+            default -> throw new AssertionError("unknown ");
+        }
+        return new EnrichLookupOperator.Status(receivedPages, completedPages, totalTimeInMillis, totalTerms);
+    }
+
+    /** Pins the pretty-printed, human-readable rendering of a status. */
+    public void testToXContent() {
+        var status = new EnrichLookupOperator.Status(100, 50, TimeValue.timeValueSeconds(10).millis(), 120);
+        String json = Strings.toString(status, true, true);
+        assertThat(json, equalTo("""
+            {
+              "received_pages" : 100,
+              "completed_pages" : 50,
+              "total_time_in_millis" : 10000,
+              "total_time" : "10s",
+              "total_terms" : 120
+            }"""));
+    }
+}