Browse Source

Clean up ValuesSourceReaderOperator a little (ESQL-1112)

This makes the `toString` the same as the builder description and moves
a test from `OperatorTests` into the tests for the
`ValuesSourceReaderOperator` itself.
Nik Everett 2 years ago
parent
commit
037178bab9

+ 1 - 1
benchmarks/src/main/java/org/elasticsearch/benchmark/compute/operator/ValuesSourceReaderBenchmark.java

@@ -176,7 +176,7 @@ public class ValuesSourceReaderBenchmark {
     @Benchmark
     @OperationsPerInvocation(INDEX_SIZE)
     public void benchmark() {
-        ValuesSourceReaderOperator op = new ValuesSourceReaderOperator(List.of(info(reader, name)), 0);
+        ValuesSourceReaderOperator op = new ValuesSourceReaderOperator(List.of(info(reader, name)), 0, name);
         long sum = 0;
         for (Page page : pages) {
             op.addInput(page);

+ 21 - 19
x-pack/plugin/esql/compute/src/main/java/org/elasticsearch/compute/lucene/ValuesSourceReaderOperator.java

@@ -36,33 +36,18 @@ import java.util.TreeMap;
  */
 @Experimental
 public class ValuesSourceReaderOperator implements Operator {
-
-    private final List<ValueSourceInfo> sources;
-    private final int docChannel;
-
-    private BlockDocValuesReader lastReader;
-    private int lastShard = -1;
-    private int lastSegment = -1;
-
-    private Page lastPage;
-
-    private final Map<String, Integer> readersBuilt = new TreeMap<>();
-    private int pagesProcessed;
-
-    boolean finished;
-
     /**
      * Creates a new extractor that uses ValuesSources load data
      * @param sources the value source, type and index readers to use for extraction
      * @param docChannel the channel containing the shard, leaf/segment and doc id
-     * @param field the lucene field to use
+     * @param field the lucene field being loaded
      */
     public record ValuesSourceReaderOperatorFactory(List<ValueSourceInfo> sources, int docChannel, String field)
         implements
             OperatorFactory {
         @Override
         public Operator get() {
-            return new ValuesSourceReaderOperator(sources, docChannel);
+            return new ValuesSourceReaderOperator(sources, docChannel, field);
         }
 
         @Override
@@ -71,14 +56,31 @@ public class ValuesSourceReaderOperator implements Operator {
         }
     }
 
+    private final List<ValueSourceInfo> sources;
+    private final int docChannel;
+    private final String field;
+
+    private BlockDocValuesReader lastReader;
+    private int lastShard = -1;
+    private int lastSegment = -1;
+
+    private Page lastPage;
+
+    private final Map<String, Integer> readersBuilt = new TreeMap<>();
+    private int pagesProcessed;
+
+    boolean finished;
+
     /**
      * Creates a new extractor
      * @param sources the value source, type and index readers to use for extraction
      * @param docChannel the channel containing the shard, leaf/segment and doc id
+     * @param field the lucene field being loaded
      */
-    public ValuesSourceReaderOperator(List<ValueSourceInfo> sources, int docChannel) {
+    public ValuesSourceReaderOperator(List<ValueSourceInfo> sources, int docChannel, String field) {
         this.sources = sources;
         this.docChannel = docChannel;
+        this.field = field;
     }
 
     @Override
@@ -166,7 +168,7 @@ public class ValuesSourceReaderOperator implements Operator {
 
     @Override
     public String toString() {
-        return "ValuesSourceReaderOperator";
+        return "ValuesSourceReaderOperator[field = " + field + "]";
     }
 
     @Override

+ 8 - 3
x-pack/plugin/esql/compute/src/main/java/org/elasticsearch/compute/operator/OrdinalsGroupingOperator.java

@@ -52,13 +52,14 @@ public class OrdinalsGroupingOperator implements Operator {
     public record OrdinalsGroupingOperatorFactory(
         List<ValueSourceInfo> sources,
         int docChannel,
+        String groupingField,
         List<GroupingAggregatorFactory> aggregators,
         BigArrays bigArrays
     ) implements OperatorFactory {
 
         @Override
         public Operator get() {
-            return new OrdinalsGroupingOperator(sources, docChannel, aggregators, bigArrays);
+            return new OrdinalsGroupingOperator(sources, docChannel, groupingField, aggregators, bigArrays);
         }
 
         @Override
@@ -69,6 +70,7 @@ public class OrdinalsGroupingOperator implements Operator {
 
     private final List<ValueSourceInfo> sources;
     private final int docChannel;
+    private final String groupingField;
 
     private final List<GroupingAggregatorFactory> aggregatorFactories;
     private final Map<SegmentID, OrdinalSegmentAggregator> ordinalAggregators;
@@ -82,6 +84,7 @@ public class OrdinalsGroupingOperator implements Operator {
     public OrdinalsGroupingOperator(
         List<ValueSourceInfo> sources,
         int docChannel,
+        String groupingField,
         List<GroupingAggregatorFactory> aggregatorFactories,
         BigArrays bigArrays
     ) {
@@ -94,6 +97,7 @@ public class OrdinalsGroupingOperator implements Operator {
         }
         this.sources = sources;
         this.docChannel = docChannel;
+        this.groupingField = groupingField;
         this.aggregatorFactories = aggregatorFactories;
         this.ordinalAggregators = new HashMap<>();
         this.bigArrays = bigArrays;
@@ -145,7 +149,7 @@ public class OrdinalsGroupingOperator implements Operator {
         } else {
             if (valuesAggregator == null) {
                 int channelIndex = page.getBlockCount(); // extractor will append a new block at the end
-                valuesAggregator = new ValuesAggregator(sources, docChannel, channelIndex, aggregatorFactories, bigArrays);
+                valuesAggregator = new ValuesAggregator(sources, docChannel, groupingField, channelIndex, aggregatorFactories, bigArrays);
             }
             valuesAggregator.addInput(page);
         }
@@ -367,11 +371,12 @@ public class OrdinalsGroupingOperator implements Operator {
         ValuesAggregator(
             List<ValueSourceInfo> sources,
             int docChannel,
+            String groupingField,
             int channelIndex,
             List<GroupingAggregatorFactory> aggregatorFactories,
             BigArrays bigArrays
         ) {
-            this.extractor = new ValuesSourceReaderOperator(sources, docChannel);
+            this.extractor = new ValuesSourceReaderOperator(sources, docChannel, groupingField);
             this.aggregator = new HashAggregationOperator(
                 aggregatorFactories,
                 () -> BlockHash.build(List.of(new HashAggregationOperator.GroupSpec(channelIndex, sources.get(0).elementType())), bigArrays)

+ 7 - 108
x-pack/plugin/esql/compute/src/test/java/org/elasticsearch/compute/OperatorTests.java

@@ -8,11 +8,9 @@
 package org.elasticsearch.compute;
 
 import org.apache.lucene.document.Document;
-import org.apache.lucene.document.DoubleDocValuesField;
 import org.apache.lucene.document.Field;
 import org.apache.lucene.document.LongPoint;
 import org.apache.lucene.document.NumericDocValuesField;
-import org.apache.lucene.document.SortedDocValuesField;
 import org.apache.lucene.document.SortedSetDocValuesField;
 import org.apache.lucene.index.DirectoryReader;
 import org.apache.lucene.index.IndexReader;
@@ -45,7 +43,6 @@ import org.elasticsearch.compute.data.Block;
 import org.elasticsearch.compute.data.BytesRefBlock;
 import org.elasticsearch.compute.data.DocBlock;
 import org.elasticsearch.compute.data.DocVector;
-import org.elasticsearch.compute.data.DoubleBlock;
 import org.elasticsearch.compute.data.ElementType;
 import org.elasticsearch.compute.data.IntArrayVector;
 import org.elasticsearch.compute.data.IntBlock;
@@ -68,12 +65,9 @@ import org.elasticsearch.compute.operator.SequenceLongBlockSourceOperator;
 import org.elasticsearch.compute.operator.TopNOperator;
 import org.elasticsearch.core.CheckedConsumer;
 import org.elasticsearch.core.Releasables;
-import org.elasticsearch.index.fielddata.IndexFieldDataCache;
 import org.elasticsearch.index.fielddata.IndexNumericFieldData;
 import org.elasticsearch.index.fielddata.SortedBinaryDocValues;
-import org.elasticsearch.index.fielddata.plain.SortedDoublesIndexFieldData;
 import org.elasticsearch.index.fielddata.plain.SortedNumericIndexFieldData;
-import org.elasticsearch.index.fielddata.plain.SortedSetOrdinalsIndexFieldData;
 import org.elasticsearch.index.mapper.KeywordFieldMapper;
 import org.elasticsearch.index.mapper.Uid;
 import org.elasticsearch.indices.breaker.NoneCircuitBreakerService;
@@ -172,7 +166,8 @@ public class OperatorTests extends ESTestCase {
                         List.of(
                             new ValuesSourceReaderOperator(
                                 List.of(new ValueSourceInfo(CoreValuesSourceType.NUMERIC, vs, ElementType.LONG, reader)),
-                                0
+                                0,
+                                fieldName
                             ),
                             new TopNOperator(limit, List.of(new TopNOperator.SortOrder(1, true, true)))
                         ),
@@ -222,7 +217,8 @@ public class OperatorTests extends ESTestCase {
                                 List.of(
                                     new ValuesSourceReaderOperator(
                                         List.of(new ValueSourceInfo(CoreValuesSourceType.NUMERIC, vs, ElementType.LONG, reader)),
-                                        0
+                                        0,
+                                        fieldName
                                     )
                                 ),
                                 new PageConsumerOperator(page -> rowCount.addAndGet(page.getPositionCount())),
@@ -258,105 +254,6 @@ public class OperatorTests extends ESTestCase {
         return w;
     }
 
-    public void testValuesSourceReaderOperatorWithNulls() throws IOException {  // TODO move to ValuesSourceReaderOperatorTests
-        final int numDocs = 100_000;
-        try (Directory dir = newDirectory(); RandomIndexWriter w = new RandomIndexWriter(random(), dir)) {
-            Document doc = new Document();
-            NumericDocValuesField intField = new NumericDocValuesField("i", 0);
-            NumericDocValuesField longField = new NumericDocValuesField("j", 0);
-            NumericDocValuesField doubleField = new DoubleDocValuesField("d", 0);
-            String kwFieldName = "kw";
-            for (int i = 0; i < numDocs; i++) {
-                doc.clear();
-                intField.setLongValue(i);
-                doc.add(intField);
-                if (i % 100 != 0) { // Do not set field for every 100 values
-                    longField.setLongValue(i);
-                    doc.add(longField);
-                    doubleField.setDoubleValue(i);
-                    doc.add(doubleField);
-                    doc.add(new SortedDocValuesField(kwFieldName, new BytesRef("kw=" + i)));
-                }
-                w.addDocument(doc);
-            }
-            w.commit();
-
-            ValuesSource intVs = new ValuesSource.Numeric.FieldData(
-                new SortedNumericIndexFieldData(
-                    intField.name(),
-                    IndexNumericFieldData.NumericType.INT,
-                    IndexNumericFieldData.NumericType.INT.getValuesSourceType(),
-                    null
-                )
-            );
-            ValuesSource longVs = new ValuesSource.Numeric.FieldData(
-                new SortedNumericIndexFieldData(
-                    longField.name(),
-                    IndexNumericFieldData.NumericType.LONG,
-                    IndexNumericFieldData.NumericType.LONG.getValuesSourceType(),
-                    null
-                )
-            );
-            ValuesSource doubleVs = new ValuesSource.Numeric.FieldData(
-                new SortedDoublesIndexFieldData(
-                    doubleField.name(),
-                    IndexNumericFieldData.NumericType.DOUBLE,
-                    IndexNumericFieldData.NumericType.DOUBLE.getValuesSourceType(),
-                    null
-                )
-            );
-            var breakerService = new NoneCircuitBreakerService();
-            var cache = new IndexFieldDataCache.None();
-            ValuesSource keywordVs = new ValuesSource.Bytes.FieldData(
-                new SortedSetOrdinalsIndexFieldData(cache, kwFieldName, CoreValuesSourceType.KEYWORD, breakerService, null)
-            );
-
-            try (IndexReader reader = w.getReader()) {
-                // implements cardinality on value field
-                Driver driver = new Driver(
-                    new LuceneSourceOperator(reader, 0, new MatchAllDocsQuery()),
-                    List.of(
-                        new ValuesSourceReaderOperator(
-                            List.of(new ValueSourceInfo(CoreValuesSourceType.NUMERIC, intVs, ElementType.INT, reader)),
-                            0
-                        ),
-                        new ValuesSourceReaderOperator(
-                            List.of(new ValueSourceInfo(CoreValuesSourceType.NUMERIC, longVs, ElementType.LONG, reader)),
-                            0
-                        ),
-                        new ValuesSourceReaderOperator(
-                            List.of(new ValueSourceInfo(CoreValuesSourceType.NUMERIC, doubleVs, ElementType.DOUBLE, reader)),
-                            0
-                        ),
-                        new ValuesSourceReaderOperator(
-                            List.of(new ValueSourceInfo(CoreValuesSourceType.KEYWORD, keywordVs, ElementType.BYTES_REF, reader)),
-                            0
-                        )
-                    ),
-                    new PageConsumerOperator(page -> {
-                        logger.debug("New page: {}", page);
-                        IntBlock intValuesBlock = page.getBlock(1);
-                        LongBlock longValuesBlock = page.getBlock(2);
-                        DoubleBlock doubleValuesBlock = page.getBlock(3);
-                        BytesRefBlock keywordValuesBlock = page.getBlock(4);
-
-                        for (int i = 0; i < page.getPositionCount(); i++) {
-                            assertFalse(intValuesBlock.isNull(i));
-                            long j = intValuesBlock.getInt(i);
-                            // Every 100 documents we set fields to null
-                            boolean fieldIsEmpty = j % 100 == 0;
-                            assertEquals(fieldIsEmpty, longValuesBlock.isNull(i));
-                            assertEquals(fieldIsEmpty, doubleValuesBlock.isNull(i));
-                            assertEquals(fieldIsEmpty, keywordValuesBlock.isNull(i));
-                        }
-                    }),
-                    () -> {}
-                );
-                driver.run();
-            }
-        }
-    }
-
     public void testQueryOperator() throws IOException {
         Map<BytesRef, Long> docs = new HashMap<>();
         CheckedConsumer<DirectoryReader, IOException> verifier = reader -> {
@@ -461,7 +358,8 @@ public class OperatorTests extends ESTestCase {
                         List.of(
                             new ValuesSourceReaderOperator(
                                 List.of(new ValueSourceInfo(CoreValuesSourceType.NUMERIC, vs, ElementType.LONG, reader)),
-                                0
+                                0,
+                                fieldName
                             ),
                             new HashAggregationOperator(
                                 List.of(
@@ -603,6 +501,7 @@ public class OperatorTests extends ESTestCase {
                                 )
                             ),
                             0,
+                            gField,
                             List.of(
                                 new GroupingAggregator.GroupingAggregatorFactory(bigArrays, GroupingAggregatorFunction.COUNT, INITIAL, 1)
                             ),

+ 64 - 1
x-pack/plugin/esql/compute/src/test/java/org/elasticsearch/compute/lucene/ValuesSourceReaderOperatorTests.java

@@ -7,6 +7,10 @@
 
 package org.elasticsearch.compute.lucene;
 
+import org.apache.lucene.document.Document;
+import org.apache.lucene.document.DoubleDocValuesField;
+import org.apache.lucene.document.NumericDocValuesField;
+import org.apache.lucene.document.SortedDocValuesField;
 import org.apache.lucene.document.SortedNumericDocValuesField;
 import org.apache.lucene.index.IndexReader;
 import org.apache.lucene.index.IndexableField;
@@ -146,7 +150,7 @@ public class ValuesSourceReaderOperatorTests extends OperatorTestCase {
 
     @Override
     protected String expectedToStringOfSimple() {
-        return "ValuesSourceReaderOperator";
+        return expectedDescriptionOfSimple();
     }
 
     @Override
@@ -301,4 +305,63 @@ public class ValuesSourceReaderOperatorTests extends OperatorTestCase {
             }
         }
     }
+
+    public void testValuesSourceReaderOperatorWithNulls() throws IOException {
+        MappedFieldType intFt = new NumberFieldMapper.NumberFieldType("i", NumberFieldMapper.NumberType.INTEGER);
+        MappedFieldType longFt = new NumberFieldMapper.NumberFieldType("j", NumberFieldMapper.NumberType.LONG);
+        MappedFieldType doubleFt = new NumberFieldMapper.NumberFieldType("d", NumberFieldMapper.NumberType.DOUBLE);
+        MappedFieldType kwFt = new KeywordFieldMapper.KeywordFieldType("kw");
+
+        NumericDocValuesField intField = new NumericDocValuesField(intFt.name(), 0);
+        NumericDocValuesField longField = new NumericDocValuesField(longFt.name(), 0);
+        NumericDocValuesField doubleField = new DoubleDocValuesField(doubleFt.name(), 0);
+        final int numDocs = 100_000;
+        try (RandomIndexWriter w = new RandomIndexWriter(random(), directory)) {
+            Document doc = new Document();
+            for (int i = 0; i < numDocs; i++) {
+                doc.clear();
+                intField.setLongValue(i);
+                doc.add(intField);
+                if (i % 100 != 0) { // Do not set field for every 100 values
+                    longField.setLongValue(i);
+                    doc.add(longField);
+                    doubleField.setDoubleValue(i);
+                    doc.add(doubleField);
+                    doc.add(new SortedDocValuesField(kwFt.name(), new BytesRef("kw=" + i)));
+                }
+                w.addDocument(doc);
+            }
+            w.commit();
+            reader = w.getReader();
+        }
+
+        Driver driver = new Driver(
+            new LuceneSourceOperator(reader, 0, new MatchAllDocsQuery()),
+            List.of(
+                factory(CoreValuesSourceType.NUMERIC, ElementType.INT, intFt).get(),
+                factory(CoreValuesSourceType.NUMERIC, ElementType.LONG, longFt).get(),
+                factory(CoreValuesSourceType.NUMERIC, ElementType.DOUBLE, doubleFt).get(),
+                factory(CoreValuesSourceType.KEYWORD, ElementType.BYTES_REF, kwFt).get()
+            ),
+            new PageConsumerOperator(page -> {
+                logger.debug("New page: {}", page);
+                IntBlock intValuesBlock = page.getBlock(1);
+                LongBlock longValuesBlock = page.getBlock(2);
+                DoubleBlock doubleValuesBlock = page.getBlock(3);
+                BytesRefBlock keywordValuesBlock = page.getBlock(4);
+
+                for (int i = 0; i < page.getPositionCount(); i++) {
+                    assertFalse(intValuesBlock.isNull(i));
+                    long j = intValuesBlock.getInt(i);
+                    // Every 100 documents we set fields to null
+                    boolean fieldIsEmpty = j % 100 == 0;
+                    assertEquals(fieldIsEmpty, longValuesBlock.isNull(i));
+                    assertEquals(fieldIsEmpty, doubleValuesBlock.isNull(i));
+                    assertEquals(fieldIsEmpty, keywordValuesBlock.isNull(i));
+                }
+            }),
+            () -> {}
+        );
+        driver.run();
+    }
 }

+ 1 - 1
x-pack/plugin/esql/src/internalClusterTest/java/org/elasticsearch/xpack/esql/action/EsqlActionTaskIT.java

@@ -122,7 +122,7 @@ public class EsqlActionTaskIT extends AbstractEsqlIntegTestCase {
                     luceneSources++;
                     continue;
                 }
-                if (o.operator().equals("ValuesSourceReaderOperator")) {
+                if (o.operator().equals("ValuesSourceReaderOperator[field = pause_me]")) {
                     ValuesSourceReaderOperator.Status oStatus = (ValuesSourceReaderOperator.Status) o.status();
                     assertThat(oStatus.readersBuilt(), equalTo(Map.of("LongValuesReader", 1)));
                     assertThat(oStatus.pagesProcessed(), greaterThanOrEqualTo(1));

+ 1 - 0
x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/planner/EsPhysicalOperationProviders.java

@@ -156,6 +156,7 @@ public class EsPhysicalOperationProviders extends AbstractPhysicalOperationProvi
                 LocalExecutionPlanner.toElementType(attrSource.dataType())
             ),
             docChannel,
+            attrSource.name(),
             aggregatorFactories,
             BigArrays.NON_RECYCLING_INSTANCE
         );