
Revert "Enable time-series block hash (#127488)"

This reverts commit 63b55e42793f30341701b738e4b80d538745777e.
Nhat Nguyen, 5 months ago
commit 34ebf8bb09

x-pack/plugin/esql/compute/src/main/java/org/elasticsearch/compute/aggregation/blockhash/TimeSeriesBlockHash.java (+30, -61)

@@ -30,6 +30,8 @@ import org.elasticsearch.core.Releasable;
 import org.elasticsearch.core.ReleasableIterator;
 import org.elasticsearch.core.Releasables;
 
+import java.util.Objects;
+
 /**
  * An optimized block hash that receives two blocks: tsid and timestamp, which are sorted.
  * Since the incoming data is sorted, this block hash appends the incoming data to the internal arrays without lookup.
@@ -39,7 +41,7 @@ public final class TimeSeriesBlockHash extends BlockHash {
     private final int tsHashChannel;
     private final int timestampIntervalChannel;
 
-    private int lastTsidPosition = 0;
+    private final BytesRef lastTsid = new BytesRef();
     private final BytesRefArrayWithSize tsidArray;
 
     private long lastTimestamp;
@@ -62,70 +64,19 @@ public final class TimeSeriesBlockHash extends BlockHash {
         Releasables.close(tsidArray, timestampArray, perTsidCountArray);
     }
 
-    private OrdinalBytesRefVector getTsidVector(Page page) {
-        BytesRefBlock block = page.getBlock(tsHashChannel);
-        var ordinalBlock = block.asOrdinals();
-        if (ordinalBlock == null) {
-            throw new IllegalStateException("expected ordinal block for tsid");
-        }
-        var ordinalVector = ordinalBlock.asVector();
-        if (ordinalVector == null) {
-            throw new IllegalStateException("expected ordinal vector for tsid");
-        }
-        return ordinalVector;
-    }
-
-    private LongVector getTimestampVector(Page page) {
-        final LongBlock timestampsBlock = page.getBlock(timestampIntervalChannel);
-        LongVector timestampsVector = timestampsBlock.asVector();
-        if (timestampsVector == null) {
-            throw new IllegalStateException("expected long vector for timestamp");
-        }
-        return timestampsVector;
-    }
-
     @Override
     public void add(Page page, GroupingAggregatorFunction.AddInput addInput) {
-        final BytesRefVector tsidDict;
-        final IntVector tsidOrdinals;
-        {
-            final var tsidVector = getTsidVector(page);
-            tsidDict = tsidVector.getDictionaryVector();
-            tsidOrdinals = tsidVector.getOrdinalsVector();
-        }
-        try (var ordsBuilder = blockFactory.newIntVectorBuilder(tsidOrdinals.getPositionCount())) {
+        final BytesRefBlock tsidBlock = page.getBlock(tsHashChannel);
+        final BytesRefVector tsidVector = Objects.requireNonNull(tsidBlock.asVector(), "tsid input must be a vector");
+        final LongBlock timestampBlock = page.getBlock(timestampIntervalChannel);
+        final LongVector timestampVector = Objects.requireNonNull(timestampBlock.asVector(), "timestamp input must be a vector");
+        try (var ordsBuilder = blockFactory.newIntVectorBuilder(tsidVector.getPositionCount())) {
             final BytesRef spare = new BytesRef();
-            final BytesRef lastTsid = new BytesRef();
-            final LongVector timestampVector = getTimestampVector(page);
-            int lastOrd = -1;
-            for (int i = 0; i < tsidOrdinals.getPositionCount(); i++) {
-                final int newOrd = tsidOrdinals.getInt(i);
-                boolean newGroup = false;
-                if (lastOrd != newOrd) {
-                    final var newTsid = tsidDict.getBytesRef(newOrd, spare);
-                    if (positionCount() == 0) {
-                        newGroup = true;
-                    } else if (lastOrd == -1) {
-                        tsidArray.get(lastTsidPosition, lastTsid);
-                        newGroup = lastTsid.equals(newTsid) == false;
-                    } else {
-                        newGroup = true;
-                    }
-                    if (newGroup) {
-                        endTsidGroup();
-                        lastTsidPosition = tsidArray.count;
-                        tsidArray.append(newTsid);
-                    }
-                    lastOrd = newOrd;
-                }
+            // TODO: optimize incoming ordinal block
+            for (int i = 0; i < tsidVector.getPositionCount(); i++) {
+                final BytesRef tsid = tsidVector.getBytesRef(i, spare);
                 final long timestamp = timestampVector.getLong(i);
-                if (newGroup || timestamp != lastTimestamp) {
-                    assert newGroup || lastTimestamp >= timestamp : "@timestamp goes backward " + lastTimestamp + " < " + timestamp;
-                    timestampArray.append(timestamp);
-                    lastTimestamp = timestamp;
-                    currentTimestampCount++;
-                }
-                ordsBuilder.appendInt(timestampArray.count - 1);
+                ordsBuilder.appendInt(addOnePosition(tsid, timestamp));
             }
             try (var ords = ordsBuilder.build()) {
                 addInput.add(0, ords);
@@ -133,6 +84,24 @@ public final class TimeSeriesBlockHash extends BlockHash {
         }
     }
 
+    private int addOnePosition(BytesRef tsid, long timestamp) {
+        boolean newGroup = false;
+        if (positionCount() == 0 || lastTsid.equals(tsid) == false) {
+            assert positionCount() == 0 || lastTsid.compareTo(tsid) < 0 : "tsid goes backward ";
+            endTsidGroup();
+            tsidArray.append(tsid);
+            tsidArray.get(tsidArray.count - 1, lastTsid);
+            newGroup = true;
+        }
+        if (newGroup || timestamp != lastTimestamp) {
+            assert newGroup || lastTimestamp >= timestamp : "@timestamp goes backward " + lastTimestamp + " < " + timestamp;
+            timestampArray.append(timestamp);
+            lastTimestamp = timestamp;
+            currentTimestampCount++;
+        }
+        return positionCount() - 1;
+    }
+
     private void endTsidGroup() {
         if (currentTimestampCount > 0) {
             perTsidCountArray.append(currentTimestampCount);
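
For context, the restored addOnePosition logic above is a run-length grouping over sorted input: because rows arrive ordered by tsid (ascending) and timestamp (descending within a tsid), a new group id is handed out only when one of the two keys changes, with no hash-table probe. The following is a minimal, self-contained sketch of that idea; it is hypothetical code, not from the Elasticsearch tree, with plain Java collections standing in for the big-array-backed structures.

import java.util.ArrayList;
import java.util.List;

final class SortedRunGrouper {
    private final List<String> tsids = new ArrayList<>();
    private final List<Long> timestamps = new ArrayList<>(); // one entry per (tsid, timestamp) group
    private String lastTsid = null;
    private long lastTimestamp = 0;

    int addOnePosition(String tsid, long timestamp) {
        boolean newGroup = false;
        if (timestamps.isEmpty() || lastTsid.equals(tsid) == false) {
            // tsids must arrive in ascending order, mirroring the assert in the diff
            assert tsids.isEmpty() || lastTsid.compareTo(tsid) < 0 : "tsid goes backward";
            tsids.add(tsid);
            lastTsid = tsid;
            newGroup = true;
        }
        if (newGroup || timestamp != lastTimestamp) {
            // within a tsid, timestamps must be descending, as in the original assert
            assert newGroup || lastTimestamp >= timestamp : "@timestamp goes backward";
            timestamps.add(timestamp);
            lastTimestamp = timestamp;
        }
        return timestamps.size() - 1; // dense ordinal of the current (tsid, timestamp) group
    }

    public static void main(String[] args) {
        SortedRunGrouper grouper = new SortedRunGrouper();
        System.out.println(grouper.addOnePosition("id-a", 300)); // 0: first group
        System.out.println(grouper.addOnePosition("id-a", 300)); // 0: same tsid and timestamp
        System.out.println(grouper.addOnePosition("id-a", 200)); // 1: same tsid, earlier timestamp
        System.out.println(grouper.addOnePosition("id-b", 300)); // 2: new tsid starts a new run
    }
}

The real class additionally records how many timestamps each tsid produced (endTsidGroup appending to perTsidCountArray), presumably so the key blocks can be rebuilt later from the compact arrays.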

x-pack/plugin/esql/compute/src/main/java/org/elasticsearch/compute/data/OrdinalBytesRefBlock.java (+1, -1)

@@ -75,7 +75,7 @@ public final class OrdinalBytesRefBlock extends AbstractNonThreadSafeRefCounted
     }
 
     @Override
-    public OrdinalBytesRefVector asVector() {
+    public BytesRefVector asVector() {
         IntVector vector = ordinals.asVector();
         if (vector != null) {
             return new OrdinalBytesRefVector(vector, bytes);
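
The only change here is the declared return type: the method still returns an OrdinalBytesRefVector instance at runtime, but callers once again see the BytesRefVector supertype. A hedged, illustrative fragment (types from org.elasticsearch.compute.data; not a complete program) of what that implies on the caller side:

// With asVector() declared as BytesRefVector again, a caller that wants the
// dictionary/ordinals fast path must check the runtime type itself instead of
// relying on the covariant override that this revert removes.
static void consume(BytesRefBlock block) {
    BytesRefVector vector = block.asVector();
    if (vector instanceof OrdinalBytesRefVector ordinal) {
        IntVector ords = ordinal.getOrdinalsVector();         // per-row indices into the dictionary
        BytesRefVector dict = ordinal.getDictionaryVector();  // distinct byte values
        // ... ordinal-aware fast path ...
    } else if (vector != null) {
        // ... generic per-row getBytesRef path ...
    }
}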

x-pack/plugin/esql/compute/src/main/java/org/elasticsearch/compute/operator/TimeSeriesAggregationOperator.java (+11, -14)

@@ -14,7 +14,6 @@ import org.elasticsearch.compute.aggregation.GroupingAggregator;
 import org.elasticsearch.compute.aggregation.GroupingAggregatorEvaluationContext;
 import org.elasticsearch.compute.aggregation.TimeSeriesGroupingAggregatorEvaluationContext;
 import org.elasticsearch.compute.aggregation.blockhash.BlockHash;
-import org.elasticsearch.compute.aggregation.blockhash.TimeSeriesBlockHash;
 import org.elasticsearch.compute.data.Block;
 import org.elasticsearch.compute.data.ElementType;
 import org.elasticsearch.compute.data.LongBlock;
@@ -31,7 +30,6 @@ public class TimeSeriesAggregationOperator extends HashAggregationOperator {
 
     public record Factory(
         Rounding.Prepared timeBucket,
-        boolean sortedInput,
         List<BlockHash.GroupSpec> groups,
         AggregatorMode aggregatorMode,
         List<GroupingAggregator.Factory> aggregators,
@@ -40,18 +38,17 @@ public class TimeSeriesAggregationOperator extends HashAggregationOperator {
         @Override
         public Operator get(DriverContext driverContext) {
             // TODO: use TimeSeriesBlockHash when possible
-            return new TimeSeriesAggregationOperator(timeBucket, aggregators, () -> {
-                if (sortedInput && groups.size() == 2) {
-                    return new TimeSeriesBlockHash(groups.get(0).channel(), groups.get(1).channel(), driverContext.blockFactory());
-                } else {
-                    return BlockHash.build(
-                        groups,
-                        driverContext.blockFactory(),
-                        maxPageSize,
-                        true // we can enable optimizations as the inputs are vectors
-                    );
-                }
-            }, driverContext);
+            return new TimeSeriesAggregationOperator(
+                timeBucket,
+                aggregators,
+                () -> BlockHash.build(
+                    groups,
+                    driverContext.blockFactory(),
+                    maxPageSize,
+                    true // we can enable optimizations as the inputs are vectors
+                ),
+                driverContext
+            );
         }
 
         @Override
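
With the sortedInput flag removed from the Factory record, every call site constructs the operator the same way. A hypothetical usage sketch (variable names assumed; it mirrors the call site in AbstractPhysicalOperationProviders further down):

// After the revert, every time-series aggregation goes through BlockHash.build;
// the TimeSeriesBlockHash fast path is no longer selectable here.
var factory = new TimeSeriesAggregationOperator.Factory(
    timeBucket,              // Rounding.Prepared for the time bucket
    groups,                  // List<BlockHash.GroupSpec>, e.g. tsid + bucketed @timestamp
    AggregatorMode.INITIAL,  // assumed mode for this sketch
    aggregators,             // List<GroupingAggregator.Factory>
    maxPageSize
);
Operator operator = factory.get(driverContext);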

x-pack/plugin/esql/compute/src/test/java/org/elasticsearch/compute/aggregation/blockhash/BlockHashTests.java (+0, -112)

@@ -22,7 +22,6 @@ import org.elasticsearch.compute.data.ElementType;
 import org.elasticsearch.compute.data.IntBlock;
 import org.elasticsearch.compute.data.IntVector;
 import org.elasticsearch.compute.data.LongBlock;
-import org.elasticsearch.compute.data.LongVector;
 import org.elasticsearch.compute.data.OrdinalBytesRefBlock;
 import org.elasticsearch.compute.data.OrdinalBytesRefVector;
 import org.elasticsearch.compute.data.Page;
@@ -30,14 +29,12 @@ import org.elasticsearch.compute.test.TestBlockFactory;
 import org.elasticsearch.core.Releasable;
 import org.elasticsearch.core.ReleasableIterator;
 import org.elasticsearch.core.Releasables;
-import org.elasticsearch.xpack.esql.core.util.Holder;
 import org.junit.After;
 
 import java.util.ArrayList;
 import java.util.Arrays;
 import java.util.HashSet;
 import java.util.List;
-import java.util.Locale;
 import java.util.Set;
 import java.util.function.Consumer;
 import java.util.stream.IntStream;
@@ -1329,115 +1326,6 @@ public class BlockHashTests extends BlockHashTestCase {
         }
     }
 
-    public void testTimeSeriesBlockHash() {
-        long endTime = randomLongBetween(10_000_000, 20_000_000);
-        var hash1 = new TimeSeriesBlockHash(0, 1, blockFactory);
-        var hash2 = BlockHash.build(
-            List.of(new BlockHash.GroupSpec(0, ElementType.BYTES_REF), new BlockHash.GroupSpec(1, ElementType.LONG)),
-            blockFactory,
-            32 * 1024,
-            forcePackedHash
-        );
-        int numPages = between(1, 100);
-        int globalTsid = -1;
-        long timestamp = endTime;
-        try (hash1; hash2) {
-            for (int p = 0; p < numPages; p++) {
-                int numRows = between(1, 1000);
-                if (randomBoolean()) {
-                    timestamp -= between(0, 100);
-                }
-                try (
-                    BytesRefVector.Builder dictBuilder = blockFactory.newBytesRefVectorBuilder(numRows);
-                    IntVector.Builder ordinalBuilder = blockFactory.newIntVectorBuilder(numRows);
-                    LongVector.Builder timestampsBuilder = blockFactory.newLongVectorBuilder(numRows)
-                ) {
-                    int perPageOrd = -1;
-                    for (int i = 0; i < numRows; i++) {
-                        boolean newGroup = globalTsid == -1 || randomInt(100) < 10;
-                        if (newGroup) {
-                            globalTsid++;
-                            timestamp = endTime;
-                            if (randomBoolean()) {
-                                timestamp -= between(0, 1000);
-                            }
-                        }
-                        if (perPageOrd == -1 || newGroup) {
-                            perPageOrd++;
-                            dictBuilder.appendBytesRef(new BytesRef(String.format(Locale.ROOT, "id-%06d", globalTsid)));
-                        }
-                        ordinalBuilder.appendInt(perPageOrd);
-                        if (randomInt(100) < 20) {
-                            timestamp -= between(1, 10);
-                        }
-                        timestampsBuilder.appendLong(timestamp);
-                    }
-                    try (
-                        var tsidBlock = new OrdinalBytesRefVector(ordinalBuilder.build(), dictBuilder.build()).asBlock();
-                        var timestampBlock = timestampsBuilder.build().asBlock()
-                    ) {
-                        Page page = new Page(tsidBlock, timestampBlock);
-                        Holder<IntVector> ords1 = new Holder<>();
-                        hash1.add(page, new GroupingAggregatorFunction.AddInput() {
-                            @Override
-                            public void add(int positionOffset, IntBlock groupIds) {
-                                throw new AssertionError("time-series block hash should emit a vector");
-                            }
-
-                            @Override
-                            public void add(int positionOffset, IntVector groupIds) {
-                                groupIds.incRef();
-                                ords1.set(groupIds);
-                            }
-
-                            @Override
-                            public void close() {
-
-                            }
-                        });
-                        Holder<IntVector> ords2 = new Holder<>();
-                        hash2.add(page, new GroupingAggregatorFunction.AddInput() {
-                            @Override
-                            public void add(int positionOffset, IntBlock groupIds) {
-                                // TODO: check why PackedValuesBlockHash doesn't emit a vector?
-                                IntVector vector = groupIds.asVector();
-                                assertNotNull("should emit a vector", vector);
-                                vector.incRef();
-                                ords2.set(vector);
-                            }
-
-                            @Override
-                            public void add(int positionOffset, IntVector groupIds) {
-                                groupIds.incRef();
-                                ords2.set(groupIds);
-                            }
-
-                            @Override
-                            public void close() {
-
-                            }
-                        });
-                        try {
-                            assertThat("input=" + page, ords1.get(), equalTo(ords2.get()));
-                        } finally {
-                            Releasables.close(ords1.get(), ords2.get());
-                        }
-                    }
-                }
-            }
-            Block[] keys1 = null;
-            Block[] keys2 = null;
-            try {
-                keys1 = hash1.getKeys();
-                keys2 = hash2.getKeys();
-                assertThat(keys1, equalTo(keys2));
-            } finally {
-                Releasables.close(keys1);
-                Releasables.close(keys2);
-            }
-        }
-    }
-
     record OrdsAndKeys(String description, int positionOffset, IntBlock ords, Block[] keys, IntVector nonEmpty) {}
 
     /**

x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/planner/AbstractPhysicalOperationProviders.java (+5, -12)

@@ -18,6 +18,7 @@ import org.elasticsearch.compute.operator.AggregationOperator;
 import org.elasticsearch.compute.operator.EvalOperator;
 import org.elasticsearch.compute.operator.HashAggregationOperator.HashAggregationOperatorFactory;
 import org.elasticsearch.compute.operator.Operator;
+import org.elasticsearch.compute.operator.TimeSeriesAggregationOperator;
 import org.elasticsearch.index.analysis.AnalysisRegistry;
 import org.elasticsearch.xpack.esql.EsqlIllegalArgumentException;
 import org.elasticsearch.xpack.esql.core.InvalidArgumentException;
@@ -174,12 +175,12 @@ public abstract class AbstractPhysicalOperationProviders implements PhysicalOper
             );
             // time-series aggregation
             if (aggregateExec instanceof TimeSeriesAggregateExec ts) {
-                operatorFactory = timeSeriesAggregatorOperatorFactory(
-                    ts,
+                operatorFactory = new TimeSeriesAggregationOperator.Factory(
+                    ts.timeBucketRounding(context.foldCtx()),
+                    groupSpecs.stream().map(GroupSpec::toHashGroupSpec).toList(),
                     aggregatorMode,
                     aggregatorFactories,
-                    groupSpecs.stream().map(GroupSpec::toHashGroupSpec).toList(),
-                    context
+                    context.pageSize(aggregateExec.estimatedRowSize())
                 );
                 // ordinal grouping
             } else if (groupSpecs.size() == 1 && groupSpecs.get(0).channel == null) {
@@ -378,12 +379,4 @@ public abstract class AbstractPhysicalOperationProviders implements PhysicalOper
         ElementType groupType,
         LocalExecutionPlannerContext context
     );
-
-    public abstract Operator.OperatorFactory timeSeriesAggregatorOperatorFactory(
-        TimeSeriesAggregateExec ts,
-        AggregatorMode aggregatorMode,
-        List<GroupingAggregator.Factory> aggregatorFactories,
-        List<BlockHash.GroupSpec> groupSpecs,
-        LocalExecutionPlannerContext context
-    );
 }

x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/planner/EsPhysicalOperationProviders.java (+0, -22)

@@ -14,9 +14,7 @@ import org.apache.lucene.search.BooleanQuery;
 import org.apache.lucene.search.IndexSearcher;
 import org.apache.lucene.search.Query;
 import org.elasticsearch.common.logging.HeaderWarning;
-import org.elasticsearch.compute.aggregation.AggregatorMode;
 import org.elasticsearch.compute.aggregation.GroupingAggregator;
-import org.elasticsearch.compute.aggregation.blockhash.BlockHash;
 import org.elasticsearch.compute.data.Block;
 import org.elasticsearch.compute.data.ElementType;
 import org.elasticsearch.compute.lucene.DataPartitioning;
@@ -29,7 +27,6 @@ import org.elasticsearch.compute.lucene.ValuesSourceReaderOperator;
 import org.elasticsearch.compute.operator.Operator;
 import org.elasticsearch.compute.operator.OrdinalsGroupingOperator;
 import org.elasticsearch.compute.operator.SourceOperator;
-import org.elasticsearch.compute.operator.TimeSeriesAggregationOperator;
 import org.elasticsearch.core.Nullable;
 import org.elasticsearch.index.IndexMode;
 import org.elasticsearch.index.IndexSettings;
@@ -64,7 +61,6 @@ import org.elasticsearch.xpack.esql.plan.physical.AggregateExec;
 import org.elasticsearch.xpack.esql.plan.physical.EsQueryExec;
 import org.elasticsearch.xpack.esql.plan.physical.EsQueryExec.Sort;
 import org.elasticsearch.xpack.esql.plan.physical.FieldExtractExec;
-import org.elasticsearch.xpack.esql.plan.physical.TimeSeriesAggregateExec;
 import org.elasticsearch.xpack.esql.plan.physical.TimeSeriesSourceExec;
 import org.elasticsearch.xpack.esql.planner.LocalExecutionPlanner.DriverParallelism;
 import org.elasticsearch.xpack.esql.planner.LocalExecutionPlanner.LocalExecutionPlannerContext;
@@ -303,24 +299,6 @@ public class EsPhysicalOperationProviders extends AbstractPhysicalOperationProvi
         );
     }
 
-    @Override
-    public Operator.OperatorFactory timeSeriesAggregatorOperatorFactory(
-        TimeSeriesAggregateExec ts,
-        AggregatorMode aggregatorMode,
-        List<GroupingAggregator.Factory> aggregatorFactories,
-        List<BlockHash.GroupSpec> groupSpecs,
-        LocalExecutionPlannerContext context
-    ) {
-        return new TimeSeriesAggregationOperator.Factory(
-            ts.timeBucketRounding(context.foldCtx()),
-            shardContexts.size() == 1 && ts.anyMatch(p -> p instanceof TimeSeriesSourceExec),
-            groupSpecs,
-            aggregatorMode,
-            aggregatorFactories,
-            context.pageSize(ts.estimatedRowSize())
-        );
-    }
-
     public static class DefaultShardContext implements ShardContext {
         private final int index;
         private final SearchExecutionContext ctx;

x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/planner/PlannerUtils.java (+0, -10)

@@ -157,16 +157,6 @@ public class PlannerUtils {
         return indices.toArray(String[]::new);
     }
 
-    public static boolean requireTimeSeriesSource(PhysicalPlan plan) {
-        return plan.anyMatch(p -> {
-            if (p instanceof FragmentExec f) {
-                return f.fragment().anyMatch(l -> l instanceof EsRelation s && s.indexMode() == IndexMode.TIME_SERIES);
-            } else {
-                return false;
-            }
-        });
-    }
-
     private static void forEachRelation(PhysicalPlan plan, Consumer<EsRelation> action) {
         plan.forEachDown(FragmentExec.class, f -> f.fragment().forEachDown(EsRelation.class, r -> {
             if (r.indexMode() != IndexMode.LOOKUP) {

x-pack/plugin/esql/src/test/java/org/elasticsearch/xpack/esql/planner/TestPhysicalOperationProviders.java (+0, -21)

@@ -13,7 +13,6 @@ import org.elasticsearch.common.Randomness;
 import org.elasticsearch.common.settings.Settings;
 import org.elasticsearch.common.util.BigArrays;
 import org.elasticsearch.compute.Describable;
-import org.elasticsearch.compute.aggregation.AggregatorMode;
 import org.elasticsearch.compute.aggregation.GroupingAggregator;
 import org.elasticsearch.compute.aggregation.blockhash.BlockHash;
 import org.elasticsearch.compute.data.Block;
@@ -32,7 +31,6 @@ import org.elasticsearch.compute.operator.Operator;
 import org.elasticsearch.compute.operator.OrdinalsGroupingOperator;
 import org.elasticsearch.compute.operator.SourceOperator;
 import org.elasticsearch.compute.operator.SourceOperator.SourceOperatorFactory;
-import org.elasticsearch.compute.operator.TimeSeriesAggregationOperator;
 import org.elasticsearch.compute.test.TestBlockFactory;
 import org.elasticsearch.core.Nullable;
 import org.elasticsearch.env.Environment;
@@ -60,7 +58,6 @@ import org.elasticsearch.xpack.esql.expression.function.scalar.convert.AbstractC
 import org.elasticsearch.xpack.esql.plan.physical.AggregateExec;
 import org.elasticsearch.xpack.esql.plan.physical.EsQueryExec;
 import org.elasticsearch.xpack.esql.plan.physical.FieldExtractExec;
-import org.elasticsearch.xpack.esql.plan.physical.TimeSeriesAggregateExec;
 import org.elasticsearch.xpack.esql.plan.physical.TimeSeriesSourceExec;
 import org.elasticsearch.xpack.esql.planner.LocalExecutionPlanner.LocalExecutionPlannerContext;
 import org.elasticsearch.xpack.esql.planner.LocalExecutionPlanner.PhysicalOperation;
@@ -156,24 +153,6 @@ public class TestPhysicalOperationProviders extends AbstractPhysicalOperationPro
         );
     }
 
-    @Override
-    public Operator.OperatorFactory timeSeriesAggregatorOperatorFactory(
-        TimeSeriesAggregateExec ts,
-        AggregatorMode aggregatorMode,
-        List<GroupingAggregator.Factory> aggregatorFactories,
-        List<BlockHash.GroupSpec> groupSpecs,
-        LocalExecutionPlannerContext context
-    ) {
-        return new TimeSeriesAggregationOperator.Factory(
-            ts.timeBucketRounding(context.foldCtx()),
-            false,
-            groupSpecs,
-            aggregatorMode,
-            aggregatorFactories,
-            context.pageSize(ts.estimatedRowSize())
-        );
-    }
-
     private class TestSourceOperator extends SourceOperator {
         private int index = 0;
         private final DriverContext driverContext;