浏览代码

[9.1] ESQL: Added Sample operator NamedWritable to plugin (#131541) (#131621)

Manual 9.1 backport of https://github.com/elastic/elasticsearch/pull/131541

There's a TransportVersion change here
Iván Cea Fontenla 2 月之前
父节点
当前提交
9d82c486d6

+ 5 - 0
docs/changelog/131541.yaml

@@ -0,0 +1,5 @@
+pr: 131541
+summary: Added Sample operator `NamedWritable` to plugin
+area: ES|QL
+type: bug
+issues: []

+ 1 - 0
server/src/main/java/org/elasticsearch/TransportVersions.java

@@ -330,6 +330,7 @@ public class TransportVersions {
     public static final TransportVersion MAPPINGS_IN_DATA_STREAMS = def(9_112_0_00);
     public static final TransportVersion ESQL_SPLIT_ON_BIG_VALUES_9_1 = def(9_112_0_01);
     public static final TransportVersion ESQL_FIXED_INDEX_LIKE_9_1 = def(9_112_0_02);
+    public static final TransportVersion ESQL_SAMPLE_OPERATOR_STATUS_9_1 = def(9_112_0_03);
 
     /*
      * STOP! READ THIS FIRST! No, really,

+ 21 - 9
x-pack/plugin/esql/compute/src/main/java/org/elasticsearch/compute/operator/SampleOperator.java

@@ -66,11 +66,11 @@ public class SampleOperator implements Operator {
     private final RandomSamplingQuery.RandomSamplingIterator randomSamplingIterator;
     private boolean finished;
 
-    private int pagesProcessed = 0;
-    private int rowsReceived = 0;
-    private int rowsEmitted = 0;
     private long collectNanos;
     private long emitNanos;
+    private int pagesProcessed = 0;
+    private long rowsReceived = 0;
+    private long rowsEmitted = 0;
 
     private SampleOperator(double probability, int seed) {
         finished = false;
@@ -109,7 +109,7 @@ public class SampleOperator implements Operator {
         final int[] sampledPositions = new int[page.getPositionCount()];
         int sampledIdx = 0;
         for (int i = randomSamplingIterator.docID(); i - rowsReceived < page.getPositionCount(); i = randomSamplingIterator.nextDoc()) {
-            sampledPositions[sampledIdx++] = i - rowsReceived;
+            sampledPositions[sampledIdx++] = Math.toIntExact(i - rowsReceived);
         }
         if (sampledIdx > 0) {
             outputPages.add(page.filter(Arrays.copyOf(sampledPositions, sampledIdx)));
@@ -167,7 +167,7 @@ public class SampleOperator implements Operator {
         return new Status(collectNanos, emitNanos, pagesProcessed, rowsReceived, rowsEmitted);
     }
 
-    private record Status(long collectNanos, long emitNanos, int pagesProcessed, int rowsReceived, int rowsEmitted)
+    public record Status(long collectNanos, long emitNanos, int pagesProcessed, long rowsReceived, long rowsEmitted)
         implements
             Operator.Status {
 
@@ -178,7 +178,13 @@ public class SampleOperator implements Operator {
         );
 
         Status(StreamInput streamInput) throws IOException {
-            this(streamInput.readVLong(), streamInput.readVLong(), streamInput.readVInt(), streamInput.readVInt(), streamInput.readVInt());
+            this(
+                streamInput.readVLong(),
+                streamInput.readVLong(),
+                streamInput.readVInt(),
+                streamInput.readVLong(),
+                streamInput.readVLong()
+            );
         }
 
         @Override
@@ -186,8 +192,8 @@ public class SampleOperator implements Operator {
             out.writeVLong(collectNanos);
             out.writeVLong(emitNanos);
             out.writeVInt(pagesProcessed);
-            out.writeVInt(rowsReceived);
-            out.writeVInt(rowsEmitted);
+            out.writeVLong(rowsReceived);
+            out.writeVLong(rowsEmitted);
         }
 
         @Override
@@ -236,7 +242,13 @@ public class SampleOperator implements Operator {
 
         @Override
         public TransportVersion getMinimalSupportedVersion() {
-            return TransportVersions.ZERO;
+            assert false : "must not be called when overriding supportsVersion";
+            throw new UnsupportedOperationException("must not be called when overriding supportsVersion");
+        }
+
+        @Override
+        public boolean supportsVersion(TransportVersion version) {
+            return version.onOrAfter(TransportVersions.ESQL_SAMPLE_OPERATOR_STATUS_9_1);
         }
     }
 }

+ 72 - 0
x-pack/plugin/esql/compute/src/test/java/org/elasticsearch/compute/operator/SampleOperatorStatusTests.java

@@ -0,0 +1,72 @@
+/*
+ * Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
+ * or more contributor license agreements. Licensed under the Elastic License
+ * 2.0; you may not use this file except in compliance with the Elastic License
+ * 2.0.
+ */
+
+package org.elasticsearch.compute.operator;
+
+import org.elasticsearch.common.Strings;
+import org.elasticsearch.common.io.stream.Writeable;
+import org.elasticsearch.test.AbstractWireSerializingTestCase;
+import org.elasticsearch.test.ESTestCase;
+
+import static org.hamcrest.Matchers.equalTo;
+
+public class SampleOperatorStatusTests extends AbstractWireSerializingTestCase<SampleOperator.Status> {
+    public static SampleOperator.Status simple() {
+        return new SampleOperator.Status(500012, 200012, 123, 111, 222);
+    }
+
+    public static String simpleToJson() {
+        return """
+            {
+              "collect_nanos" : 500012,
+              "collect_time" : "500micros",
+              "emit_nanos" : 200012,
+              "emit_time" : "200micros",
+              "pages_processed" : 123,
+              "rows_received" : 111,
+              "rows_emitted" : 222
+            }""";
+    }
+
+    public void testToXContent() {
+        assertThat(Strings.toString(simple(), true, true), equalTo(simpleToJson()));
+    }
+
+    @Override
+    protected Writeable.Reader<SampleOperator.Status> instanceReader() {
+        return SampleOperator.Status::new;
+    }
+
+    @Override
+    public SampleOperator.Status createTestInstance() {
+        return new SampleOperator.Status(
+            randomNonNegativeLong(),
+            randomNonNegativeLong(),
+            randomNonNegativeInt(),
+            randomNonNegativeLong(),
+            randomNonNegativeLong()
+        );
+    }
+
+    @Override
+    protected SampleOperator.Status mutateInstance(SampleOperator.Status instance) {
+        long collectNanos = instance.collectNanos();
+        long emitNanos = instance.emitNanos();
+        int pagesProcessed = instance.pagesProcessed();
+        long rowsReceived = instance.rowsReceived();
+        long rowsEmitted = instance.rowsEmitted();
+        switch (between(0, 4)) {
+            case 0 -> collectNanos = randomValueOtherThan(collectNanos, ESTestCase::randomNonNegativeLong);
+            case 1 -> emitNanos = randomValueOtherThan(emitNanos, ESTestCase::randomNonNegativeLong);
+            case 2 -> pagesProcessed = randomValueOtherThan(pagesProcessed, ESTestCase::randomNonNegativeInt);
+            case 3 -> rowsReceived = randomValueOtherThan(rowsReceived, ESTestCase::randomNonNegativeLong);
+            case 4 -> rowsEmitted = randomValueOtherThan(rowsEmitted, ESTestCase::randomNonNegativeLong);
+            default -> throw new UnsupportedOperationException();
+        }
+        return new SampleOperator.Status(collectNanos, emitNanos, pagesProcessed, rowsReceived, rowsEmitted);
+    }
+}

+ 2 - 0
x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/plugin/EsqlPlugin.java

@@ -33,6 +33,7 @@ import org.elasticsearch.compute.operator.DriverStatus;
 import org.elasticsearch.compute.operator.HashAggregationOperator;
 import org.elasticsearch.compute.operator.LimitOperator;
 import org.elasticsearch.compute.operator.MvExpandOperator;
+import org.elasticsearch.compute.operator.SampleOperator;
 import org.elasticsearch.compute.operator.exchange.ExchangeService;
 import org.elasticsearch.compute.operator.exchange.ExchangeSinkOperator;
 import org.elasticsearch.compute.operator.exchange.ExchangeSourceOperator;
@@ -328,6 +329,7 @@ public class EsqlPlugin extends Plugin implements ActionPlugin, ExtensiblePlugin
         entries.add(AsyncOperator.Status.ENTRY);
         entries.add(EnrichLookupOperator.Status.ENTRY);
         entries.add(LookupFromIndexOperator.Status.ENTRY);
+        entries.add(SampleOperator.Status.ENTRY);
         entries.add(ExpressionQueryBuilder.ENTRY);
         entries.add(PlanStreamWrapperQueryBuilder.ENTRY);