Browse Source

Move GroupSpec to BlockHash (#107665)

This change moves GroupSpec from HashAggregationOperator to BlockHash, 
making it available for MetricsAggregatorOperator, which will be
introduced soon.
Nhat Nguyen 1 year ago
parent
commit
8441a0514c
13 changed files with 39 additions and 48 deletions
  1. 12 15
      benchmarks/src/main/java/org/elasticsearch/benchmark/compute/operator/AggregatorBenchmark.java
  2. 3 7
      x-pack/plugin/esql/compute/src/main/java/org/elasticsearch/compute/aggregation/blockhash/BlockHash.java
  3. 3 4
      x-pack/plugin/esql/compute/src/main/java/org/elasticsearch/compute/aggregation/blockhash/PackedValuesBlockHash.java
  4. 5 6
      x-pack/plugin/esql/compute/src/main/java/org/elasticsearch/compute/operator/HashAggregationOperator.java
  5. 1 1
      x-pack/plugin/esql/compute/src/main/java/org/elasticsearch/compute/operator/OrdinalsGroupingOperator.java
  6. 1 1
      x-pack/plugin/esql/compute/src/test/java/org/elasticsearch/compute/OperatorTests.java
  7. 2 1
      x-pack/plugin/esql/compute/src/test/java/org/elasticsearch/compute/aggregation/GroupingAggregatorFunctionTestCase.java
  8. 2 3
      x-pack/plugin/esql/compute/src/test/java/org/elasticsearch/compute/aggregation/blockhash/BlockHashRandomizedTests.java
  9. 2 3
      x-pack/plugin/esql/compute/src/test/java/org/elasticsearch/compute/aggregation/blockhash/BlockHashTests.java
  10. 2 1
      x-pack/plugin/esql/compute/src/test/java/org/elasticsearch/compute/operator/HashAggregationOperatorTests.java
  11. 2 2
      x-pack/plugin/esql/compute/src/test/java/org/elasticsearch/compute/operator/TimeSeriesAggregationOperatorTests.java
  12. 3 3
      x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/planner/AbstractPhysicalOperationProviders.java
  13. 1 1
      x-pack/plugin/esql/src/test/java/org/elasticsearch/xpack/esql/planner/TestPhysicalOperationProviders.java

+ 12 - 15
benchmarks/src/main/java/org/elasticsearch/benchmark/compute/operator/AggregatorBenchmark.java

@@ -124,24 +124,21 @@ public class AggregatorBenchmark {
                 driverContext
             );
         }
-        List<HashAggregationOperator.GroupSpec> groups = switch (grouping) {
-            case LONGS -> List.of(new HashAggregationOperator.GroupSpec(0, ElementType.LONG));
-            case INTS -> List.of(new HashAggregationOperator.GroupSpec(0, ElementType.INT));
-            case DOUBLES -> List.of(new HashAggregationOperator.GroupSpec(0, ElementType.DOUBLE));
-            case BOOLEANS -> List.of(new HashAggregationOperator.GroupSpec(0, ElementType.BOOLEAN));
-            case BYTES_REFS -> List.of(new HashAggregationOperator.GroupSpec(0, ElementType.BYTES_REF));
-            case TWO_LONGS -> List.of(
-                new HashAggregationOperator.GroupSpec(0, ElementType.LONG),
-                new HashAggregationOperator.GroupSpec(1, ElementType.LONG)
-            );
+        List<BlockHash.GroupSpec> groups = switch (grouping) {
+            case LONGS -> List.of(new BlockHash.GroupSpec(0, ElementType.LONG));
+            case INTS -> List.of(new BlockHash.GroupSpec(0, ElementType.INT));
+            case DOUBLES -> List.of(new BlockHash.GroupSpec(0, ElementType.DOUBLE));
+            case BOOLEANS -> List.of(new BlockHash.GroupSpec(0, ElementType.BOOLEAN));
+            case BYTES_REFS -> List.of(new BlockHash.GroupSpec(0, ElementType.BYTES_REF));
+            case TWO_LONGS -> List.of(new BlockHash.GroupSpec(0, ElementType.LONG), new BlockHash.GroupSpec(1, ElementType.LONG));
             case LONGS_AND_BYTES_REFS -> List.of(
-                new HashAggregationOperator.GroupSpec(0, ElementType.LONG),
-                new HashAggregationOperator.GroupSpec(1, ElementType.BYTES_REF)
+                new BlockHash.GroupSpec(0, ElementType.LONG),
+                new BlockHash.GroupSpec(1, ElementType.BYTES_REF)
             );
             case TWO_LONGS_AND_BYTES_REFS -> List.of(
-                new HashAggregationOperator.GroupSpec(0, ElementType.LONG),
-                new HashAggregationOperator.GroupSpec(1, ElementType.LONG),
-                new HashAggregationOperator.GroupSpec(2, ElementType.BYTES_REF)
+                new BlockHash.GroupSpec(0, ElementType.LONG),
+                new BlockHash.GroupSpec(1, ElementType.LONG),
+                new BlockHash.GroupSpec(2, ElementType.BYTES_REF)
             );
             default -> throw new IllegalArgumentException("unsupported grouping [" + grouping + "]");
         };

+ 3 - 7
x-pack/plugin/esql/compute/src/main/java/org/elasticsearch/compute/aggregation/blockhash/BlockHash.java

@@ -19,7 +19,6 @@ import org.elasticsearch.compute.data.BlockFactory;
 import org.elasticsearch.compute.data.ElementType;
 import org.elasticsearch.compute.data.IntVector;
 import org.elasticsearch.compute.data.Page;
-import org.elasticsearch.compute.operator.HashAggregationOperator;
 import org.elasticsearch.core.Releasable;
 
 import java.util.List;
@@ -65,6 +64,8 @@ public abstract sealed class BlockHash implements Releasable, SeenGroupIds //
     @Override
     public abstract BitArray seenGroupIds(BigArrays bigArrays);
 
+    public record GroupSpec(int channel, ElementType elementType) {}
+
     /**
      * Creates a specialized hash table that maps one or more {@link Block}s to ids.
      * @param emitBatchSize maximum batch size to be emitted when handling combinatorial
@@ -74,12 +75,7 @@ public abstract sealed class BlockHash implements Releasable, SeenGroupIds //
      *                                 production until we can. And this lets us continue to compile and
      *                                 test them.
      */
-    public static BlockHash build(
-        List<HashAggregationOperator.GroupSpec> groups,
-        BlockFactory blockFactory,
-        int emitBatchSize,
-        boolean allowBrokenOptimizations
-    ) {
+    public static BlockHash build(List<GroupSpec> groups, BlockFactory blockFactory, int emitBatchSize, boolean allowBrokenOptimizations) {
         if (groups.size() == 1) {
             return newForElementType(groups.get(0).channel(), groups.get(0).elementType(), blockFactory);
         }

+ 3 - 4
x-pack/plugin/esql/compute/src/main/java/org/elasticsearch/compute/aggregation/blockhash/PackedValuesBlockHash.java

@@ -21,7 +21,6 @@ import org.elasticsearch.compute.data.ElementType;
 import org.elasticsearch.compute.data.IntVector;
 import org.elasticsearch.compute.data.Page;
 import org.elasticsearch.compute.operator.BatchEncoder;
-import org.elasticsearch.compute.operator.HashAggregationOperator;
 import org.elasticsearch.compute.operator.MultivalueDedupe;
 import org.elasticsearch.core.Releasables;
 
@@ -59,7 +58,7 @@ final class PackedValuesBlockHash extends BlockHash {
     private final BytesRefBuilder bytes = new BytesRefBuilder();
     private final Group[] groups;
 
-    PackedValuesBlockHash(List<HashAggregationOperator.GroupSpec> specs, BlockFactory blockFactory, int emitBatchSize) {
+    PackedValuesBlockHash(List<GroupSpec> specs, BlockFactory blockFactory, int emitBatchSize) {
         super(blockFactory);
         this.groups = specs.stream().map(Group::new).toArray(Group[]::new);
         this.emitBatchSize = emitBatchSize;
@@ -79,7 +78,7 @@ final class PackedValuesBlockHash extends BlockHash {
     }
 
     private static class Group {
-        final HashAggregationOperator.GroupSpec spec;
+        final GroupSpec spec;
         BatchEncoder encoder;
         int positionOffset;
         int valueOffset;
@@ -87,7 +86,7 @@ final class PackedValuesBlockHash extends BlockHash {
         int valueCount;
         int bytesStart;
 
-        Group(HashAggregationOperator.GroupSpec spec) {
+        Group(GroupSpec spec) {
             this.spec = spec;
         }
     }

+ 5 - 6
x-pack/plugin/esql/compute/src/main/java/org/elasticsearch/compute/operator/HashAggregationOperator.java

@@ -18,7 +18,6 @@ import org.elasticsearch.compute.aggregation.GroupingAggregator;
 import org.elasticsearch.compute.aggregation.GroupingAggregatorFunction;
 import org.elasticsearch.compute.aggregation.blockhash.BlockHash;
 import org.elasticsearch.compute.data.Block;
-import org.elasticsearch.compute.data.ElementType;
 import org.elasticsearch.compute.data.IntBlock;
 import org.elasticsearch.compute.data.IntVector;
 import org.elasticsearch.compute.data.Page;
@@ -38,11 +37,11 @@ import static java.util.stream.Collectors.joining;
 
 public class HashAggregationOperator implements Operator {
 
-    public record GroupSpec(int channel, ElementType elementType) {}
-
-    public record HashAggregationOperatorFactory(List<GroupSpec> groups, List<GroupingAggregator.Factory> aggregators, int maxPageSize)
-        implements
-            OperatorFactory {
+    public record HashAggregationOperatorFactory(
+        List<BlockHash.GroupSpec> groups,
+        List<GroupingAggregator.Factory> aggregators,
+        int maxPageSize
+    ) implements OperatorFactory {
         @Override
         public Operator get(DriverContext driverContext) {
             return new HashAggregationOperator(

+ 1 - 1
x-pack/plugin/esql/compute/src/main/java/org/elasticsearch/compute/operator/OrdinalsGroupingOperator.java

@@ -20,6 +20,7 @@ import org.elasticsearch.compute.aggregation.GroupingAggregator.Factory;
 import org.elasticsearch.compute.aggregation.GroupingAggregatorFunction;
 import org.elasticsearch.compute.aggregation.SeenGroupIds;
 import org.elasticsearch.compute.aggregation.blockhash.BlockHash;
+import org.elasticsearch.compute.aggregation.blockhash.BlockHash.GroupSpec;
 import org.elasticsearch.compute.data.Block;
 import org.elasticsearch.compute.data.BlockFactory;
 import org.elasticsearch.compute.data.DocBlock;
@@ -29,7 +30,6 @@ import org.elasticsearch.compute.data.IntBlock;
 import org.elasticsearch.compute.data.IntVector;
 import org.elasticsearch.compute.data.Page;
 import org.elasticsearch.compute.lucene.ValuesSourceReaderOperator;
-import org.elasticsearch.compute.operator.HashAggregationOperator.GroupSpec;
 import org.elasticsearch.core.Releasable;
 import org.elasticsearch.core.Releasables;
 import org.elasticsearch.index.mapper.BlockLoader;

+ 1 - 1
x-pack/plugin/esql/compute/src/test/java/org/elasticsearch/compute/OperatorTests.java

@@ -239,7 +239,7 @@ public class OperatorTests extends MapperServiceTestCase {
                         new HashAggregationOperator(
                             List.of(CountAggregatorFunction.supplier(List.of(1, 2)).groupingAggregatorFactory(FINAL)),
                             () -> BlockHash.build(
-                                List.of(new HashAggregationOperator.GroupSpec(0, ElementType.BYTES_REF)),
+                                List.of(new BlockHash.GroupSpec(0, ElementType.BYTES_REF)),
                                 driverContext.blockFactory(),
                                 randomPageSize(),
                                 false

+ 2 - 1
x-pack/plugin/esql/compute/src/test/java/org/elasticsearch/compute/aggregation/GroupingAggregatorFunctionTestCase.java

@@ -9,6 +9,7 @@ package org.elasticsearch.compute.aggregation;
 
 import org.apache.lucene.util.BytesRef;
 import org.elasticsearch.common.util.BitArray;
+import org.elasticsearch.compute.aggregation.blockhash.BlockHash;
 import org.elasticsearch.compute.data.Block;
 import org.elasticsearch.compute.data.BlockFactory;
 import org.elasticsearch.compute.data.BlockTestUtils;
@@ -69,7 +70,7 @@ public abstract class GroupingAggregatorFunctionTestCase extends ForkingOperator
             supplier = chunkGroups(emitChunkSize, supplier);
         }
         return new HashAggregationOperator.HashAggregationOperatorFactory(
-            List.of(new HashAggregationOperator.GroupSpec(0, ElementType.LONG)),
+            List.of(new BlockHash.GroupSpec(0, ElementType.LONG)),
             List.of(supplier.groupingAggregatorFactory(mode)),
             randomPageSize()
         );

+ 2 - 3
x-pack/plugin/esql/compute/src/test/java/org/elasticsearch/compute/aggregation/blockhash/BlockHashRandomizedTests.java

@@ -21,7 +21,6 @@ import org.elasticsearch.compute.data.Block;
 import org.elasticsearch.compute.data.BlockFactory;
 import org.elasticsearch.compute.data.ElementType;
 import org.elasticsearch.compute.data.MockBlockFactory;
-import org.elasticsearch.compute.operator.HashAggregationOperator;
 import org.elasticsearch.compute.operator.MultivalueDedupeTests;
 import org.elasticsearch.core.Releasables;
 import org.elasticsearch.indices.CrankyCircuitBreakerService;
@@ -197,9 +196,9 @@ public class BlockHashRandomizedTests extends ESTestCase {
     }
 
     private BlockHash newBlockHash(BlockFactory blockFactory, int emitBatchSize, List<ElementType> types) {
-        List<HashAggregationOperator.GroupSpec> specs = new ArrayList<>(types.size());
+        List<BlockHash.GroupSpec> specs = new ArrayList<>(types.size());
         for (int c = 0; c < types.size(); c++) {
-            specs.add(new HashAggregationOperator.GroupSpec(c, types.get(c)));
+            specs.add(new BlockHash.GroupSpec(c, types.get(c)));
         }
         return forcePackedHash
             ? new PackedValuesBlockHash(specs, blockFactory, emitBatchSize)

+ 2 - 3
x-pack/plugin/esql/compute/src/test/java/org/elasticsearch/compute/aggregation/blockhash/BlockHashTests.java

@@ -27,7 +27,6 @@ import org.elasticsearch.compute.data.LongBlock;
 import org.elasticsearch.compute.data.MockBlockFactory;
 import org.elasticsearch.compute.data.Page;
 import org.elasticsearch.compute.data.TestBlockFactory;
-import org.elasticsearch.compute.operator.HashAggregationOperator;
 import org.elasticsearch.core.Releasables;
 import org.elasticsearch.indices.breaker.CircuitBreakerService;
 import org.elasticsearch.test.ESTestCase;
@@ -1151,9 +1150,9 @@ public class BlockHashTests extends ESTestCase {
 
     private void hash(Consumer<OrdsAndKeys> callback, int emitBatchSize, Block... values) {
         try {
-            List<HashAggregationOperator.GroupSpec> specs = new ArrayList<>(values.length);
+            List<BlockHash.GroupSpec> specs = new ArrayList<>(values.length);
             for (int c = 0; c < values.length; c++) {
-                specs.add(new HashAggregationOperator.GroupSpec(c, values[c].elementType()));
+                specs.add(new BlockHash.GroupSpec(c, values[c].elementType()));
             }
             try (
                 BlockHash blockHash = forcePackedHash

+ 2 - 1
x-pack/plugin/esql/compute/src/test/java/org/elasticsearch/compute/operator/HashAggregationOperatorTests.java

@@ -14,6 +14,7 @@ import org.elasticsearch.compute.aggregation.MaxLongGroupingAggregatorFunctionTe
 import org.elasticsearch.compute.aggregation.SumLongAggregatorFunction;
 import org.elasticsearch.compute.aggregation.SumLongAggregatorFunctionSupplier;
 import org.elasticsearch.compute.aggregation.SumLongGroupingAggregatorFunctionTests;
+import org.elasticsearch.compute.aggregation.blockhash.BlockHash;
 import org.elasticsearch.compute.data.Block;
 import org.elasticsearch.compute.data.BlockFactory;
 import org.elasticsearch.compute.data.ElementType;
@@ -51,7 +52,7 @@ public class HashAggregationOperatorTests extends ForkingOperatorTestCase {
         }
 
         return new HashAggregationOperator.HashAggregationOperatorFactory(
-            List.of(new HashAggregationOperator.GroupSpec(0, ElementType.LONG)),
+            List.of(new BlockHash.GroupSpec(0, ElementType.LONG)),
             List.of(
                 new SumLongAggregatorFunctionSupplier(sumChannels).groupingAggregatorFactory(mode),
                 new MaxLongAggregatorFunctionSupplier(maxChannels).groupingAggregatorFactory(mode)

+ 2 - 2
x-pack/plugin/esql/compute/src/test/java/org/elasticsearch/compute/operator/TimeSeriesAggregationOperatorTests.java

@@ -323,7 +323,7 @@ public class TimeSeriesAggregationOperatorTests extends AnyOperatorTestCase {
         HashAggregationOperator finalHash = new HashAggregationOperator(
             List.of(new RateLongAggregatorFunctionSupplier(List.of(1, 2, 3), unitInMillis).groupingAggregatorFactory(AggregatorMode.FINAL)),
             () -> BlockHash.build(
-                List.of(new HashAggregationOperator.GroupSpec(0, ElementType.BYTES_REF)),
+                List.of(new BlockHash.GroupSpec(0, ElementType.BYTES_REF)),
                 ctx.blockFactory(),
                 randomIntBetween(1, 1000),
                 randomBoolean()
@@ -339,7 +339,7 @@ public class TimeSeriesAggregationOperatorTests extends AnyOperatorTestCase {
                     new RateLongAggregatorFunctionSupplier(List.of(5, 2), unitInMillis).groupingAggregatorFactory(AggregatorMode.INITIAL)
                 ),
                 () -> BlockHash.build(
-                    List.of(new HashAggregationOperator.GroupSpec(4, ElementType.BYTES_REF)),
+                    List.of(new BlockHash.GroupSpec(4, ElementType.BYTES_REF)),
                     ctx.blockFactory(),
                     randomIntBetween(1, 1000),
                     randomBoolean()

+ 3 - 3
x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/planner/AbstractPhysicalOperationProviders.java

@@ -11,9 +11,9 @@ import org.elasticsearch.compute.aggregation.Aggregator;
 import org.elasticsearch.compute.aggregation.AggregatorFunctionSupplier;
 import org.elasticsearch.compute.aggregation.AggregatorMode;
 import org.elasticsearch.compute.aggregation.GroupingAggregator;
+import org.elasticsearch.compute.aggregation.blockhash.BlockHash;
 import org.elasticsearch.compute.data.ElementType;
 import org.elasticsearch.compute.operator.AggregationOperator;
-import org.elasticsearch.compute.operator.HashAggregationOperator;
 import org.elasticsearch.compute.operator.HashAggregationOperator.HashAggregationOperatorFactory;
 import org.elasticsearch.compute.operator.Operator;
 import org.elasticsearch.xpack.esql.EsqlIllegalArgumentException;
@@ -289,11 +289,11 @@ public abstract class AbstractPhysicalOperationProviders implements PhysicalOper
     }
 
     private record GroupSpec(Integer channel, Attribute attribute) {
-        HashAggregationOperator.GroupSpec toHashGroupSpec() {
+        BlockHash.GroupSpec toHashGroupSpec() {
             if (channel == null) {
                 throw new EsqlIllegalArgumentException("planned to use ordinals but tried to use the hash instead");
             }
-            return new HashAggregationOperator.GroupSpec(channel, elementType());
+            return new BlockHash.GroupSpec(channel, elementType());
         }
 
         ElementType elementType() {

+ 1 - 1
x-pack/plugin/esql/src/test/java/org/elasticsearch/xpack/esql/planner/TestPhysicalOperationProviders.java

@@ -276,7 +276,7 @@ public class TestPhysicalOperationProviders extends AbstractPhysicalOperationPro
             return new TestHashAggregationOperator(
                 aggregators,
                 () -> BlockHash.build(
-                    List.of(new HashAggregationOperator.GroupSpec(groupByChannel, groupElementType)),
+                    List.of(new BlockHash.GroupSpec(groupByChannel, groupElementType)),
                     driverContext.blockFactory(),
                     pageSize,
                     false