Browse Source

Fix empty VALUES with ordinals grouping (#130861)

We should not build the sorted structure for the ordinal grouping 
operator if the requested position is larger than maxGroupId. This
situation occurs with nulls. We should benchmark the ordinal blocks and
consider removing the ordinal grouping operator if performance is
similar; otherwise, we need to integrate this operator with
GroupingAggregatorFunctionTestCase.

Relates #130576
Nhat Nguyen 3 months ago
parent
commit
f58d2916e2

+ 1 - 1
x-pack/plugin/esql/compute/src/main/generated-src/org/elasticsearch/compute/aggregation/ValuesBytesRefAggregator.java

@@ -90,10 +90,10 @@ class ValuesBytesRefAggregator {
     }
 
     public static void combineStates(GroupingState current, int currentGroupId, GroupingState state, int statePosition) {
-        var sorted = state.sortedForOrdinalMerging(current);
         if (statePosition > state.maxGroupId) {
             return;
         }
+        var sorted = state.sortedForOrdinalMerging(current);
         var start = statePosition > 0 ? sorted.counts[statePosition - 1] : 0;
         var end = sorted.counts[statePosition];
         for (int i = start; i < end; i++) {

+ 1 - 1
x-pack/plugin/esql/compute/src/main/generated-src/org/elasticsearch/compute/aggregation/ValuesDoubleAggregator.java

@@ -67,10 +67,10 @@ class ValuesDoubleAggregator {
     }
 
     public static void combineStates(GroupingState current, int currentGroupId, GroupingState state, int statePosition) {
-        var sorted = state.sortedForOrdinalMerging(current);
         if (statePosition > state.maxGroupId) {
             return;
         }
+        var sorted = state.sortedForOrdinalMerging(current);
         var start = statePosition > 0 ? sorted.counts[statePosition - 1] : 0;
         var end = sorted.counts[statePosition];
         for (int i = start; i < end; i++) {

+ 1 - 1
x-pack/plugin/esql/compute/src/main/generated-src/org/elasticsearch/compute/aggregation/ValuesFloatAggregator.java

@@ -66,10 +66,10 @@ class ValuesFloatAggregator {
     }
 
     public static void combineStates(GroupingState current, int currentGroupId, GroupingState state, int statePosition) {
-        var sorted = state.sortedForOrdinalMerging(current);
         if (statePosition > state.maxGroupId) {
             return;
         }
+        var sorted = state.sortedForOrdinalMerging(current);
         var start = statePosition > 0 ? sorted.counts[statePosition - 1] : 0;
         var end = sorted.counts[statePosition];
         for (int i = start; i < end; i++) {

+ 1 - 1
x-pack/plugin/esql/compute/src/main/generated-src/org/elasticsearch/compute/aggregation/ValuesIntAggregator.java

@@ -66,10 +66,10 @@ class ValuesIntAggregator {
     }
 
     public static void combineStates(GroupingState current, int currentGroupId, GroupingState state, int statePosition) {
-        var sorted = state.sortedForOrdinalMerging(current);
         if (statePosition > state.maxGroupId) {
             return;
         }
+        var sorted = state.sortedForOrdinalMerging(current);
         var start = statePosition > 0 ? sorted.counts[statePosition - 1] : 0;
         var end = sorted.counts[statePosition];
         for (int i = start; i < end; i++) {

+ 1 - 1
x-pack/plugin/esql/compute/src/main/generated-src/org/elasticsearch/compute/aggregation/ValuesLongAggregator.java

@@ -67,10 +67,10 @@ class ValuesLongAggregator {
     }
 
     public static void combineStates(GroupingState current, int currentGroupId, GroupingState state, int statePosition) {
-        var sorted = state.sortedForOrdinalMerging(current);
         if (statePosition > state.maxGroupId) {
             return;
         }
+        var sorted = state.sortedForOrdinalMerging(current);
         var start = statePosition > 0 ? sorted.counts[statePosition - 1] : 0;
         var end = sorted.counts[statePosition];
         for (int i = start; i < end; i++) {

+ 1 - 1
x-pack/plugin/esql/compute/src/main/java/org/elasticsearch/compute/aggregation/X-ValuesAggregator.java.st

@@ -129,10 +129,10 @@ $endif$
     }
 
     public static void combineStates(GroupingState current, int currentGroupId, GroupingState state, int statePosition) {
-        var sorted = state.sortedForOrdinalMerging(current);
         if (statePosition > state.maxGroupId) {
             return;
         }
+        var sorted = state.sortedForOrdinalMerging(current);
         var start = statePosition > 0 ? sorted.counts[statePosition - 1] : 0;
         var end = sorted.counts[statePosition];
         for (int i = start; i < end; i++) {

+ 108 - 0
x-pack/plugin/esql/compute/src/test/java/org/elasticsearch/compute/OperatorTests.java

@@ -11,6 +11,7 @@ import org.apache.lucene.document.Document;
 import org.apache.lucene.document.Field;
 import org.apache.lucene.document.LongField;
 import org.apache.lucene.document.LongPoint;
+import org.apache.lucene.document.SortedNumericDocValuesField;
 import org.apache.lucene.document.SortedSetDocValuesField;
 import org.apache.lucene.index.DirectoryReader;
 import org.apache.lucene.index.IndexReader;
@@ -35,6 +36,7 @@ import org.elasticsearch.common.util.MockBigArrays;
 import org.elasticsearch.common.util.MockPageCacheRecycler;
 import org.elasticsearch.common.util.concurrent.ConcurrentCollections;
 import org.elasticsearch.compute.aggregation.CountAggregatorFunction;
+import org.elasticsearch.compute.aggregation.ValuesLongAggregatorFunctionSupplier;
 import org.elasticsearch.compute.aggregation.blockhash.BlockHash;
 import org.elasticsearch.compute.data.Block;
 import org.elasticsearch.compute.data.BlockFactory;
@@ -254,6 +256,112 @@ public class OperatorTests extends MapperServiceTestCase {
         assertThat(blockFactory.breaker().getUsed(), equalTo(0L));
     }
 
+    // TODO: Remove ordinals grouping operator or enable it GroupingAggregatorFunctionTestCase
+    public void testValuesWithOrdinalGrouping() throws Exception {
+        DriverContext driverContext = driverContext();
+        BlockFactory blockFactory = driverContext.blockFactory();
+
+        final int numDocs = between(100, 1000);
+        Map<BytesRef, Set<Long>> expectedValues = new HashMap<>();
+        try (BaseDirectoryWrapper dir = newDirectory(); RandomIndexWriter writer = new RandomIndexWriter(random(), dir)) {
+            String VAL_NAME = "val";
+            String KEY_NAME = "key";
+            for (int i = 0; i < numDocs; i++) {
+                Document doc = new Document();
+                BytesRef key = new BytesRef(Integer.toString(between(1, 100)));
+                SortedSetDocValuesField keyField = new SortedSetDocValuesField(KEY_NAME, key);
+                doc.add(keyField);
+                if (randomBoolean()) {
+                    int numValues = between(0, 2);
+                    for (int v = 0; v < numValues; v++) {
+                        long val = between(1, 1000);
+                        var valuesField = new SortedNumericDocValuesField(VAL_NAME, val);
+                        doc.add(valuesField);
+                        expectedValues.computeIfAbsent(key, k -> new HashSet<>()).add(val);
+                    }
+                }
+                writer.addDocument(doc);
+            }
+            writer.commit();
+            try (DirectoryReader reader = writer.getReader()) {
+                List<Operator> operators = new ArrayList<>();
+                if (randomBoolean()) {
+                    operators.add(new ShuffleDocsOperator(blockFactory));
+                }
+                operators.add(
+                    new ValuesSourceReaderOperator(
+                        blockFactory,
+                        List.of(
+                            new ValuesSourceReaderOperator.FieldInfo(
+                                VAL_NAME,
+                                ElementType.LONG,
+                                unused -> new BlockDocValuesReader.LongsBlockLoader(VAL_NAME)
+                            )
+                        ),
+                        List.of(new ValuesSourceReaderOperator.ShardContext(reader, () -> {
+                            throw new UnsupportedOperationException();
+                        }, 0.2)),
+                        0
+                    )
+                );
+                operators.add(
+                    new OrdinalsGroupingOperator(
+                        shardIdx -> new KeywordFieldMapper.KeywordFieldType(KEY_NAME).blockLoader(mockBlContext()),
+                        List.of(new ValuesSourceReaderOperator.ShardContext(reader, () -> SourceLoader.FROM_STORED_SOURCE, 0.2)),
+                        ElementType.BYTES_REF,
+                        0,
+                        KEY_NAME,
+                        List.of(new ValuesLongAggregatorFunctionSupplier().groupingAggregatorFactory(INITIAL, List.of(1))),
+                        randomPageSize(),
+                        driverContext
+                    )
+                );
+                operators.add(
+                    new HashAggregationOperator(
+                        List.of(new ValuesLongAggregatorFunctionSupplier().groupingAggregatorFactory(FINAL, List.of(1))),
+                        () -> BlockHash.build(
+                            List.of(new BlockHash.GroupSpec(0, ElementType.BYTES_REF)),
+                            driverContext.blockFactory(),
+                            randomPageSize(),
+                            false
+                        ),
+                        driverContext
+                    )
+                );
+                Map<BytesRef, Set<Long>> actualValues = new HashMap<>();
+                Driver driver = TestDriverFactory.create(
+                    driverContext,
+                    luceneOperatorFactory(
+                        reader,
+                        List.of(new LuceneSliceQueue.QueryAndTags(new MatchAllDocsQuery(), List.of())),
+                        LuceneOperator.NO_LIMIT
+                    ).get(driverContext),
+                    operators,
+                    new PageConsumerOperator(page -> {
+                        BytesRefBlock keyBlock = page.getBlock(0);
+                        LongBlock valueBlock = page.getBlock(1);
+                        BytesRef spare = new BytesRef();
+                        for (int p = 0; p < page.getPositionCount(); p++) {
+                            var key = keyBlock.getBytesRef(p, spare);
+                            int valueCount = valueBlock.getValueCount(p);
+                            for (int i = 0; i < valueCount; i++) {
+                                long val = valueBlock.getLong(valueBlock.getFirstValueIndex(p) + i);
+                                boolean added = actualValues.computeIfAbsent(BytesRef.deepCopyOf(key), k -> new HashSet<>()).add(val);
+                                assertTrue(actualValues.toString(), added);
+                            }
+                        }
+                        page.releaseBlocks();
+                    })
+                );
+                OperatorTestCase.runDriver(driver);
+                assertDriverContext(driverContext);
+                assertThat(actualValues, equalTo(expectedValues));
+                org.elasticsearch.common.util.MockBigArrays.ensureAllArraysAreReleased();
+            }
+        }
+        assertThat(blockFactory.breaker().getUsed(), equalTo(0L));
+    }
+
     public void testPushRoundToToQuery() throws IOException {
         long firstGroupMax = randomLong();
         long secondGroupMax = randomLong();