
Implement time series grouping via HashAggregationOperator using a specialized block hash implementation (#106127)

This change adds a new operator factory that performs grouping by the `_tsid`
and `@timestamp` fields.

The new operator factory delegates the actual grouping to the
`HashAggregationOperator`, but makes use of a specialized block hash (which
makes a few assumptions about the group fields). This is an initial
implementation that is not optimized for 'local' grouping.

The new time series grouping operator factory isn't reachable from the API and
can only be used from tests.
Martijn van Groningen committed 1 year ago (commit 377c7e568f)
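
For reference, the new factory is only constructed directly in the compute tests (the ESQL planner changes below merely pass `TimeValue.ZERO` through). A condensed sketch of that test-only usage, based on the `simple()` case of the new `TimeSeriesAggregationOperatorTests` further down; the channel numbers and description string are taken from that test, and the `DriverContext` comes from the compute test harness:

    // Test-only construction of the new operator factory (no aggregators, FINAL mode):
    Operator.OperatorFactory factory = new TimeSeriesAggregationOperatorFactory(
        AggregatorMode.FINAL,  // mode
        0,                     // tsHashChannel: block holding the _tsid hash
        1,                     // timestampIntervalChannel: block holding the rounded @timestamp
        TimeValue.ZERO,        // timeSeriesPeriod: zero means no interval rounding
        List.of(),             // grouping aggregator factories
        100                    // maxPageSize
    );
    // factory.describe() returns:
    // "TimeSeriesAggregationOperator[mode=FINAL, tsHashChannel = 0, timestampIntervalChannel = 1,
    //  timeSeriesPeriod = 0s, maxPageSize = 100]"
    // factory.get(driverContext) wraps a TimeSeriesBlockHash in a regular HashAggregationOperator.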

+ 1 - 1
x-pack/plugin/esql/compute/src/main/java/org/elasticsearch/compute/aggregation/blockhash/BlockHash.java

@@ -33,7 +33,7 @@ import java.util.List;
  */
 public abstract sealed class BlockHash implements Releasable, SeenGroupIds //
     permits BooleanBlockHash, BytesRefBlockHash, DoubleBlockHash, IntBlockHash, LongBlockHash,//
-    NullBlockHash, PackedValuesBlockHash, BytesRefLongBlockHash, LongLongBlockHash {
+    NullBlockHash, PackedValuesBlockHash, BytesRefLongBlockHash, LongLongBlockHash, TimeSeriesBlockHash {
 
     protected final BlockFactory blockFactory;
 

+ 130 - 0
x-pack/plugin/esql/compute/src/main/java/org/elasticsearch/compute/aggregation/blockhash/TimeSeriesBlockHash.java

@@ -0,0 +1,130 @@
+/*
+ * Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
+ * or more contributor license agreements. Licensed under the Elastic License
+ * 2.0; you may not use this file except in compliance with the Elastic License
+ * 2.0.
+ */
+
+package org.elasticsearch.compute.aggregation.blockhash;
+
+import org.apache.lucene.util.BytesRef;
+import org.elasticsearch.common.util.BigArrays;
+import org.elasticsearch.common.util.BitArray;
+import org.elasticsearch.common.util.BytesRefHash;
+import org.elasticsearch.common.util.LongLongHash;
+import org.elasticsearch.compute.aggregation.GroupingAggregatorFunction;
+import org.elasticsearch.compute.aggregation.SeenGroupIds;
+import org.elasticsearch.compute.data.Block;
+import org.elasticsearch.compute.data.BytesRefBlock;
+import org.elasticsearch.compute.data.BytesRefVector;
+import org.elasticsearch.compute.data.IntVector;
+import org.elasticsearch.compute.data.LongBlock;
+import org.elasticsearch.compute.data.LongVector;
+import org.elasticsearch.compute.data.Page;
+import org.elasticsearch.compute.operator.DriverContext;
+import org.elasticsearch.core.Releasables;
+
+import java.util.Objects;
+
+public final class TimeSeriesBlockHash extends BlockHash {
+
+    private final int tsHashChannel;
+    private final int timestampIntervalChannel;
+    private final BytesRefHash tsidHashes;
+    private final LongLongHash intervalHash;
+
+    long groupOrdinal = -1;
+    BytesRef previousTsidHash;
+    long previousTimestampInterval;
+
+    public TimeSeriesBlockHash(int tsHashChannel, int timestampIntervalChannel, DriverContext driverContext) {
+        super(driverContext.blockFactory());
+        this.tsHashChannel = tsHashChannel;
+        this.timestampIntervalChannel = timestampIntervalChannel;
+        this.tsidHashes = new BytesRefHash(1, blockFactory.bigArrays());
+        this.intervalHash = new LongLongHash(1, blockFactory.bigArrays());
+    }
+
+    @Override
+    public void close() {
+        Releasables.close(tsidHashes, intervalHash);
+    }
+
+    @Override
+    public void add(Page page, GroupingAggregatorFunction.AddInput addInput) {
+        BytesRefBlock tsHashBlock = page.getBlock(tsHashChannel);
+        BytesRefVector tsHashVector = Objects.requireNonNull(tsHashBlock.asVector());
+        try (var ordsBuilder = blockFactory.newIntVectorBuilder(tsHashVector.getPositionCount())) {
+            LongBlock timestampIntervalBlock = page.getBlock(timestampIntervalChannel);
+            BytesRef spare = new BytesRef();
+            for (int i = 0; i < tsHashVector.getPositionCount(); i++) {
+                BytesRef tsHash = tsHashVector.getBytesRef(i, spare);
+                long timestampInterval = timestampIntervalBlock.getLong(i);
+                // Optimization that relies on the fact that blocks are sorted by tsid hash and timestamp
+                if (tsHash.equals(previousTsidHash) == false || timestampInterval != previousTimestampInterval) {
+                    long tsidOrdinal = tsidHashes.add(tsHash);
+                    if (tsidOrdinal < 0) {
+                        tsidOrdinal = -1 - tsidOrdinal;
+                    }
+                    groupOrdinal = intervalHash.add(tsidOrdinal, timestampInterval);
+                    if (groupOrdinal < 0) {
+                        groupOrdinal = -1 - groupOrdinal;
+                    }
+                    previousTsidHash = BytesRef.deepCopyOf(tsHash);
+                    previousTimestampInterval = timestampInterval;
+                }
+                ordsBuilder.appendInt(Math.toIntExact(groupOrdinal));
+            }
+            try (var ords = ordsBuilder.build()) {
+                addInput.add(0, ords);
+            }
+        }
+    }
+
+    @Override
+    public Block[] getKeys() {
+        int positions = (int) intervalHash.size();
+        BytesRefVector tsidHashes = null;
+        LongVector timestampIntervals = null;
+        try (
+            BytesRefVector.Builder tsidHashesBuilder = blockFactory.newBytesRefVectorBuilder(positions);
+            LongVector.Builder timestampIntervalsBuilder = blockFactory.newLongVectorFixedBuilder(positions)
+        ) {
+            BytesRef scratch = new BytesRef();
+            for (long i = 0; i < positions; i++) {
+                BytesRef key1 = this.tsidHashes.get(intervalHash.getKey1(i), scratch);
+                tsidHashesBuilder.appendBytesRef(key1);
+                timestampIntervalsBuilder.appendLong(intervalHash.getKey2(i));
+            }
+            tsidHashes = tsidHashesBuilder.build();
+            timestampIntervals = timestampIntervalsBuilder.build();
+        } finally {
+            if (timestampIntervals == null) {
+                Releasables.closeExpectNoException(tsidHashes);
+            }
+        }
+        return new Block[] { tsidHashes.asBlock(), timestampIntervals.asBlock() };
+    }
+
+    @Override
+    public IntVector nonEmpty() {
+        long endExclusive = intervalHash.size();
+        return IntVector.range(0, Math.toIntExact(endExclusive), blockFactory);
+    }
+
+    @Override
+    public BitArray seenGroupIds(BigArrays bigArrays) {
+        long size = intervalHash.size();
+        return new SeenGroupIds.Range(0, Math.toIntExact(size)).seenGroupIds(bigArrays);
+    }
+
+    public String toString() {
+        return "TimeSeriesBlockHash{keys=[BytesRefKey[channel="
+            + tsHashChannel
+            + "], LongKey[channel="
+            + timestampIntervalChannel
+            + "]], entries="
+            + groupOrdinal
+            + "b}";
+    }
+}
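
A side note on the ordinal handling above: the BigArrays-backed `BytesRefHash` and `LongLongHash` return the new ordinal when a key is inserted for the first time and `-1 - existingOrdinal` when the key is already present, which is why `add()` flips negative return values. Combined with the sorted input, each distinct (tsid hash, interval) pair is looked up once and then reused for its run of rows. A worked example of the ordinals this block hash emits for one page (the key values are made up for illustration):

    // Rows arrive sorted by (_tsid hash, @timestamp), so equal neighbours reuse groupOrdinal
    // without touching the hashes:
    //   tsid hash:   A    A    A    B    B    C
    //   interval:    0    0   60    0   60    0
    //   group ord:   0    0    1    2    3    4
    // Each new (tsid, interval) pair takes the next ordinal from intervalHash; the resulting
    // IntVector of ordinals is handed to the grouping aggregators via addInput.add(0, ords).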

+ 4 - 0
x-pack/plugin/esql/compute/src/main/java/org/elasticsearch/compute/lucene/LuceneSliceQueue.java

@@ -46,6 +46,10 @@ public final class LuceneSliceQueue {
         return totalSlices;
     }
 
+    public Iterable<LuceneSlice> getSlices() {
+        return slices;
+    }
+
     public static LuceneSliceQueue create(
         List<? extends ShardContext> contexts,
         Function<ShardContext, Weight> weightFunction,

+ 73 - 25
x-pack/plugin/esql/compute/src/main/java/org/elasticsearch/compute/lucene/TimeSeriesSortedSourceOperatorFactory.java

@@ -7,6 +7,7 @@
 
 package org.elasticsearch.compute.lucene;
 
+import org.apache.lucene.document.LongPoint;
 import org.apache.lucene.index.LeafReaderContext;
 import org.apache.lucene.index.SortedDocValues;
 import org.apache.lucene.index.SortedNumericDocValues;
@@ -16,7 +17,9 @@ import org.apache.lucene.search.ScoreMode;
 import org.apache.lucene.search.Weight;
 import org.apache.lucene.util.BytesRef;
 import org.apache.lucene.util.PriorityQueue;
+import org.elasticsearch.common.Rounding;
 import org.elasticsearch.compute.data.BlockFactory;
+import org.elasticsearch.compute.data.BytesRefVector;
 import org.elasticsearch.compute.data.DocVector;
 import org.elasticsearch.compute.data.IntBlock;
 import org.elasticsearch.compute.data.IntVector;
@@ -25,6 +28,8 @@ import org.elasticsearch.compute.data.Page;
 import org.elasticsearch.compute.operator.DriverContext;
 import org.elasticsearch.compute.operator.SourceOperator;
 import org.elasticsearch.core.Releasables;
+import org.elasticsearch.core.TimeValue;
+import org.elasticsearch.index.mapper.DataStreamTimestampFieldMapper;
 
 import java.io.IOException;
 import java.io.UncheckedIOException;
@@ -43,13 +48,18 @@ import java.util.function.Function;
  * This operator currently only supports shard level concurrency. A new concurrency mechanism should be introduced at the time serie level
  * in order to read tsdb indices in parallel.
  */
-public record TimeSeriesSortedSourceOperatorFactory(int limit, int maxPageSize, int taskConcurrency, LuceneSliceQueue sliceQueue)
-    implements
-        LuceneOperator.Factory {
+public record TimeSeriesSortedSourceOperatorFactory(
+    int limit,
+    int maxPageSize,
+    int taskConcurrency,
+    TimeValue timeSeriesPeriod,
+    LuceneSliceQueue sliceQueue
+) implements LuceneOperator.Factory {
 
     @Override
     public SourceOperator get(DriverContext driverContext) {
-        return new Impl(driverContext.blockFactory(), sliceQueue, maxPageSize, limit);
+        var rounding = timeSeriesPeriod.equals(TimeValue.ZERO) == false ? Rounding.builder(timeSeriesPeriod).build() : null;
+        return new Impl(driverContext.blockFactory(), sliceQueue, maxPageSize, limit, rounding);
     }
 
     @Override
@@ -66,13 +76,14 @@ public record TimeSeriesSortedSourceOperatorFactory(int limit, int maxPageSize,
         int limit,
         int maxPageSize,
         int taskConcurrency,
+        TimeValue timeSeriesPeriod,
         List<? extends ShardContext> searchContexts,
         Function<ShardContext, Query> queryFunction
     ) {
         var weightFunction = LuceneOperator.weightFunction(queryFunction, ScoreMode.COMPLETE_NO_SCORES);
         var sliceQueue = LuceneSliceQueue.create(searchContexts, weightFunction, DataPartitioning.SHARD, taskConcurrency);
         taskConcurrency = Math.min(sliceQueue.totalSlices(), taskConcurrency);
-        return new TimeSeriesSortedSourceOperatorFactory(limit, maxPageSize, taskConcurrency, sliceQueue);
+        return new TimeSeriesSortedSourceOperatorFactory(limit, maxPageSize, taskConcurrency, timeSeriesPeriod, sliceQueue);
     }
 
     static final class Impl extends SourceOperator {
@@ -80,26 +91,49 @@ public record TimeSeriesSortedSourceOperatorFactory(int limit, int maxPageSize,
         private final int maxPageSize;
         private final BlockFactory blockFactory;
         private final LuceneSliceQueue sliceQueue;
+        private final Rounding.Prepared rounding;
         private int currentPagePos = 0;
         private int remainingDocs;
         private boolean doneCollecting;
         private IntVector.Builder docsBuilder;
         private IntVector.Builder segmentsBuilder;
-        private LongVector.Builder timestampIntervalBuilder;
-        // TODO: handle when a time series spans across backing indices
-        // In that case we need to bytes representation of the tsid
-        private IntVector.Builder tsOrdBuilder;
+        private LongVector.Builder timestampsBuilder;
+        private LongVector.Builder intervalsBuilder;
+        // TODO: add an ordinal block for tsid hashes
+        // (This allows for efficiently grouping by tsid locally, no need to use bytes representation of tsid hash)
+        private BytesRefVector.Builder tsHashesBuilder;
         private TimeSeriesIterator iterator;
 
-        Impl(BlockFactory blockFactory, LuceneSliceQueue sliceQueue, int maxPageSize, int limit) {
+        Impl(BlockFactory blockFactory, LuceneSliceQueue sliceQueue, int maxPageSize, int limit, Rounding rounding) {
             this.maxPageSize = maxPageSize;
             this.blockFactory = blockFactory;
             this.remainingDocs = limit;
             this.docsBuilder = blockFactory.newIntVectorBuilder(Math.min(limit, maxPageSize));
             this.segmentsBuilder = null;
-            this.timestampIntervalBuilder = blockFactory.newLongVectorBuilder(Math.min(limit, maxPageSize));
-            this.tsOrdBuilder = blockFactory.newIntVectorBuilder(Math.min(limit, maxPageSize));
+            this.timestampsBuilder = blockFactory.newLongVectorBuilder(Math.min(limit, maxPageSize));
+            this.tsHashesBuilder = blockFactory.newBytesRefVectorBuilder(Math.min(limit, maxPageSize));
             this.sliceQueue = sliceQueue;
+            if (rounding != null) {
+                try {
+                    long minTimestamp = Long.MAX_VALUE;
+                    long maxTimestamp = Long.MIN_VALUE;
+                    for (var slice : sliceQueue.getSlices()) {
+                        for (var leaf : slice.leaves()) {
+                            var pointValues = leaf.leafReaderContext().reader().getPointValues(DataStreamTimestampFieldMapper.DEFAULT_PATH);
+                            long segmentMin = LongPoint.decodeDimension(pointValues.getMinPackedValue(), 0);
+                            minTimestamp = Math.min(segmentMin, minTimestamp);
+                            long segmentMax = LongPoint.decodeDimension(pointValues.getMaxPackedValue(), 0);
+                            maxTimestamp = Math.max(segmentMax, maxTimestamp);
+                        }
+                    }
+                    this.rounding = rounding.prepare(minTimestamp, maxTimestamp);
+                    this.intervalsBuilder = blockFactory.newLongVectorBuilder(Math.min(limit, maxPageSize));
+                } catch (IOException ioe) {
+                    throw new UncheckedIOException(ioe);
+                }
+            } else {
+                this.rounding = null;
+            }
         }
 
         @Override
@@ -127,8 +161,9 @@ public record TimeSeriesSortedSourceOperatorFactory(int limit, int maxPageSize,
             IntBlock shard = null;
             IntVector leaf = null;
             IntVector docs = null;
-            LongVector timestampIntervals = null;
-            IntVector tsids = null;
+            LongVector timestamps = null;
+            LongVector intervals = null;
+            BytesRefVector tsids = null;
             try {
                 if (iterator == null) {
                     var slice = sliceQueue.nextSlice();
@@ -154,15 +189,22 @@ public record TimeSeriesSortedSourceOperatorFactory(int limit, int maxPageSize,
                 docs = docsBuilder.build();
                 docsBuilder = blockFactory.newIntVectorBuilder(Math.min(remainingDocs, maxPageSize));
 
-                timestampIntervals = timestampIntervalBuilder.build();
-                timestampIntervalBuilder = blockFactory.newLongVectorBuilder(Math.min(remainingDocs, maxPageSize));
-                tsids = tsOrdBuilder.build();
-                tsOrdBuilder = blockFactory.newIntVectorBuilder(Math.min(remainingDocs, maxPageSize));
+                timestamps = timestampsBuilder.build();
+                timestampsBuilder = blockFactory.newLongVectorBuilder(Math.min(remainingDocs, maxPageSize));
+                if (rounding != null) {
+                    intervals = intervalsBuilder.build();
+                    intervalsBuilder = blockFactory.newLongVectorBuilder(Math.min(remainingDocs, maxPageSize));
+                } else {
+                    intervals = blockFactory.newConstantLongVector(0, timestamps.getPositionCount());
+                }
+                tsids = tsHashesBuilder.build();
+                tsHashesBuilder = blockFactory.newBytesRefVectorBuilder(Math.min(remainingDocs, maxPageSize));
                 page = new Page(
                     currentPagePos,
                     new DocVector(shard.asVector(), leaf, docs, leaf.isConstant()).asBlock(),
                     tsids.asBlock(),
-                    timestampIntervals.asBlock()
+                    timestamps.asBlock(),
+                    intervals.asBlock()
                 );
 
                 currentPagePos = 0;
@@ -173,7 +215,7 @@ public record TimeSeriesSortedSourceOperatorFactory(int limit, int maxPageSize,
                 throw new UncheckedIOException(e);
             } finally {
                 if (page == null) {
-                    Releasables.closeExpectNoException(shard, leaf, docs, timestampIntervals, tsids);
+                    Releasables.closeExpectNoException(shard, leaf, docs, timestamps, tsids, intervals);
                 }
             }
             return page;
@@ -181,7 +223,7 @@ public record TimeSeriesSortedSourceOperatorFactory(int limit, int maxPageSize,
 
         @Override
         public void close() {
-            Releasables.closeExpectNoException(docsBuilder, segmentsBuilder, timestampIntervalBuilder, tsOrdBuilder);
+            Releasables.closeExpectNoException(docsBuilder, segmentsBuilder, timestampsBuilder, intervalsBuilder, tsHashesBuilder);
         }
 
         class TimeSeriesIterator {
@@ -236,8 +278,11 @@ public record TimeSeriesSortedSourceOperatorFactory(int limit, int maxPageSize,
                         Leaf leaf = queue.top();
                         segmentsBuilder.appendInt(leaf.segmentOrd);
                         docsBuilder.appendInt(leaf.iterator.docID());
-                        timestampIntervalBuilder.appendLong(leaf.timestamp);
-                        tsOrdBuilder.appendInt(globalTsidOrd);
+                        timestampsBuilder.appendLong(leaf.timestamp);
+                        if (rounding != null) {
+                            intervalsBuilder.appendLong(rounding.round(leaf.timestamp));
+                        }
+                        tsHashesBuilder.appendBytesRef(currentTsid);
                         final Leaf newTop;
                         if (leaf.nextDoc()) {
                             // TODO: updating the top is one of the most expensive parts of this operation.
@@ -257,8 +302,11 @@ public record TimeSeriesSortedSourceOperatorFactory(int limit, int maxPageSize,
                     // Only one segment, so no need to use priority queue and use segment ordinals as tsid ord.
                     leaf.reinitializeIfNeeded(Thread.currentThread());
                     while (leaf.nextDoc()) {
-                        tsOrdBuilder.appendInt(leaf.timeSeriesHashOrd);
-                        timestampIntervalBuilder.appendLong(leaf.timestamp);
+                        tsHashesBuilder.appendBytesRef(leaf.timeSeriesHash);
+                        timestampsBuilder.appendLong(leaf.timestamp);
+                        if (rounding != null) {
+                            intervalsBuilder.appendLong(rounding.round(leaf.timestamp));
+                        }
                         // Don't append segment ord, because there is only one segment.
                         docsBuilder.appendInt(leaf.iterator.docID());
                         currentPagePos++;

+ 48 - 0
x-pack/plugin/esql/compute/src/main/java/org/elasticsearch/compute/operator/TimeSeriesAggregationOperatorFactory.java

@@ -0,0 +1,48 @@
+/*
+ * Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
+ * or more contributor license agreements. Licensed under the Elastic License
+ * 2.0; you may not use this file except in compliance with the Elastic License
+ * 2.0.
+ */
+
+package org.elasticsearch.compute.operator;
+
+import org.elasticsearch.compute.aggregation.AggregatorMode;
+import org.elasticsearch.compute.aggregation.GroupingAggregator;
+import org.elasticsearch.compute.aggregation.blockhash.BlockHash;
+import org.elasticsearch.compute.aggregation.blockhash.TimeSeriesBlockHash;
+import org.elasticsearch.core.TimeValue;
+
+import java.util.List;
+
+public record TimeSeriesAggregationOperatorFactory(
+    AggregatorMode mode,
+    int tsHashChannel,
+    int timestampIntervalChannel,
+    TimeValue timeSeriesPeriod,
+    List<GroupingAggregator.Factory> aggregators,
+    int maxPageSize
+) implements Operator.OperatorFactory {
+
+    @Override
+    public String describe() {
+        return "TimeSeriesAggregationOperator[mode="
+            + mode
+            + ", tsHashChannel = "
+            + tsHashChannel
+            + ", timestampIntervalChannel = "
+            + timestampIntervalChannel
+            + ", timeSeriesPeriod = "
+            + timeSeriesPeriod
+            + ", maxPageSize = "
+            + maxPageSize
+            + "]";
+    }
+
+    @Override
+    public Operator get(DriverContext driverContext) {
+        BlockHash blockHash = new TimeSeriesBlockHash(tsHashChannel, timestampIntervalChannel, driverContext);
+        return new HashAggregationOperator(aggregators, () -> blockHash, driverContext);
+    }
+
+}

+ 56 - 205
x-pack/plugin/esql/compute/src/test/java/org/elasticsearch/compute/lucene/TimeSeriesSortedSourceOperatorTests.java

@@ -26,34 +26,24 @@ import org.apache.lucene.store.Directory;
 import org.apache.lucene.tests.index.RandomIndexWriter;
 import org.apache.lucene.util.BytesRef;
 import org.elasticsearch.common.Randomness;
-import org.elasticsearch.compute.aggregation.AggregatorMode;
-import org.elasticsearch.compute.aggregation.RateLongAggregatorFunctionSupplier;
-import org.elasticsearch.compute.aggregation.blockhash.BlockHash;
-import org.elasticsearch.compute.data.BytesRefBlock;
 import org.elasticsearch.compute.data.BytesRefVector;
 import org.elasticsearch.compute.data.DocVector;
-import org.elasticsearch.compute.data.DoubleBlock;
 import org.elasticsearch.compute.data.ElementType;
-import org.elasticsearch.compute.data.IntVector;
 import org.elasticsearch.compute.data.LongVector;
 import org.elasticsearch.compute.data.Page;
 import org.elasticsearch.compute.operator.AnyOperatorTestCase;
 import org.elasticsearch.compute.operator.Driver;
 import org.elasticsearch.compute.operator.DriverContext;
-import org.elasticsearch.compute.operator.HashAggregationOperator;
 import org.elasticsearch.compute.operator.Operator;
 import org.elasticsearch.compute.operator.OperatorTestCase;
-import org.elasticsearch.compute.operator.OrdinalsGroupingOperator;
 import org.elasticsearch.compute.operator.TestResultPageSinkOperator;
 import org.elasticsearch.core.CheckedFunction;
 import org.elasticsearch.core.IOUtils;
 import org.elasticsearch.core.TimeValue;
-import org.elasticsearch.index.mapper.BlockDocValuesReader;
 import org.elasticsearch.index.mapper.DataStreamTimestampFieldMapper;
 import org.elasticsearch.index.mapper.DateFieldMapper;
 import org.elasticsearch.index.mapper.KeywordFieldMapper;
 import org.elasticsearch.index.mapper.NumberFieldMapper;
-import org.elasticsearch.index.mapper.SourceLoader;
 import org.elasticsearch.index.mapper.TimeSeriesIdFieldMapper;
 import org.junit.After;
 
@@ -65,9 +55,9 @@ import java.util.HashMap;
 import java.util.List;
 import java.util.Locale;
 import java.util.Map;
+import java.util.function.Consumer;
 import java.util.function.Function;
 
-import static org.elasticsearch.index.mapper.DateFieldMapper.DEFAULT_DATE_TIME_FORMATTER;
 import static org.hamcrest.Matchers.equalTo;
 import static org.hamcrest.Matchers.hasSize;
 import static org.hamcrest.Matchers.lessThanOrEqualTo;
@@ -91,12 +81,12 @@ public class TimeSeriesSortedSourceOperatorTests extends AnyOperatorTestCase {
         // for now we emit at most one time series each page
         int offset = 0;
         for (Page page : results) {
-            assertThat(page.getBlockCount(), equalTo(5));
+            assertThat(page.getBlockCount(), equalTo(6));
             DocVector docVector = (DocVector) page.getBlock(0).asVector();
-            IntVector tsidVector = (IntVector) page.getBlock(1).asVector();
+            BytesRefVector tsidVector = (BytesRefVector) page.getBlock(1).asVector();
             LongVector timestampVector = (LongVector) page.getBlock(2).asVector();
-            LongVector voltageVector = (LongVector) page.getBlock(3).asVector();
-            BytesRefVector hostnameVector = (BytesRefVector) page.getBlock(4).asVector();
+            LongVector voltageVector = (LongVector) page.getBlock(4).asVector();
+            BytesRefVector hostnameVector = (BytesRefVector) page.getBlock(5).asVector();
             for (int i = 0; i < page.getPositionCount(); i++) {
                 int expectedTsidOrd = offset / numSamplesPerTS;
                 String expectedHostname = String.format(Locale.ROOT, "host-%02d", expectedTsidOrd);
@@ -106,7 +96,7 @@ public class TimeSeriesSortedSourceOperatorTests extends AnyOperatorTestCase {
                 assertThat(docVector.shards().getInt(i), equalTo(0));
                 assertThat(voltageVector.getLong(i), equalTo(expectedVoltage));
                 assertThat(hostnameVector.getBytesRef(i, new BytesRef()).utf8ToString(), equalTo(expectedHostname));
-                assertThat(tsidVector.getInt(i), equalTo(expectedTsidOrd));
+                assertThat(tsidVector.getBytesRef(i, new BytesRef()).utf8ToString(), equalTo("\u0001\bhostnames\u0007" + expectedHostname));
                 assertThat(timestampVector.getLong(i), equalTo(expectedTimestamp));
                 offset++;
             }
@@ -121,27 +111,27 @@ public class TimeSeriesSortedSourceOperatorTests extends AnyOperatorTestCase {
         List<Page> results = runDriver(limit, randomIntBetween(1, 1024), randomBoolean(), numTimeSeries, numSamplesPerTS, timestampStart);
         assertThat(results, hasSize(1));
         Page page = results.get(0);
-        assertThat(page.getBlockCount(), equalTo(5));
+        assertThat(page.getBlockCount(), equalTo(6));
 
         DocVector docVector = (DocVector) page.getBlock(0).asVector();
         assertThat(docVector.getPositionCount(), equalTo(limit));
 
-        IntVector tsidVector = (IntVector) page.getBlock(1).asVector();
+        BytesRefVector tsidVector = (BytesRefVector) page.getBlock(1).asVector();
         assertThat(tsidVector.getPositionCount(), equalTo(limit));
 
         LongVector timestampVector = (LongVector) page.getBlock(2).asVector();
         assertThat(timestampVector.getPositionCount(), equalTo(limit));
 
-        LongVector voltageVector = (LongVector) page.getBlock(3).asVector();
+        LongVector voltageVector = (LongVector) page.getBlock(4).asVector();
         assertThat(voltageVector.getPositionCount(), equalTo(limit));
 
-        BytesRefVector hostnameVector = (BytesRefVector) page.getBlock(4).asVector();
+        BytesRefVector hostnameVector = (BytesRefVector) page.getBlock(5).asVector();
         assertThat(hostnameVector.getPositionCount(), equalTo(limit));
 
         assertThat(docVector.shards().getInt(0), equalTo(0));
         assertThat(voltageVector.getLong(0), equalTo(5L));
         assertThat(hostnameVector.getBytesRef(0, new BytesRef()).utf8ToString(), equalTo("host-00"));
-        assertThat(tsidVector.getInt(0), equalTo(0));
+        assertThat(tsidVector.getBytesRef(0, new BytesRef()).utf8ToString(), equalTo("\u0001\bhostnames\u0007host-00")); // legacy tsid
         assertThat(timestampVector.getLong(0), equalTo(timestampStart + ((numSamplesPerTS - 1) * 10_000L)));
     }
 
@@ -161,13 +151,21 @@ public class TimeSeriesSortedSourceOperatorTests extends AnyOperatorTestCase {
         }
         int maxPageSize = between(1, 1024);
         int limit = randomBoolean() ? between(1, 100000) : Integer.MAX_VALUE;
-        var timeSeriesFactory = createTimeSeriesSourceOperator(limit, maxPageSize, randomBoolean(), writer -> {
-            Randomness.shuffle(docs);
-            for (Doc doc : docs) {
-                writeTS(writer, doc.timestamp, new Object[] { "hostname", "h" + doc.host }, new Object[] { "metric", doc.metric });
+        var timeSeriesFactory = createTimeSeriesSourceOperator(
+            directory,
+            r -> this.reader = r,
+            limit,
+            maxPageSize,
+            randomBoolean(),
+            TimeValue.ZERO,
+            writer -> {
+                Randomness.shuffle(docs);
+                for (Doc doc : docs) {
+                    writeTS(writer, doc.timestamp, new Object[] { "hostname", "h" + doc.host }, new Object[] { "metric", doc.metric });
+                }
+                return docs.size();
             }
-            return docs.size();
-        });
+        );
         DriverContext driverContext = driverContext();
         List<Page> results = new ArrayList<>();
         var metricField = new NumberFieldMapper.NumberFieldType("metric", NumberFieldMapper.NumberType.LONG);
@@ -192,16 +190,16 @@ public class TimeSeriesSortedSourceOperatorTests extends AnyOperatorTestCase {
                 assertThat(page.getPositionCount(), lessThanOrEqualTo(limit));
                 assertThat(page.getPositionCount(), lessThanOrEqualTo(maxPageSize));
             }
-            assertThat(page.getBlockCount(), equalTo(4));
+            assertThat(page.getBlockCount(), equalTo(5));
             DocVector docVector = (DocVector) page.getBlock(0).asVector();
-            IntVector tsidVector = (IntVector) page.getBlock(1).asVector();
+            BytesRefVector tsidVector = (BytesRefVector) page.getBlock(1).asVector();
             LongVector timestampVector = (LongVector) page.getBlock(2).asVector();
-            LongVector metricVector = (LongVector) page.getBlock(3).asVector();
+            LongVector metricVector = (LongVector) page.getBlock(4).asVector();
             for (int i = 0; i < page.getPositionCount(); i++) {
                 Doc doc = docs.get(offset);
                 offset++;
                 assertThat(docVector.shards().getInt(0), equalTo(0));
-                assertThat(tsidVector.getInt(i), equalTo(hostToTsidOrd.get(doc.host)));
+                assertThat(tsidVector.getBytesRef(i, new BytesRef()).utf8ToString(), equalTo("\u0001\bhostnames\u0002h" + doc.host));
                 assertThat(timestampVector.getLong(i), equalTo(doc.timestamp));
                 assertThat(metricVector.getLong(i), equalTo(doc.metric));
             }
@@ -209,169 +207,9 @@ public class TimeSeriesSortedSourceOperatorTests extends AnyOperatorTestCase {
         assertThat(offset, equalTo(Math.min(limit, numDocs)));
     }
 
-    public void testBasicRate() {
-        long[] v1 = { 1, 1, 3, 0, 2, 9, 21, 3, 7, 7, 9, 12 };
-        long[] t1 = { 1, 5, 11, 20, 21, 59, 88, 91, 92, 97, 99, 112 };
-
-        long[] v2 = { 7, 2, 0, 11, 24, 0, 4, 1, 10, 2 };
-        long[] t2 = { 1, 2, 4, 5, 6, 8, 10, 11, 12, 14 };
-
-        long[] v3 = { 0, 1, 0, 1, 1, 4, 2, 2, 2, 2, 3, 5, 5 };
-        long[] t3 = { 2, 3, 5, 7, 8, 9, 10, 12, 14, 15, 18, 20, 22 };
-        List<Pod> pods = List.of(new Pod("p1", t1, v1), new Pod("p2", t2, v2), new Pod("p3", t3, v3));
-        long unit = between(1, 5);
-        Map<String, Double> actualRates = runRateTest(pods, TimeValue.timeValueMillis(unit));
-        assertThat(actualRates, equalTo(Map.of("p1", 35.0 * unit / 111.0, "p2", 42.0 * unit / 13.0, "p3", 10.0 * unit / 20.0)));
-    }
-
-    public void testRandomRate() {
-        int numPods = between(1, 10);
-        List<Pod> pods = new ArrayList<>();
-        Map<String, Double> expectedRates = new HashMap<>();
-        TimeValue unit = TimeValue.timeValueSeconds(1);
-        for (int p = 0; p < numPods; p++) {
-            int numValues = between(2, 100);
-            long[] values = new long[numValues];
-            long[] times = new long[numValues];
-            long t = DEFAULT_DATE_TIME_FORMATTER.parseMillis("2024-01-01T00:00:00Z");
-            for (int i = 0; i < numValues; i++) {
-                values[i] = randomIntBetween(0, 100);
-                t += TimeValue.timeValueSeconds(between(1, 10)).millis();
-                times[i] = t;
-            }
-            Pod pod = new Pod("p" + p, times, values);
-            pods.add(pod);
-            if (numValues == 1) {
-                expectedRates.put(pod.name, null);
-            } else {
-                expectedRates.put(pod.name, pod.expectedRate(unit));
-            }
-        }
-        Map<String, Double> actualRates = runRateTest(pods, unit);
-        assertThat(actualRates, equalTo(expectedRates));
-    }
-
-    record Pod(String name, long[] times, long[] values) {
-        Pod {
-            assert times.length == values.length : times.length + "!=" + values.length;
-        }
-
-        double expectedRate(TimeValue unit) {
-            double dv = 0;
-            for (int i = 0; i < values.length - 1; i++) {
-                if (values[i + 1] < values[i]) {
-                    dv += values[i];
-                }
-            }
-            dv += (values[values.length - 1] - values[0]);
-            long dt = times[times.length - 1] - times[0];
-            return (dv * unit.millis()) / dt;
-        }
-    }
-
-    Map<String, Double> runRateTest(List<Pod> pods, TimeValue unit) {
-        long unitInMillis = unit.millis();
-        record Doc(String pod, long timestamp, long requests) {
-
-        }
-        var sourceOperatorFactory = createTimeSeriesSourceOperator(Integer.MAX_VALUE, between(1, 100), randomBoolean(), writer -> {
-            List<Doc> docs = new ArrayList<>();
-            for (Pod pod : pods) {
-                for (int i = 0; i < pod.times.length; i++) {
-                    docs.add(new Doc(pod.name, pod.times[i], pod.values[i]));
-                }
-            }
-            Randomness.shuffle(docs);
-            for (Doc doc : docs) {
-                writeTS(writer, doc.timestamp, new Object[] { "pod", doc.pod }, new Object[] { "requests", doc.requests });
-            }
-            return docs.size();
-        });
-        var ctx = driverContext();
-        HashAggregationOperator finalHash = new HashAggregationOperator(
-            List.of(new RateLongAggregatorFunctionSupplier(List.of(1, 2, 3), unitInMillis).groupingAggregatorFactory(AggregatorMode.FINAL)),
-            () -> BlockHash.build(
-                List.of(new HashAggregationOperator.GroupSpec(0, ElementType.BYTES_REF)),
-                ctx.blockFactory(),
-                randomIntBetween(1, 1000),
-                randomBoolean()
-            ),
-            ctx
-        );
-        List<Page> results = new ArrayList<>();
-        var requestsField = new NumberFieldMapper.NumberFieldType("requests", NumberFieldMapper.NumberType.LONG);
-        var podField = new KeywordFieldMapper.KeywordFieldType("pod");
-        if (randomBoolean()) {
-            HashAggregationOperator initialHash = new HashAggregationOperator(
-                List.of(
-                    new RateLongAggregatorFunctionSupplier(List.of(4, 2), unitInMillis).groupingAggregatorFactory(AggregatorMode.INITIAL)
-                ),
-                () -> BlockHash.build(
-                    List.of(new HashAggregationOperator.GroupSpec(3, ElementType.BYTES_REF)),
-                    ctx.blockFactory(),
-                    randomIntBetween(1, 1000),
-                    randomBoolean()
-                ),
-                ctx
-            );
-            OperatorTestCase.runDriver(
-                new Driver(
-                    ctx,
-                    sourceOperatorFactory.get(ctx),
-                    List.of(
-                        ValuesSourceReaderOperatorTests.factory(reader, podField, ElementType.BYTES_REF).get(ctx),
-                        ValuesSourceReaderOperatorTests.factory(reader, requestsField, ElementType.LONG).get(ctx),
-                        initialHash,
-                        finalHash
-                    ),
-                    new TestResultPageSinkOperator(results::add),
-                    () -> {}
-                )
-            );
-        } else {
-            var blockLoader = new BlockDocValuesReader.BytesRefsFromOrdsBlockLoader("pod");
-            var shardContext = new ValuesSourceReaderOperator.ShardContext(reader, () -> SourceLoader.FROM_STORED_SOURCE);
-            var ordinalGrouping = new OrdinalsGroupingOperator(
-                shardIdx -> blockLoader,
-                List.of(shardContext),
-                ElementType.BYTES_REF,
-                0,
-                "pod",
-                List.of(
-                    new RateLongAggregatorFunctionSupplier(List.of(3, 2), unitInMillis).groupingAggregatorFactory(AggregatorMode.INITIAL)
-                ),
-                randomIntBetween(1, 1000),
-                ctx
-            );
-            OperatorTestCase.runDriver(
-                new Driver(
-                    ctx,
-                    sourceOperatorFactory.get(ctx),
-                    List.of(
-                        ValuesSourceReaderOperatorTests.factory(reader, requestsField, ElementType.LONG).get(ctx),
-                        ordinalGrouping,
-                        finalHash
-                    ),
-                    new TestResultPageSinkOperator(results::add),
-                    () -> {}
-                )
-            );
-        }
-        Map<String, Double> rates = new HashMap<>();
-        for (Page result : results) {
-            BytesRefBlock keysBlock = result.getBlock(0);
-            DoubleBlock ratesBlock = result.getBlock(1);
-            for (int i = 0; i < result.getPositionCount(); i++) {
-                rates.put(keysBlock.getBytesRef(i, new BytesRef()).utf8ToString(), ratesBlock.getDouble(i));
-            }
-            result.releaseBlocks();
-        }
-        return rates;
-    }
-
     @Override
     protected Operator.OperatorFactory simple() {
-        return createTimeSeriesSourceOperator(1, 1, false, writer -> {
+        return createTimeSeriesSourceOperator(directory, r -> this.reader = r, 1, 1, false, TimeValue.ZERO, writer -> {
             long timestamp = DateFieldMapper.DEFAULT_DATE_TIME_FORMATTER.parseMillis("2024-01-01T00:00:00Z");
             writeTS(writer, timestamp, new Object[] { "hostname", "host-01" }, new Object[] { "voltage", 2 });
             return 1;
@@ -390,18 +228,26 @@ public class TimeSeriesSortedSourceOperatorTests extends AnyOperatorTestCase {
 
     List<Page> runDriver(int limit, int maxPageSize, boolean forceMerge, int numTimeSeries, int numSamplesPerTS, long timestampStart) {
         var ctx = driverContext();
-        var timeSeriesFactory = createTimeSeriesSourceOperator(limit, maxPageSize, forceMerge, writer -> {
-            long timestamp = timestampStart;
-            for (int i = 0; i < numSamplesPerTS; i++) {
-                for (int j = 0; j < numTimeSeries; j++) {
-                    String hostname = String.format(Locale.ROOT, "host-%02d", j);
-                    writeTS(writer, timestamp, new Object[] { "hostname", hostname }, new Object[] { "voltage", j + 5 });
+        var timeSeriesFactory = createTimeSeriesSourceOperator(
+            directory,
+            indexReader -> this.reader = indexReader,
+            limit,
+            maxPageSize,
+            forceMerge,
+            TimeValue.ZERO,
+            writer -> {
+                long timestamp = timestampStart;
+                for (int i = 0; i < numSamplesPerTS; i++) {
+                    for (int j = 0; j < numTimeSeries; j++) {
+                        String hostname = String.format(Locale.ROOT, "host-%02d", j);
+                        writeTS(writer, timestamp, new Object[] { "hostname", hostname }, new Object[] { "voltage", j + 5 });
+                    }
+                    timestamp += 10_000;
+                    writer.commit();
                 }
-                timestamp += 10_000;
-                writer.commit();
+                return numTimeSeries * numSamplesPerTS;
             }
-            return numTimeSeries * numSamplesPerTS;
-        });
+        );
 
         List<Page> results = new ArrayList<>();
         var voltageField = new NumberFieldMapper.NumberFieldType("voltage", NumberFieldMapper.NumberType.LONG);
@@ -426,16 +272,20 @@ public class TimeSeriesSortedSourceOperatorTests extends AnyOperatorTestCase {
         return results;
     }
 
-    TimeSeriesSortedSourceOperatorFactory createTimeSeriesSourceOperator(
+    public static TimeSeriesSortedSourceOperatorFactory createTimeSeriesSourceOperator(
+        Directory directory,
+        Consumer<IndexReader> readerConsumer,
         int limit,
         int maxPageSize,
         boolean forceMerge,
+        TimeValue timeValue,
         CheckedFunction<RandomIndexWriter, Integer, IOException> indexingLogic
     ) {
         Sort sort = new Sort(
             new SortField(TimeSeriesIdFieldMapper.NAME, SortField.Type.STRING, false),
             new SortedNumericSortField(DataStreamTimestampFieldMapper.DEFAULT_PATH, SortField.Type.LONG, true)
         );
+        IndexReader reader;
         try (
             RandomIndexWriter writer = new RandomIndexWriter(
                 random(),
@@ -449,16 +299,17 @@ public class TimeSeriesSortedSourceOperatorTests extends AnyOperatorTestCase {
                 writer.forceMerge(1);
             }
             reader = writer.getReader();
+            readerConsumer.accept(reader);
             assertThat(reader.numDocs(), equalTo(numDocs));
         } catch (IOException e) {
             throw new UncheckedIOException(e);
         }
         var ctx = new LuceneSourceOperatorTests.MockShardContext(reader, 0);
         Function<ShardContext, Query> queryFunction = c -> new MatchAllDocsQuery();
-        return TimeSeriesSortedSourceOperatorFactory.create(limit, maxPageSize, 1, List.of(ctx), queryFunction);
+        return TimeSeriesSortedSourceOperatorFactory.create(limit, maxPageSize, 1, timeValue, List.of(ctx), queryFunction);
     }
 
-    static void writeTS(RandomIndexWriter iw, long timestamp, Object[] dimensions, Object[] metrics) throws IOException {
+    public static void writeTS(RandomIndexWriter iw, long timestamp, Object[] dimensions, Object[] metrics) throws IOException {
         final List<IndexableField> fields = new ArrayList<>();
         fields.add(new SortedNumericDocValuesField(DataStreamTimestampFieldMapper.DEFAULT_PATH, timestamp));
         fields.add(new LongPoint(DataStreamTimestampFieldMapper.DEFAULT_PATH, timestamp));

+ 1 - 1
x-pack/plugin/esql/compute/src/test/java/org/elasticsearch/compute/lucene/ValuesSourceReaderOperatorTests.java

@@ -133,7 +133,7 @@ public class ValuesSourceReaderOperatorTests extends OperatorTestCase {
         return factory(reader, mapperService.fieldType("long"), ElementType.LONG);
     }
 
-    static Operator.OperatorFactory factory(IndexReader reader, MappedFieldType ft, ElementType elementType) {
+    public static Operator.OperatorFactory factory(IndexReader reader, MappedFieldType ft, ElementType elementType) {
         return factory(reader, ft.name(), elementType, ft.blockLoader(null));
     }
 

+ 404 - 0
x-pack/plugin/esql/compute/src/test/java/org/elasticsearch/compute/operator/TimeSeriesAggregationOperatorTests.java

@@ -0,0 +1,404 @@
+/*
+ * Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
+ * or more contributor license agreements. Licensed under the Elastic License
+ * 2.0; you may not use this file except in compliance with the Elastic License
+ * 2.0.
+ */
+
+package org.elasticsearch.compute.operator;
+
+import org.apache.lucene.index.IndexReader;
+import org.apache.lucene.store.Directory;
+import org.apache.lucene.util.BytesRef;
+import org.elasticsearch.common.Randomness;
+import org.elasticsearch.compute.aggregation.AggregatorMode;
+import org.elasticsearch.compute.aggregation.RateLongAggregatorFunctionSupplier;
+import org.elasticsearch.compute.aggregation.blockhash.BlockHash;
+import org.elasticsearch.compute.data.BytesRefBlock;
+import org.elasticsearch.compute.data.DoubleBlock;
+import org.elasticsearch.compute.data.ElementType;
+import org.elasticsearch.compute.data.LongBlock;
+import org.elasticsearch.compute.data.Page;
+import org.elasticsearch.compute.lucene.ValuesSourceReaderOperator;
+import org.elasticsearch.compute.lucene.ValuesSourceReaderOperatorTests;
+import org.elasticsearch.core.IOUtils;
+import org.elasticsearch.core.TimeValue;
+import org.elasticsearch.index.mapper.BlockDocValuesReader;
+import org.elasticsearch.index.mapper.KeywordFieldMapper;
+import org.elasticsearch.index.mapper.NumberFieldMapper;
+import org.elasticsearch.index.mapper.SourceLoader;
+import org.junit.After;
+
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+
+import static org.elasticsearch.compute.lucene.TimeSeriesSortedSourceOperatorTests.createTimeSeriesSourceOperator;
+import static org.elasticsearch.compute.lucene.TimeSeriesSortedSourceOperatorTests.writeTS;
+import static org.elasticsearch.index.mapper.DateFieldMapper.DEFAULT_DATE_TIME_FORMATTER;
+import static org.elasticsearch.test.MapMatcher.assertMap;
+import static org.elasticsearch.test.MapMatcher.matchesMap;
+import static org.hamcrest.Matchers.equalTo;
+
+public class TimeSeriesAggregationOperatorTests extends AnyOperatorTestCase {
+
+    private IndexReader reader;
+    private final Directory directory = newDirectory();
+
+    @After
+    public void cleanup() throws IOException {
+        IOUtils.close(reader, directory);
+    }
+
+    @Override
+    protected Operator.OperatorFactory simple() {
+        return new TimeSeriesAggregationOperatorFactory(AggregatorMode.FINAL, 0, 1, TimeValue.ZERO, List.of(), 100);
+    }
+
+    @Override
+    protected String expectedDescriptionOfSimple() {
+        return "TimeSeriesAggregationOperator[mode=FINAL, tsHashChannel = 0, timestampIntervalChannel = 1, "
+            + "timeSeriesPeriod = 0s, maxPageSize = 100]";
+    }
+
+    @Override
+    protected String expectedToStringOfSimple() {
+        return "HashAggregationOperator[blockHash=TimeSeriesBlockHash{keys=[BytesRefKey[channel=0], "
+            + "LongKey[channel=1]], entries=-1b}, aggregators=[]]";
+    }
+
+    public void testBasicRate() {
+        long[] v1 = { 1, 1, 3, 0, 2, 9, 21, 3, 7, 7, 9, 12 };
+        long[] t1 = { 1, 5, 11, 20, 21, 59, 88, 91, 92, 97, 99, 112 };
+
+        long[] v2 = { 7, 2, 0, 11, 24, 0, 4, 1, 10, 2 };
+        long[] t2 = { 1, 2, 4, 5, 6, 8, 10, 11, 12, 14 };
+
+        long[] v3 = { 0, 1, 0, 1, 1, 4, 2, 2, 2, 2, 3, 5, 5 };
+        long[] t3 = { 2, 3, 5, 7, 8, 9, 10, 12, 14, 15, 18, 20, 22 };
+        List<Pod> pods = List.of(new Pod("p1", t1, v1), new Pod("p2", t2, v2), new Pod("p3", t3, v3));
+        long unit = between(1, 5);
+        Map<Group, Double> actualRates = runRateTest(pods, TimeValue.timeValueMillis(unit), TimeValue.ZERO);
+        assertThat(
+            actualRates,
+            equalTo(
+                Map.of(
+                    new Group("\u0001\u0003pods\u0002p1", 0),
+                    35.0 * unit / 111.0,
+                    new Group("\u0001\u0003pods\u0002p2", 0),
+                    42.0 * unit / 13.0,
+                    new Group("\u0001\u0003pods\u0002p3", 0),
+                    10.0 * unit / 20.0
+                )
+            )
+        );
+    }
+
+    public void testRateWithInterval() {
+        long[] v1 = { 1, 2, 3, 0, 1, 2, 3, 4, 5, 0, 1, 2, 3 };
+        long[] t1 = { 0, 10_000, 20_000, 30_000, 40_000, 50_000, 60_000, 70_000, 80_000, 90_000, 100_000, 110_000, 120_000 };
+
+        long[] v2 = { 2, 2, 2, 2, 2, 2, 2, 2, 2, 2, 2, 2, 2 };
+        long[] t2 = { 0, 10_000, 20_000, 30_000, 40_000, 50_000, 60_000, 70_000, 80_000, 90_000, 100_000, 110_000, 120_000 };
+
+        long[] v3 = { 2, 4, 8, 16, 32, 64, 128, 256, 512, 1024, 2048, 4096, 8192 };
+        long[] t3 = { 0, 10_000, 20_000, 30_000, 40_000, 50_000, 60_000, 70_000, 80_000, 90_000, 100_000, 110_000, 120_000 };
+        List<Pod> pods = List.of(new Pod("p1", t1, v1), new Pod("p2", t2, v2), new Pod("p3", t3, v3));
+        Map<Group, Double> actualRates = runRateTest(pods, TimeValue.timeValueMillis(1), TimeValue.timeValueMinutes(1));
+        assertMap(
+            actualRates,
+            matchesMap().entry(new Group("\u0001\u0003pods\u0002p1", 120_000), 0.0D)
+                .entry(new Group("\u0001\u0003pods\u0002p1", 60_000), 8.0E-5D)
+                .entry(new Group("\u0001\u0003pods\u0002p1", 0), 8.0E-5D)
+                .entry(new Group("\u0001\u0003pods\u0002p2", 120_000), 0.0D)
+                .entry(new Group("\u0001\u0003pods\u0002p2", 60_000), 0.0D)
+                .entry(new Group("\u0001\u0003pods\u0002p2", 0), 0.0D)
+                .entry(new Group("\u0001\u0003pods\u0002p3", 120_000), 0.0D)
+                .entry(new Group("\u0001\u0003pods\u0002p3", 60_000), 0.07936D)
+                .entry(new Group("\u0001\u0003pods\u0002p3", 0), 0.00124D)
+        );
+    }
+
+    public void testRandomRate() {
+        int numPods = between(1, 10);
+        List<Pod> pods = new ArrayList<>();
+        Map<Group, Double> expectedRates = new HashMap<>();
+        TimeValue unit = TimeValue.timeValueSeconds(1);
+        for (int p = 0; p < numPods; p++) {
+            int numValues = between(2, 100);
+            long[] values = new long[numValues];
+            long[] times = new long[numValues];
+            long t = DEFAULT_DATE_TIME_FORMATTER.parseMillis("2024-01-01T00:00:00Z");
+            for (int i = 0; i < numValues; i++) {
+                values[i] = randomIntBetween(0, 100);
+                t += TimeValue.timeValueSeconds(between(1, 10)).millis();
+                times[i] = t;
+            }
+            Pod pod = new Pod("p" + p, times, values);
+            pods.add(pod);
+            if (numValues == 1) {
+                expectedRates.put(new Group("\u0001\u0003pods\u0002" + pod.name, 0), null);
+            } else {
+                expectedRates.put(new Group("\u0001\u0003pods\u0002" + pod.name, 0), pod.expectedRate(unit));
+            }
+        }
+        Map<Group, Double> actualRates = runRateTest(pods, unit, TimeValue.ZERO);
+        assertThat(actualRates, equalTo(expectedRates));
+    }
+
+    record Pod(String name, long[] times, long[] values) {
+        Pod {
+            assert times.length == values.length : times.length + "!=" + values.length;
+        }
+
+        double expectedRate(TimeValue unit) {
+            double dv = 0;
+            for (int i = 0; i < values.length - 1; i++) {
+                if (values[i + 1] < values[i]) {
+                    dv += values[i];
+                }
+            }
+            dv += (values[values.length - 1] - values[0]);
+            long dt = times[times.length - 1] - times[0];
+            return (dv * unit.millis()) / dt;
+        }
+    }
+
+    Map<Group, Double> runRateTest(List<Pod> pods, TimeValue unit, TimeValue interval) {
+        long unitInMillis = unit.millis();
+        record Doc(String pod, long timestamp, long requests) {
+
+        }
+        var sourceOperatorFactory = createTimeSeriesSourceOperator(
+            directory,
+            r -> this.reader = r,
+            Integer.MAX_VALUE,
+            between(1, 100),
+            randomBoolean(),
+            interval,
+            writer -> {
+                List<Doc> docs = new ArrayList<>();
+                for (Pod pod : pods) {
+                    for (int i = 0; i < pod.times.length; i++) {
+                        docs.add(new Doc(pod.name, pod.times[i], pod.values[i]));
+                    }
+                }
+                Randomness.shuffle(docs);
+                for (Doc doc : docs) {
+                    writeTS(writer, doc.timestamp, new Object[] { "pod", doc.pod }, new Object[] { "requests", doc.requests });
+                }
+                return docs.size();
+            }
+        );
+        var ctx = driverContext();
+
+        var aggregators = List.of(
+            new RateLongAggregatorFunctionSupplier(List.of(4, 2), unitInMillis).groupingAggregatorFactory(AggregatorMode.INITIAL)
+        );
+        Operator initialHash = new TimeSeriesAggregationOperatorFactory(
+            AggregatorMode.INITIAL,
+            1,
+            3,
+            interval,
+            aggregators,
+            randomIntBetween(1, 1000)
+        ).get(ctx);
+
+        aggregators = List.of(
+            new RateLongAggregatorFunctionSupplier(List.of(2, 3, 4), unitInMillis).groupingAggregatorFactory(AggregatorMode.FINAL)
+        );
+        Operator finalHash = new TimeSeriesAggregationOperatorFactory(
+            AggregatorMode.FINAL,
+            0,
+            1,
+            interval,
+            aggregators,
+            randomIntBetween(1, 1000)
+        ).get(ctx);
+        List<Page> results = new ArrayList<>();
+        var requestsField = new NumberFieldMapper.NumberFieldType("requests", NumberFieldMapper.NumberType.LONG);
+        OperatorTestCase.runDriver(
+            new Driver(
+                ctx,
+                sourceOperatorFactory.get(ctx),
+                List.of(ValuesSourceReaderOperatorTests.factory(reader, requestsField, ElementType.LONG).get(ctx), initialHash, finalHash),
+                new TestResultPageSinkOperator(results::add),
+                () -> {}
+            )
+        );
+        Map<Group, Double> rates = new HashMap<>();
+        for (Page result : results) {
+            BytesRefBlock keysBlock = result.getBlock(0);
+            LongBlock timestampIntervalsBock = result.getBlock(1);
+            DoubleBlock ratesBlock = result.getBlock(2);
+            for (int i = 0; i < result.getPositionCount(); i++) {
+                var key = new Group(keysBlock.getBytesRef(i, new BytesRef()).utf8ToString(), timestampIntervalsBock.getLong(i));
+                rates.put(key, ratesBlock.getDouble(i));
+            }
+            result.releaseBlocks();
+        }
+        return rates;
+    }
+
+    record Group(String tsidHash, long timestampInterval) {}
+
+    // TODO: in a follow up add support for ordinal based time series grouping operator
+    // (and then remove this test)
+    // (ordinal based can only group by one field and never includes timestamp)
+    public void testBasicRateOrdinalBased() {
+        long[] v1 = { 1, 1, 3, 0, 2, 9, 21, 3, 7, 7, 9, 12 };
+        long[] t1 = { 1, 5, 11, 20, 21, 59, 88, 91, 92, 97, 99, 112 };
+
+        long[] v2 = { 7, 2, 0, 11, 24, 0, 4, 1, 10, 2 };
+        long[] t2 = { 1, 2, 4, 5, 6, 8, 10, 11, 12, 14 };
+
+        long[] v3 = { 0, 1, 0, 1, 1, 4, 2, 2, 2, 2, 3, 5, 5 };
+        long[] t3 = { 2, 3, 5, 7, 8, 9, 10, 12, 14, 15, 18, 20, 22 };
+        List<Pod> pods = List.of(new Pod("p1", t1, v1), new Pod("p2", t2, v2), new Pod("p3", t3, v3));
+        long unit = between(1, 5);
+        Map<String, Double> actualRates = runRateTestOrdinalBased(pods, TimeValue.timeValueMillis(unit));
+        assertThat(actualRates, equalTo(Map.of("p1", 35.0 * unit / 111.0, "p2", 42.0 * unit / 13.0, "p3", 10.0 * unit / 20.0)));
+    }
+
+    // TODO: in a follow up add support for ordinal based time series grouping operator
+    // (and then remove this test)
+    // (ordinal based can only group by one field and never includes timestamp)
+    public void testRandomRateOrdinalBased() {
+        int numPods = between(1, 10);
+        List<Pod> pods = new ArrayList<>();
+        Map<String, Double> expectedRates = new HashMap<>();
+        TimeValue unit = TimeValue.timeValueSeconds(1);
+        for (int p = 0; p < numPods; p++) {
+            int numValues = between(2, 100);
+            long[] values = new long[numValues];
+            long[] times = new long[numValues];
+            long t = DEFAULT_DATE_TIME_FORMATTER.parseMillis("2024-01-01T00:00:00Z");
+            for (int i = 0; i < numValues; i++) {
+                values[i] = randomIntBetween(0, 100);
+                t += TimeValue.timeValueSeconds(between(1, 10)).millis();
+                times[i] = t;
+            }
+            Pod pod = new Pod("p" + p, times, values);
+            pods.add(pod);
+            if (numValues == 1) {
+                expectedRates.put(pod.name, null);
+            } else {
+                expectedRates.put(pod.name, pod.expectedRate(unit));
+            }
+        }
+        Map<String, Double> actualRates = runRateTestOrdinalBased(pods, unit);
+        assertThat(actualRates, equalTo(expectedRates));
+    }
+
+    Map<String, Double> runRateTestOrdinalBased(List<Pod> pods, TimeValue unit) {
+        long unitInMillis = unit.millis();
+        record Doc(String pod, long timestamp, long requests) {
+
+        }
+        var sourceOperatorFactory = createTimeSeriesSourceOperator(
+            directory,
+            r -> this.reader = r,
+            Integer.MAX_VALUE,
+            between(1, 100),
+            randomBoolean(),
+            TimeValue.ZERO,
+            writer -> {
+                List<Doc> docs = new ArrayList<>();
+                for (Pod pod : pods) {
+                    for (int i = 0; i < pod.times.length; i++) {
+                        docs.add(new Doc(pod.name, pod.times[i], pod.values[i]));
+                    }
+                }
+                Randomness.shuffle(docs);
+                for (Doc doc : docs) {
+                    writeTS(writer, doc.timestamp, new Object[] { "pod", doc.pod }, new Object[] { "requests", doc.requests });
+                }
+                return docs.size();
+            }
+        );
+        var ctx = driverContext();
+        HashAggregationOperator finalHash = new HashAggregationOperator(
+            List.of(new RateLongAggregatorFunctionSupplier(List.of(1, 2, 3), unitInMillis).groupingAggregatorFactory(AggregatorMode.FINAL)),
+            () -> BlockHash.build(
+                List.of(new HashAggregationOperator.GroupSpec(0, ElementType.BYTES_REF)),
+                ctx.blockFactory(),
+                randomIntBetween(1, 1000),
+                randomBoolean()
+            ),
+            ctx
+        );
+        List<Page> results = new ArrayList<>();
+        var requestsField = new NumberFieldMapper.NumberFieldType("requests", NumberFieldMapper.NumberType.LONG);
+        var podField = new KeywordFieldMapper.KeywordFieldType("pod");
+        if (randomBoolean()) {
+            HashAggregationOperator initialHash = new HashAggregationOperator(
+                List.of(
+                    new RateLongAggregatorFunctionSupplier(List.of(5, 2), unitInMillis).groupingAggregatorFactory(AggregatorMode.INITIAL)
+                ),
+                () -> BlockHash.build(
+                    List.of(new HashAggregationOperator.GroupSpec(4, ElementType.BYTES_REF)),
+                    ctx.blockFactory(),
+                    randomIntBetween(1, 1000),
+                    randomBoolean()
+                ),
+                ctx
+            );
+            OperatorTestCase.runDriver(
+                new Driver(
+                    ctx,
+                    sourceOperatorFactory.get(ctx),
+                    List.of(
+                        ValuesSourceReaderOperatorTests.factory(reader, podField, ElementType.BYTES_REF).get(ctx),
+                        ValuesSourceReaderOperatorTests.factory(reader, requestsField, ElementType.LONG).get(ctx),
+                        initialHash,
+                        finalHash
+                    ),
+                    new TestResultPageSinkOperator(results::add),
+                    () -> {}
+                )
+            );
+        } else {
+            var blockLoader = new BlockDocValuesReader.BytesRefsFromOrdsBlockLoader("pod");
+            var shardContext = new ValuesSourceReaderOperator.ShardContext(reader, () -> SourceLoader.FROM_STORED_SOURCE);
+            var ordinalGrouping = new OrdinalsGroupingOperator(
+                shardIdx -> blockLoader,
+                List.of(shardContext),
+                ElementType.BYTES_REF,
+                0,
+                "pod",
+                List.of(
+                    new RateLongAggregatorFunctionSupplier(List.of(4, 2), unitInMillis).groupingAggregatorFactory(AggregatorMode.INITIAL)
+                ),
+                randomIntBetween(1, 1000),
+                ctx
+            );
+            OperatorTestCase.runDriver(
+                new Driver(
+                    ctx,
+                    sourceOperatorFactory.get(ctx),
+                    List.of(
+                        ValuesSourceReaderOperatorTests.factory(reader, requestsField, ElementType.LONG).get(ctx),
+                        ordinalGrouping,
+                        finalHash
+                    ),
+                    new TestResultPageSinkOperator(results::add),
+                    () -> {}
+                )
+            );
+        }
+        Map<String, Double> rates = new HashMap<>();
+        for (Page result : results) {
+            BytesRefBlock keysBlock = result.getBlock(0);
+            DoubleBlock ratesBlock = result.getBlock(1);
+            for (int i = 0; i < result.getPositionCount(); i++) {
+                var key = keysBlock.getBytesRef(i, new BytesRef()).utf8ToString();
+                rates.put(key, ratesBlock.getDouble(i));
+            }
+            result.releaseBlocks();
+        }
+        return rates;
+    }
+
+}

+ 3 - 1
x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/plan/physical/EsTimeseriesQueryExec.java

@@ -25,6 +25,7 @@ public class EsTimeseriesQueryExec extends EsQueryExec {
 
     static final EsField TSID_FIELD = new EsField("_tsid", DataTypes.KEYWORD, Map.of(), true);
     static final EsField TIMESTAMP_FIELD = new EsField("@timestamp", DataTypes.DATETIME, Map.of(), true);
+    static final EsField INTERVAL_FIELD = new EsField("timestamp_interval", DataTypes.DATETIME, Map.of(), true);
 
     public EsTimeseriesQueryExec(Source source, EsIndex index, QueryBuilder query) {
         this(
@@ -33,7 +34,8 @@ public class EsTimeseriesQueryExec extends EsQueryExec {
             List.of(
                 new FieldAttribute(source, DOC_ID_FIELD.getName(), DOC_ID_FIELD),
                 new FieldAttribute(source, TSID_FIELD.getName(), TSID_FIELD),
-                new FieldAttribute(source, TIMESTAMP_FIELD.getName(), TSID_FIELD)
+                new FieldAttribute(source, TIMESTAMP_FIELD.getName(), TIMESTAMP_FIELD),
+                new FieldAttribute(source, INTERVAL_FIELD.getName(), INTERVAL_FIELD)
             ),
             query,
             null,

+ 2 - 0
x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/planner/EsPhysicalOperationProviders.java

@@ -24,6 +24,7 @@ import org.elasticsearch.compute.lucene.ValuesSourceReaderOperator;
 import org.elasticsearch.compute.operator.Operator;
 import org.elasticsearch.compute.operator.OrdinalsGroupingOperator;
 import org.elasticsearch.compute.operator.SourceOperator;
+import org.elasticsearch.core.TimeValue;
 import org.elasticsearch.index.mapper.BlockLoader;
 import org.elasticsearch.index.mapper.FieldNamesFieldMapper;
 import org.elasticsearch.index.mapper.MappedFieldType;
@@ -145,6 +146,7 @@ public class EsPhysicalOperationProviders extends AbstractPhysicalOperationProvi
                     limit,
                     context.pageSize(rowEstimatedSize),
                     context.queryPragmas().taskConcurrency(),
+                    TimeValue.ZERO,
                     shardContexts,
                     querySupplier(esQueryExec.query())
                 );