
Re-enable time-series block hash (#127488) (#127545)

Re-enable #127488 after fixing the memory accounting.
Nhat Nguyen, 5 months ago · commit 6edc76d10f
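
Note on the change set: the re-enabled optimization rewrites TimeSeriesBlockHash#add to walk the ordinal-encoded tsid column instead of materializing a BytesRef per row; the memory-accounting fix referenced in the commit message appears to be the removal of the extra adjustBreaker call in BytesRefArrayWithSize#toVector further down. The OrdinalBytesRefBlock#isDense heuristic is retuned, both ordinal classes gain value-based equals/hashCode, and TimeSeriesAggregationOperator.Factory gains a sortedInput flag so planners can opt into the sorted hash.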

+ 61 - 31
x-pack/plugin/esql/compute/src/main/java/org/elasticsearch/compute/aggregation/blockhash/TimeSeriesBlockHash.java

@@ -30,8 +30,6 @@ import org.elasticsearch.core.Releasable;
 import org.elasticsearch.core.ReleasableIterator;
 import org.elasticsearch.core.Releasables;
 
-import java.util.Objects;
-
 /**
  * An optimized block hash that receives two blocks: tsid and timestamp, which are sorted.
  * Since the incoming data is sorted, this block hash appends the incoming data to the internal arrays without lookup.
@@ -41,7 +39,7 @@ public final class TimeSeriesBlockHash extends BlockHash {
     private final int tsHashChannel;
     private final int timestampIntervalChannel;
 
-    private final BytesRef lastTsid = new BytesRef();
+    private int lastTsidPosition = 0;
     private final BytesRefArrayWithSize tsidArray;
 
     private long lastTimestamp;
@@ -64,19 +62,70 @@ public final class TimeSeriesBlockHash extends BlockHash {
         Releasables.close(tsidArray, timestampArray, perTsidCountArray);
     }
 
+    private OrdinalBytesRefVector getTsidVector(Page page) {
+        BytesRefBlock block = page.getBlock(tsHashChannel);
+        var ordinalBlock = block.asOrdinals();
+        if (ordinalBlock == null) {
+            throw new IllegalStateException("expected ordinal block for tsid");
+        }
+        var ordinalVector = ordinalBlock.asVector();
+        if (ordinalVector == null) {
+            throw new IllegalStateException("expected ordinal vector for tsid");
+        }
+        return ordinalVector;
+    }
+
+    private LongVector getTimestampVector(Page page) {
+        final LongBlock timestampsBlock = page.getBlock(timestampIntervalChannel);
+        LongVector timestampsVector = timestampsBlock.asVector();
+        if (timestampsVector == null) {
+            throw new IllegalStateException("expected long vector for timestamp");
+        }
+        return timestampsVector;
+    }
+
     @Override
     public void add(Page page, GroupingAggregatorFunction.AddInput addInput) {
-        final BytesRefBlock tsidBlock = page.getBlock(tsHashChannel);
-        final BytesRefVector tsidVector = Objects.requireNonNull(tsidBlock.asVector(), "tsid input must be a vector");
-        final LongBlock timestampBlock = page.getBlock(timestampIntervalChannel);
-        final LongVector timestampVector = Objects.requireNonNull(timestampBlock.asVector(), "timestamp input must be a vector");
-        try (var ordsBuilder = blockFactory.newIntVectorBuilder(tsidVector.getPositionCount())) {
+        final BytesRefVector tsidDict;
+        final IntVector tsidOrdinals;
+        {
+            final var tsidVector = getTsidVector(page);
+            tsidDict = tsidVector.getDictionaryVector();
+            tsidOrdinals = tsidVector.getOrdinalsVector();
+        }
+        try (var ordsBuilder = blockFactory.newIntVectorBuilder(tsidOrdinals.getPositionCount())) {
             final BytesRef spare = new BytesRef();
-            // TODO: optimize incoming ordinal block
-            for (int i = 0; i < tsidVector.getPositionCount(); i++) {
-                final BytesRef tsid = tsidVector.getBytesRef(i, spare);
+            final BytesRef lastTsid = new BytesRef();
+            final LongVector timestampVector = getTimestampVector(page);
+            int lastOrd = -1;
+            for (int i = 0; i < tsidOrdinals.getPositionCount(); i++) {
+                final int newOrd = tsidOrdinals.getInt(i);
+                boolean newGroup = false;
+                if (lastOrd != newOrd) {
+                    final var newTsid = tsidDict.getBytesRef(newOrd, spare);
+                    if (positionCount() == 0) {
+                        newGroup = true;
+                    } else if (lastOrd == -1) {
+                        tsidArray.get(lastTsidPosition, lastTsid);
+                        newGroup = lastTsid.equals(newTsid) == false;
+                    } else {
+                        newGroup = true;
+                    }
+                    if (newGroup) {
+                        endTsidGroup();
+                        lastTsidPosition = tsidArray.count;
+                        tsidArray.append(newTsid);
+                    }
+                    lastOrd = newOrd;
+                }
                 final long timestamp = timestampVector.getLong(i);
-                ordsBuilder.appendInt(addOnePosition(tsid, timestamp));
+                if (newGroup || timestamp != lastTimestamp) {
+                    assert newGroup || lastTimestamp >= timestamp : "@timestamp goes backward " + lastTimestamp + " < " + timestamp;
+                    timestampArray.append(timestamp);
+                    lastTimestamp = timestamp;
+                    currentTimestampCount++;
+                }
+                ordsBuilder.appendInt(timestampArray.count - 1);
             }
             try (var ords = ordsBuilder.build()) {
                 addInput.add(0, ords);
@@ -84,24 +133,6 @@ public final class TimeSeriesBlockHash extends BlockHash {
         }
     }
 
-    private int addOnePosition(BytesRef tsid, long timestamp) {
-        boolean newGroup = false;
-        if (positionCount() == 0 || lastTsid.equals(tsid) == false) {
-            assert positionCount() == 0 || lastTsid.compareTo(tsid) < 0 : "tsid goes backward ";
-            endTsidGroup();
-            tsidArray.append(tsid);
-            tsidArray.get(tsidArray.count - 1, lastTsid);
-            newGroup = true;
-        }
-        if (newGroup || timestamp != lastTimestamp) {
-            assert newGroup || lastTimestamp >= timestamp : "@timestamp goes backward " + lastTimestamp + " < " + timestamp;
-            timestampArray.append(timestamp);
-            lastTimestamp = timestamp;
-            currentTimestampCount++;
-        }
-        return positionCount() - 1;
-    }
-
     private void endTsidGroup() {
         if (currentTimestampCount > 0) {
             perTsidCountArray.append(currentTimestampCount);
@@ -270,7 +301,6 @@ public final class TimeSeriesBlockHash extends BlockHash {
 
         BytesRefVector toVector() {
             BytesRefVector vector = blockFactory.newBytesRefArrayVector(array, count);
-            blockFactory.adjustBreaker(vector.ramBytesUsed() - array.bigArraysRamBytesUsed());
             array = null;
             return vector;
         }
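
The rewritten add() above detects runs of equal ordinals and only compares bytes against the previous group for the first run of a page, since ordinals are not comparable across pages. Below is a minimal sketch of that run-detection idea in plain Java, ignoring the timestamp dimension and memory accounting; the names (SortedOrdinalGrouper, keys, groupIds) are illustrative, not from the PR:

    import java.util.ArrayList;
    import java.util.List;

    class SortedOrdinalGrouper {
        private final List<String> keys = new ArrayList<>(); // one entry per group, like tsidArray
        private int lastKeyPosition = -1;                     // like lastTsidPosition

        // Assigns one group id per row; rows must arrive sorted by key.
        int[] add(String[] dict, int[] ordinals) {
            int[] groupIds = new int[ordinals.length];
            int lastOrd = -1; // ordinals are only meaningful within this page
            for (int i = 0; i < ordinals.length; i++) {
                int ord = ordinals[i];
                if (ord != lastOrd) {
                    String key = dict[ord];
                    // Only the first run of a page can continue the previous
                    // page's group; later runs are always new groups because
                    // the input is sorted.
                    boolean newGroup = keys.isEmpty()
                        || lastOrd != -1
                        || keys.get(lastKeyPosition).equals(key) == false;
                    if (newGroup) {
                        lastKeyPosition = keys.size();
                        keys.add(key);
                    }
                    lastOrd = ord;
                }
                groupIds[i] = keys.size() - 1;
            }
            return groupIds;
        }
    }

The real implementation layers the timestamp on top: group ids come from timestampArray rather than the tsid array, and endTsidGroup() records in perTsidCountArray how many timestamp buckets each tsid produced.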

+ 18 - 4
x-pack/plugin/esql/compute/src/main/java/org/elasticsearch/compute/data/OrdinalBytesRefBlock.java

@@ -54,11 +54,11 @@ public final class OrdinalBytesRefBlock extends AbstractNonThreadSafeRefCounted
      * Returns true if this ordinal block is dense enough to enable optimizations using its ordinals
      */
     public boolean isDense() {
-        return isDense(bytes.getPositionCount(), ordinals.getTotalValueCount());
+        return isDense(ordinals.getTotalValueCount(), bytes.getPositionCount());
     }
 
-    public static boolean isDense(int totalPositions, int numOrdinals) {
-        return numOrdinals * 2L / 3L >= totalPositions;
+    public static boolean isDense(long totalPositions, long dictionarySize) {
+        return totalPositions >= 10 && totalPositions >= dictionarySize * 2L;
     }
 
     public IntBlock getOrdinalsBlock() {
@@ -75,7 +75,7 @@ public final class OrdinalBytesRefBlock extends AbstractNonThreadSafeRefCounted
     }
 
     @Override
-    public BytesRefVector asVector() {
+    public OrdinalBytesRefVector asVector() {
         IntVector vector = ordinals.asVector();
         if (vector != null) {
             return new OrdinalBytesRefVector(vector, bytes);
@@ -251,6 +251,20 @@ public final class OrdinalBytesRefBlock extends AbstractNonThreadSafeRefCounted
         return ordinals.ramBytesUsed() + bytes.ramBytesUsed();
     }
 
+    @Override
+    public boolean equals(Object o) {
+        if (o instanceof BytesRefBlock b) {
+            return BytesRefBlock.equals(this, b);
+        } else {
+            return false;
+        }
+    }
+
+    @Override
+    public int hashCode() {
+        return BytesRefBlock.hash(this);
+    }
+
     @Override
     public String toString() {
         return getClass().getSimpleName() + "[ordinals=" + ordinals + ", bytes=" + bytes + "]";
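
Besides routing OrdinalBytesRefVector#isDense through the same helper (see the next file), this retunes the density check: the old rule declared a block dense when the ordinal count was at least 1.5x the dictionary size, while the new one requires at least 10 positions and a dictionary reused at least twice on average, and the renamed parameters now match the order the call sites pass. A hedged recap with made-up inputs:

    // Same arithmetic as the diff above:
    static boolean isDense(long totalPositions, long dictionarySize) {
        return totalPositions >= 10 && totalPositions >= dictionarySize * 2L;
    }
    // isDense(8, 3)    -> false (old rule: 8 * 2 / 3 = 5 >= 3, so this was dense before)
    // isDense(100, 60) -> false (each dictionary entry reused less than twice)
    // isDense(100, 50) -> true  (every entry reused twice on average)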

+ 15 - 1
x-pack/plugin/esql/compute/src/main/java/org/elasticsearch/compute/data/OrdinalBytesRefVector.java

@@ -55,7 +55,7 @@ public final class OrdinalBytesRefVector extends AbstractNonThreadSafeRefCounted
      * Returns true if this ordinal vector is dense enough to enable optimizations using its ordinals
      */
     public boolean isDense() {
-        return ordinals.getPositionCount() * 2 / 3 >= bytes.getPositionCount();
+        return OrdinalBytesRefBlock.isDense(ordinals.getPositionCount(), bytes.getPositionCount());
     }
 
     @Override
@@ -155,4 +155,18 @@ public final class OrdinalBytesRefVector extends AbstractNonThreadSafeRefCounted
     protected void closeInternal() {
         Releasables.close(ordinals, bytes);
     }
+
+    @Override
+    public boolean equals(Object o) {
+        if (o instanceof BytesRefVector other) {
+            return BytesRefVector.equals(this, other);
+        } else {
+            return false;
+        }
+    }
+
+    @Override
+    public int hashCode() {
+        return BytesRefVector.hash(this);
+    }
 }
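
Both ordinal classes previously inherited identity equality from Object; the new overrides delegate to the BytesRefBlock/BytesRefVector helpers, so an ordinal-encoded block or vector compares equal to a plain one holding the same values. This is presumably what lets the new testTimeSeriesBlockHash below assert ords and keys from two different BlockHash implementations with equalTo.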

+ 14 - 11
x-pack/plugin/esql/compute/src/main/java/org/elasticsearch/compute/operator/TimeSeriesAggregationOperator.java

@@ -14,6 +14,7 @@ import org.elasticsearch.compute.aggregation.GroupingAggregator;
 import org.elasticsearch.compute.aggregation.GroupingAggregatorEvaluationContext;
 import org.elasticsearch.compute.aggregation.TimeSeriesGroupingAggregatorEvaluationContext;
 import org.elasticsearch.compute.aggregation.blockhash.BlockHash;
+import org.elasticsearch.compute.aggregation.blockhash.TimeSeriesBlockHash;
 import org.elasticsearch.compute.data.Block;
 import org.elasticsearch.compute.data.ElementType;
 import org.elasticsearch.compute.data.LongBlock;
@@ -30,6 +31,7 @@ public class TimeSeriesAggregationOperator extends HashAggregationOperator {
 
     public record Factory(
         Rounding.Prepared timeBucket,
+        boolean sortedInput,
         List<BlockHash.GroupSpec> groups,
         AggregatorMode aggregatorMode,
         List<GroupingAggregator.Factory> aggregators,
@@ -38,17 +40,18 @@ public class TimeSeriesAggregationOperator extends HashAggregationOperator {
         @Override
         public Operator get(DriverContext driverContext) {
             // TODO: use TimeSeriesBlockHash when possible
-            return new TimeSeriesAggregationOperator(
-                timeBucket,
-                aggregators,
-                () -> BlockHash.build(
-                    groups,
-                    driverContext.blockFactory(),
-                    maxPageSize,
-                    true // we can enable optimizations as the inputs are vectors
-                ),
-                driverContext
-            );
+            return new TimeSeriesAggregationOperator(timeBucket, aggregators, () -> {
+                if (sortedInput && groups.size() == 2) {
+                    return new TimeSeriesBlockHash(groups.get(0).channel(), groups.get(1).channel(), driverContext.blockFactory());
+                } else {
+                    return BlockHash.build(
+                        groups,
+                        driverContext.blockFactory(),
+                        maxPageSize,
+                        true // we can enable optimizations as the inputs are vectors
+                    );
+                }
+            }, driverContext);
         }
 
         @Override
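
The factory now picks the hash per driver: TimeSeriesBlockHash is only used for the exact two-channel (tsid, timestamp bucket) grouping with a sorted-input guarantee, and anything else falls back to the generic BlockHash, still with the vector optimizations enabled. The surviving TODO above the branch looks stale now that the sortedInput path does exactly that.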

+ 112 - 0
x-pack/plugin/esql/compute/src/test/java/org/elasticsearch/compute/aggregation/blockhash/BlockHashTests.java

@@ -22,6 +22,7 @@ import org.elasticsearch.compute.data.ElementType;
 import org.elasticsearch.compute.data.IntBlock;
 import org.elasticsearch.compute.data.IntVector;
 import org.elasticsearch.compute.data.LongBlock;
+import org.elasticsearch.compute.data.LongVector;
 import org.elasticsearch.compute.data.OrdinalBytesRefBlock;
 import org.elasticsearch.compute.data.OrdinalBytesRefVector;
 import org.elasticsearch.compute.data.Page;
@@ -29,12 +30,14 @@ import org.elasticsearch.compute.test.TestBlockFactory;
 import org.elasticsearch.core.Releasable;
 import org.elasticsearch.core.ReleasableIterator;
 import org.elasticsearch.core.Releasables;
+import org.elasticsearch.xpack.esql.core.util.Holder;
 import org.junit.After;
 
 import java.util.ArrayList;
 import java.util.Arrays;
 import java.util.HashSet;
 import java.util.List;
+import java.util.Locale;
 import java.util.Set;
 import java.util.function.Consumer;
 import java.util.stream.IntStream;
@@ -1326,6 +1329,115 @@ public class BlockHashTests extends BlockHashTestCase {
         }
     }
 
+    public void testTimeSeriesBlockHash() throws Exception {
+        long endTime = randomLongBetween(10_000_000, 20_000_000);
+        var hash1 = new TimeSeriesBlockHash(0, 1, blockFactory);
+        var hash2 = BlockHash.build(
+            List.of(new BlockHash.GroupSpec(0, ElementType.BYTES_REF), new BlockHash.GroupSpec(1, ElementType.LONG)),
+            blockFactory,
+            32 * 1024,
+            forcePackedHash
+        );
+        int numPages = between(1, 100);
+        int globalTsid = -1;
+        long timestamp = endTime;
+        try (hash1; hash2) {
+            for (int p = 0; p < numPages; p++) {
+                int numRows = between(1, 1000);
+                if (randomBoolean()) {
+                    timestamp -= between(0, 100);
+                }
+                try (
+                    BytesRefVector.Builder dictBuilder = blockFactory.newBytesRefVectorBuilder(numRows);
+                    IntVector.Builder ordinalBuilder = blockFactory.newIntVectorBuilder(numRows);
+                    LongVector.Builder timestampsBuilder = blockFactory.newLongVectorBuilder(numRows)
+                ) {
+                    int perPageOrd = -1;
+                    for (int i = 0; i < numRows; i++) {
+                        boolean newGroup = globalTsid == -1 || randomInt(100) < 10;
+                        if (newGroup) {
+                            globalTsid++;
+                            timestamp = endTime;
+                            if (randomBoolean()) {
+                                timestamp -= between(0, 1000);
+                            }
+                        }
+                        if (perPageOrd == -1 || newGroup) {
+                            perPageOrd++;
+                            dictBuilder.appendBytesRef(new BytesRef(String.format(Locale.ROOT, "id-%06d", globalTsid)));
+                        }
+                        ordinalBuilder.appendInt(perPageOrd);
+                        if (randomInt(100) < 20) {
+                            timestamp -= between(1, 10);
+                        }
+                        timestampsBuilder.appendLong(timestamp);
+                    }
+                    try (
+                        var tsidBlock = new OrdinalBytesRefVector(ordinalBuilder.build(), dictBuilder.build()).asBlock();
+                        var timestampBlock = timestampsBuilder.build().asBlock()
+                    ) {
+                        Page page = new Page(tsidBlock, timestampBlock);
+                        Holder<IntVector> ords1 = new Holder<>();
+                        hash1.add(page, new GroupingAggregatorFunction.AddInput() {
+                            @Override
+                            public void add(int positionOffset, IntBlock groupIds) {
+                                throw new AssertionError("time-series block hash should emit a vector");
+                            }
+
+                            @Override
+                            public void add(int positionOffset, IntVector groupIds) {
+                                groupIds.incRef();
+                                ords1.set(groupIds);
+                            }
+
+                            @Override
+                            public void close() {
+
+                            }
+                        });
+                        Holder<IntVector> ords2 = new Holder<>();
+                        hash2.add(page, new GroupingAggregatorFunction.AddInput() {
+                            @Override
+                            public void add(int positionOffset, IntBlock groupIds) {
+                                // TODO: check why PackedValuesBlockHash doesn't emit a vector?
+                                IntVector vector = groupIds.asVector();
+                                assertNotNull("should emit a vector", vector);
+                                vector.incRef();
+                                ords2.set(vector);
+                            }
+
+                            @Override
+                            public void add(int positionOffset, IntVector groupIds) {
+                                groupIds.incRef();
+                                ords2.set(groupIds);
+                            }
+
+                            @Override
+                            public void close() {
+
+                            }
+                        });
+                        try {
+                            assertThat("input=" + page, ords1.get(), equalTo(ords2.get()));
+                        } finally {
+                            Releasables.close(ords1.get(), ords2.get());
+                        }
+                    }
+                }
+            }
+            Block[] keys1 = null;
+            Block[] keys2 = null;
+            try {
+                keys1 = hash1.getKeys();
+                keys2 = hash2.getKeys();
+                assertThat(keys1, equalTo(keys2));
+            } finally {
+                Releasables.close(keys1);
+                Releasables.close(keys2);
+            }
+        }
+    }
+
     record OrdsAndKeys(String description, int positionOffset, IntBlock ords, Block[] keys, IntVector nonEmpty) {}
 
     /**
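
The new test is a differential check: identical pages of ordinal-encoded tsids and non-increasing timestamps are fed to TimeSeriesBlockHash and to a generic BlockHash over the same two channels, and the emitted group ids must match page by page, as must the final getKeys() output. The first AddInput also pins down a contract: TimeSeriesBlockHash must always emit an IntVector, never a nullable IntBlock.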

+ 12 - 5
x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/planner/AbstractPhysicalOperationProviders.java

@@ -18,7 +18,6 @@ import org.elasticsearch.compute.operator.AggregationOperator;
 import org.elasticsearch.compute.operator.EvalOperator;
 import org.elasticsearch.compute.operator.HashAggregationOperator.HashAggregationOperatorFactory;
 import org.elasticsearch.compute.operator.Operator;
-import org.elasticsearch.compute.operator.TimeSeriesAggregationOperator;
 import org.elasticsearch.index.analysis.AnalysisRegistry;
 import org.elasticsearch.xpack.esql.EsqlIllegalArgumentException;
 import org.elasticsearch.xpack.esql.core.InvalidArgumentException;
@@ -175,12 +174,12 @@ public abstract class AbstractPhysicalOperationProviders implements PhysicalOper
             );
             // time-series aggregation
             if (aggregateExec instanceof TimeSeriesAggregateExec ts) {
-                operatorFactory = new TimeSeriesAggregationOperator.Factory(
-                    ts.timeBucketRounding(context.foldCtx()),
-                    groupSpecs.stream().map(GroupSpec::toHashGroupSpec).toList(),
+                operatorFactory = timeSeriesAggregatorOperatorFactory(
+                    ts,
                     aggregatorMode,
                     aggregatorFactories,
-                    context.pageSize(aggregateExec.estimatedRowSize())
+                    groupSpecs.stream().map(GroupSpec::toHashGroupSpec).toList(),
+                    context
                 );
                 // ordinal grouping
             } else if (groupSpecs.size() == 1 && groupSpecs.get(0).channel == null) {
@@ -379,4 +378,12 @@ public abstract class AbstractPhysicalOperationProviders implements PhysicalOper
         ElementType groupType,
         LocalExecutionPlannerContext context
     );
+
+    public abstract Operator.OperatorFactory timeSeriesAggregatorOperatorFactory(
+        TimeSeriesAggregateExec ts,
+        AggregatorMode aggregatorMode,
+        List<GroupingAggregator.Factory> aggregatorFactories,
+        List<BlockHash.GroupSpec> groupSpecs,
+        LocalExecutionPlannerContext context
+    );
 }
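
Factory construction moves behind an abstract hook so each planner decides the sortedInput flag for itself; the production and test implementations follow.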

+ 22 - 0
x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/planner/EsPhysicalOperationProviders.java

@@ -14,7 +14,9 @@ import org.apache.lucene.search.BooleanQuery;
 import org.apache.lucene.search.IndexSearcher;
 import org.apache.lucene.search.Query;
 import org.elasticsearch.common.logging.HeaderWarning;
+import org.elasticsearch.compute.aggregation.AggregatorMode;
 import org.elasticsearch.compute.aggregation.GroupingAggregator;
+import org.elasticsearch.compute.aggregation.blockhash.BlockHash;
 import org.elasticsearch.compute.data.Block;
 import org.elasticsearch.compute.data.ElementType;
 import org.elasticsearch.compute.lucene.DataPartitioning;
@@ -27,6 +29,7 @@ import org.elasticsearch.compute.lucene.ValuesSourceReaderOperator;
 import org.elasticsearch.compute.operator.Operator;
 import org.elasticsearch.compute.operator.OrdinalsGroupingOperator;
 import org.elasticsearch.compute.operator.SourceOperator;
+import org.elasticsearch.compute.operator.TimeSeriesAggregationOperator;
 import org.elasticsearch.core.Nullable;
 import org.elasticsearch.index.IndexMode;
 import org.elasticsearch.index.IndexSettings;
@@ -61,6 +64,7 @@ import org.elasticsearch.xpack.esql.plan.physical.AggregateExec;
 import org.elasticsearch.xpack.esql.plan.physical.EsQueryExec;
 import org.elasticsearch.xpack.esql.plan.physical.EsQueryExec.Sort;
 import org.elasticsearch.xpack.esql.plan.physical.FieldExtractExec;
+import org.elasticsearch.xpack.esql.plan.physical.TimeSeriesAggregateExec;
 import org.elasticsearch.xpack.esql.plan.physical.TimeSeriesSourceExec;
 import org.elasticsearch.xpack.esql.planner.LocalExecutionPlanner.DriverParallelism;
 import org.elasticsearch.xpack.esql.planner.LocalExecutionPlanner.LocalExecutionPlannerContext;
@@ -322,6 +326,24 @@ public class EsPhysicalOperationProviders extends AbstractPhysicalOperationProvi
         );
     }
 
+    @Override
+    public Operator.OperatorFactory timeSeriesAggregatorOperatorFactory(
+        TimeSeriesAggregateExec ts,
+        AggregatorMode aggregatorMode,
+        List<GroupingAggregator.Factory> aggregatorFactories,
+        List<BlockHash.GroupSpec> groupSpecs,
+        LocalExecutionPlannerContext context
+    ) {
+        return new TimeSeriesAggregationOperator.Factory(
+            ts.timeBucketRounding(context.foldCtx()),
+            shardContexts.size() == 1 && ts.anyMatch(p -> p instanceof TimeSeriesSourceExec),
+            groupSpecs,
+            aggregatorMode,
+            aggregatorFactories,
+            context.pageSize(ts.estimatedRowSize())
+        );
+    }
+
     public static class DefaultShardContext implements ShardContext {
         private final int index;
         private final SearchExecutionContext ctx;
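
The sorted-input guarantee is derived from the plan shape: exactly one shard context and a TimeSeriesSourceExec below the aggregation, presumably because streams from multiple shards would interleave their tsid-sorted pages.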

+ 21 - 0
x-pack/plugin/esql/src/test/java/org/elasticsearch/xpack/esql/planner/TestPhysicalOperationProviders.java

@@ -13,6 +13,7 @@ import org.elasticsearch.common.Randomness;
 import org.elasticsearch.common.settings.Settings;
 import org.elasticsearch.common.util.BigArrays;
 import org.elasticsearch.compute.Describable;
+import org.elasticsearch.compute.aggregation.AggregatorMode;
 import org.elasticsearch.compute.aggregation.GroupingAggregator;
 import org.elasticsearch.compute.aggregation.blockhash.BlockHash;
 import org.elasticsearch.compute.data.Block;
@@ -31,6 +32,7 @@ import org.elasticsearch.compute.operator.Operator;
 import org.elasticsearch.compute.operator.OrdinalsGroupingOperator;
 import org.elasticsearch.compute.operator.SourceOperator;
 import org.elasticsearch.compute.operator.SourceOperator.SourceOperatorFactory;
+import org.elasticsearch.compute.operator.TimeSeriesAggregationOperator;
 import org.elasticsearch.compute.test.TestBlockFactory;
 import org.elasticsearch.core.Nullable;
 import org.elasticsearch.env.Environment;
@@ -58,6 +60,7 @@ import org.elasticsearch.xpack.esql.expression.function.scalar.convert.AbstractC
 import org.elasticsearch.xpack.esql.plan.physical.AggregateExec;
 import org.elasticsearch.xpack.esql.plan.physical.EsQueryExec;
 import org.elasticsearch.xpack.esql.plan.physical.FieldExtractExec;
+import org.elasticsearch.xpack.esql.plan.physical.TimeSeriesAggregateExec;
 import org.elasticsearch.xpack.esql.plan.physical.TimeSeriesSourceExec;
 import org.elasticsearch.xpack.esql.planner.LocalExecutionPlanner.LocalExecutionPlannerContext;
 import org.elasticsearch.xpack.esql.planner.LocalExecutionPlanner.PhysicalOperation;
@@ -153,6 +156,24 @@ public class TestPhysicalOperationProviders extends AbstractPhysicalOperationPro
         );
     }
 
+    @Override
+    public Operator.OperatorFactory timeSeriesAggregatorOperatorFactory(
+        TimeSeriesAggregateExec ts,
+        AggregatorMode aggregatorMode,
+        List<GroupingAggregator.Factory> aggregatorFactories,
+        List<BlockHash.GroupSpec> groupSpecs,
+        LocalExecutionPlannerContext context
+    ) {
+        return new TimeSeriesAggregationOperator.Factory(
+            ts.timeBucketRounding(context.foldCtx()),
+            false,
+            groupSpecs,
+            aggregatorMode,
+            aggregatorFactories,
+            context.pageSize(ts.estimatedRowSize())
+        );
+    }
+
     private class TestSourceOperator extends SourceOperator {
         private int index = 0;
         private final DriverContext driverContext;
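
The test provider hard-codes sortedInput to false, presumably because its synthetic pages carry no sort guarantee, so plans built here always exercise the generic BlockHash path.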