
Remove time-series support in ES|QL in 8.19 (#131980)

Time-series is not usable in 8.19; however, its existence complicates 
backporting. This change removes the time-series code in 8.19.
Nhat Nguyen, 2 months ago
Parent
Commit
b991b5466d

+ 0 - 362
x-pack/plugin/esql/compute/src/main/java/org/elasticsearch/compute/lucene/TimeSeriesSortedSourceOperatorFactory.java

@@ -1,362 +0,0 @@
-/*
- * Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
- * or more contributor license agreements. Licensed under the Elastic License
- * 2.0; you may not use this file except in compliance with the Elastic License
- * 2.0.
- */
-
-package org.elasticsearch.compute.lucene;
-
-import org.apache.lucene.index.LeafReaderContext;
-import org.apache.lucene.index.SortedDocValues;
-import org.apache.lucene.index.SortedNumericDocValues;
-import org.apache.lucene.search.DocIdSetIterator;
-import org.apache.lucene.search.ScoreMode;
-import org.apache.lucene.search.Scorer;
-import org.apache.lucene.search.Weight;
-import org.apache.lucene.util.BytesRef;
-import org.apache.lucene.util.PriorityQueue;
-import org.elasticsearch.compute.data.BlockFactory;
-import org.elasticsearch.compute.data.BytesRefVector;
-import org.elasticsearch.compute.data.DocVector;
-import org.elasticsearch.compute.data.IntBlock;
-import org.elasticsearch.compute.data.IntVector;
-import org.elasticsearch.compute.data.LongVector;
-import org.elasticsearch.compute.data.Page;
-import org.elasticsearch.compute.operator.DriverContext;
-import org.elasticsearch.compute.operator.SourceOperator;
-import org.elasticsearch.core.Releasables;
-
-import java.io.IOException;
-import java.io.UncheckedIOException;
-import java.util.List;
-import java.util.function.Function;
-
-/**
- * Creates a source operator that takes advantage of the natural sorting of segments in a tsdb index.
- * <p>
- * This source operator loads the _tsid and @timestamp fields, which are used for emitting documents in the correct order. These field values
- * are included in the page as separate blocks, and downstream operators can make use of these loaded time series ids and timestamps.
- * <p>
- * The source operator includes all documents of a time series in the same page, so the same time series never exists in multiple pages.
- * Downstream operators can make use of this implementation detail.
- * <p>
- * This operator currently only supports shard level concurrency. A new concurrency mechanism should be introduced at the time series level
- * in order to read tsdb indices in parallel.
- */
-public class TimeSeriesSortedSourceOperatorFactory extends LuceneOperator.Factory {
-
-    private final int maxPageSize;
-
-    private TimeSeriesSortedSourceOperatorFactory(
-        List<? extends ShardContext> contexts,
-        Function<ShardContext, List<LuceneSliceQueue.QueryAndTags>> queryFunction,
-        int taskConcurrency,
-        int maxPageSize,
-        int limit
-    ) {
-        super(
-            contexts,
-            queryFunction,
-            DataPartitioning.SHARD,
-            query -> { throw new UnsupportedOperationException("locked to SHARD partitioning"); },
-            taskConcurrency,
-            limit,
-            false,
-            shardContext -> ScoreMode.COMPLETE_NO_SCORES
-        );
-        this.maxPageSize = maxPageSize;
-    }
-
-    @Override
-    public SourceOperator get(DriverContext driverContext) {
-        return new Impl(driverContext.blockFactory(), sliceQueue, maxPageSize, limit);
-    }
-
-    @Override
-    public String describe() {
-        return "TimeSeriesSortedSourceOperator[maxPageSize = " + maxPageSize + ", limit = " + limit + "]";
-    }
-
-    public static TimeSeriesSortedSourceOperatorFactory create(
-        int limit,
-        int maxPageSize,
-        int taskConcurrency,
-        List<? extends ShardContext> searchContexts,
-        Function<ShardContext, List<LuceneSliceQueue.QueryAndTags>> queryFunction
-    ) {
-        return new TimeSeriesSortedSourceOperatorFactory(searchContexts, queryFunction, taskConcurrency, maxPageSize, limit);
-    }
-
-    static final class Impl extends SourceOperator {
-
-        private final int maxPageSize;
-        private final BlockFactory blockFactory;
-        private final LuceneSliceQueue sliceQueue;
-        private int currentPagePos = 0;
-        private int remainingDocs;
-        private boolean doneCollecting;
-        private IntVector.Builder docsBuilder;
-        private IntVector.Builder segmentsBuilder;
-        private LongVector.Builder timestampsBuilder;
-        // TODO: add an ordinal block for tsid hashes
-        // (This allows for efficiently grouping by tsid locally, no need to use bytes representation of tsid hash)
-        private BytesRefVector.Builder tsHashesBuilder;
-        private TimeSeriesIterator iterator;
-
-        Impl(BlockFactory blockFactory, LuceneSliceQueue sliceQueue, int maxPageSize, int limit) {
-            this.maxPageSize = maxPageSize;
-            this.blockFactory = blockFactory;
-            this.remainingDocs = limit;
-            this.docsBuilder = blockFactory.newIntVectorBuilder(Math.min(limit, maxPageSize));
-            this.segmentsBuilder = null;
-            this.timestampsBuilder = blockFactory.newLongVectorBuilder(Math.min(limit, maxPageSize));
-            this.tsHashesBuilder = blockFactory.newBytesRefVectorBuilder(Math.min(limit, maxPageSize));
-            this.sliceQueue = sliceQueue;
-        }
-
-        @Override
-        public void finish() {
-            this.doneCollecting = true;
-        }
-
-        @Override
-        public boolean isFinished() {
-            return doneCollecting;
-        }
-
-        @Override
-        public Page getOutput() {
-            if (isFinished()) {
-                return null;
-            }
-
-            if (remainingDocs <= 0) {
-                doneCollecting = true;
-                return null;
-            }
-
-            Page page = null;
-            IntBlock shard = null;
-            IntVector leaf = null;
-            IntVector docs = null;
-            LongVector timestamps = null;
-            BytesRefVector tsids = null;
-            try {
-                if (iterator == null) {
-                    var slice = sliceQueue.nextSlice();
-                    if (slice == null) {
-                        doneCollecting = true;
-                        return null;
-                    }
-                    if (segmentsBuilder == null && slice.numLeaves() > 1) {
-                        segmentsBuilder = blockFactory.newIntVectorBuilder(Math.min(remainingDocs, maxPageSize));
-                    }
-                    iterator = new TimeSeriesIterator(slice);
-                }
-                iterator.consume();
-                shard = blockFactory.newConstantIntBlockWith(iterator.slice.shardContext().index(), currentPagePos);
-                if (iterator.slice.numLeaves() == 1) {
-                    int segmentOrd = iterator.slice.getLeaf(0).leafReaderContext().ord;
-                    leaf = blockFactory.newConstantIntBlockWith(segmentOrd, currentPagePos).asVector();
-                } else {
-                    // Due to the multi-segment nature of the time series source operator, singleSegmentNonDecreasing must be false
-                    leaf = segmentsBuilder.build();
-                    segmentsBuilder = blockFactory.newIntVectorBuilder(Math.min(remainingDocs, maxPageSize));
-                }
-                docs = docsBuilder.build();
-                docsBuilder = blockFactory.newIntVectorBuilder(Math.min(remainingDocs, maxPageSize));
-
-                timestamps = timestampsBuilder.build();
-                timestampsBuilder = blockFactory.newLongVectorBuilder(Math.min(remainingDocs, maxPageSize));
-                tsids = tsHashesBuilder.build();
-                tsHashesBuilder = blockFactory.newBytesRefVectorBuilder(Math.min(remainingDocs, maxPageSize));
-                page = new Page(
-                    currentPagePos,
-                    new DocVector(shard.asVector(), leaf, docs, leaf.isConstant()).asBlock(),
-                    tsids.asBlock(),
-                    timestamps.asBlock()
-                );
-
-                currentPagePos = 0;
-                if (iterator.completed()) {
-                    iterator = null;
-                }
-            } catch (IOException e) {
-                throw new UncheckedIOException(e);
-            } finally {
-                if (page == null) {
-                    Releasables.closeExpectNoException(shard, leaf, docs, timestamps, tsids);
-                }
-            }
-            return page;
-        }
-
-        @Override
-        public void close() {
-            Releasables.closeExpectNoException(docsBuilder, segmentsBuilder, timestampsBuilder, tsHashesBuilder);
-        }
-
-        class TimeSeriesIterator {
-
-            final LuceneSlice slice;
-            final Leaf leaf;
-            final PriorityQueue<Leaf> queue;
-            int globalTsidOrd;
-            BytesRef currentTsid = new BytesRef();
-
-            TimeSeriesIterator(LuceneSlice slice) throws IOException {
-                this.slice = slice;
-                Weight weight = slice.weight();
-                if (slice.numLeaves() == 1) {
-                    queue = null;
-                    leaf = new Leaf(weight, slice.getLeaf(0).leafReaderContext());
-                } else {
-                    queue = new PriorityQueue<>(slice.numLeaves()) {
-                        @Override
-                        protected boolean lessThan(Leaf a, Leaf b) {
-                            // tsid hash in ascending order:
-                            int cmp = a.timeSeriesHash.compareTo(b.timeSeriesHash);
-                            if (cmp == 0) {
-                                // timestamp in descending order:
-                                cmp = -Long.compare(a.timestamp, b.timestamp);
-                            }
-                            return cmp < 0;
-                        }
-                    };
-                    leaf = null;
-                    for (var leafReaderContext : slice.leaves()) {
-                        Leaf leaf = new Leaf(weight, leafReaderContext.leafReaderContext());
-                        if (leaf.nextDoc()) {
-                            queue.add(leaf);
-                        }
-                    }
-                }
-            }
-
-            void consume() throws IOException {
-                if (queue != null) {
-                    if (queue.size() > 0) {
-                        currentTsid = BytesRef.deepCopyOf(queue.top().timeSeriesHash);
-                        queue.top().reinitializeIfNeeded(Thread.currentThread());
-                    }
-                    while (queue.size() > 0) {
-                        if (remainingDocs <= 0 || currentPagePos >= maxPageSize) {
-                            break;
-                        }
-                        currentPagePos++;
-                        remainingDocs--;
-                        Leaf leaf = queue.top();
-                        segmentsBuilder.appendInt(leaf.segmentOrd);
-                        docsBuilder.appendInt(leaf.iterator.docID());
-                        timestampsBuilder.appendLong(leaf.timestamp);
-                        tsHashesBuilder.appendBytesRef(currentTsid);
-                        final Leaf newTop;
-                        if (leaf.nextDoc()) {
-                            // TODO: updating the top is one of the most expensive parts of this operation.
-                            // Ideally we would do this as little as possible. Maybe the top can be updated every N docs?
-                            newTop = queue.updateTop();
-                        } else {
-                            queue.pop();
-                            newTop = queue.size() > 0 ? queue.top() : null;
-                        }
-                        if (newTop != null) {
-                            if (newTop != leaf) {
-                                newTop.reinitializeIfNeeded(Thread.currentThread());
-                            }
-                            if (newTop.timeSeriesHash.equals(currentTsid) == false) {
-                                globalTsidOrd++;
-                                currentTsid = BytesRef.deepCopyOf(newTop.timeSeriesHash);
-                            }
-                        }
-                    }
-                } else {
-                    // Only one segment, so no need to use priority queue and use segment ordinals as tsid ord.
-                    leaf.reinitializeIfNeeded(Thread.currentThread());
-                    while (leaf.nextDoc()) {
-                        tsHashesBuilder.appendBytesRef(leaf.timeSeriesHash);
-                        timestampsBuilder.appendLong(leaf.timestamp);
-                        // Don't append segment ord, because there is only one segment.
-                        docsBuilder.appendInt(leaf.iterator.docID());
-                        currentPagePos++;
-                        remainingDocs--;
-                        if (remainingDocs <= 0 || currentPagePos >= maxPageSize) {
-                            break;
-                        }
-                    }
-                }
-            }
-
-            boolean completed() {
-                if (queue != null) {
-                    return iterator.queue.size() == 0;
-                } else {
-                    return leaf.iterator.docID() == DocIdSetIterator.NO_MORE_DOCS;
-                }
-            }
-
-            static class Leaf {
-
-                private final int segmentOrd;
-                private final Weight weight;
-                private final LeafReaderContext leaf;
-                private SortedDocValues tsids;
-                private SortedNumericDocValues timestamps;
-                private DocIdSetIterator iterator;
-                private Thread createdThread;
-
-                private long timestamp;
-                private int timeSeriesHashOrd;
-                private BytesRef timeSeriesHash;
-                private int docID = -1;
-
-                Leaf(Weight weight, LeafReaderContext leaf) throws IOException {
-                    this.segmentOrd = leaf.ord;
-                    this.weight = weight;
-                    this.leaf = leaf;
-                    this.createdThread = Thread.currentThread();
-                    tsids = leaf.reader().getSortedDocValues("_tsid");
-                    timestamps = leaf.reader().getSortedNumericDocValues("@timestamp");
-                    final Scorer scorer = weight.scorer(leaf);
-                    iterator = scorer != null ? scorer.iterator() : DocIdSetIterator.empty();
-                }
-
-                boolean nextDoc() throws IOException {
-                    docID = iterator.nextDoc();
-                    if (docID == DocIdSetIterator.NO_MORE_DOCS) {
-                        return false;
-                    }
-
-                    boolean advanced = tsids.advanceExact(docID);
-                    assert advanced;
-                    timeSeriesHashOrd = tsids.ordValue();
-                    timeSeriesHash = tsids.lookupOrd(timeSeriesHashOrd);
-                    advanced = timestamps.advanceExact(docID);
-                    assert advanced;
-                    timestamp = timestamps.nextValue();
-                    return true;
-                }
-
-                void reinitializeIfNeeded(Thread executingThread) throws IOException {
-                    if (executingThread != createdThread) {
-                        tsids = leaf.reader().getSortedDocValues("_tsid");
-                        timestamps = leaf.reader().getSortedNumericDocValues("@timestamp");
-                        final Scorer scorer = weight.scorer(leaf);
-                        iterator = scorer != null ? scorer.iterator() : DocIdSetIterator.empty();
-                        if (docID != -1) {
-                            iterator.advance(docID);
-                        }
-                        createdThread = executingThread;
-                    }
-                }
-            }
-
-        }
-
-        @Override
-        public String toString() {
-            return this.getClass().getSimpleName() + "[" + "maxPageSize=" + maxPageSize + ", remainingDocs=" + remainingDocs + "]";
-        }
-
-    }
-}
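
For context, the heart of the removed operator is the multi-segment merge in TimeSeriesIterator: per-segment cursors are kept in a priority queue ordered by _tsid hash ascending and then @timestamp descending, so every time series is drained as one contiguous run, newest sample first, and never spans pages. The standalone sketch below (hypothetical class and field names, not part of this change) reproduces that ordering with Lucene's PriorityQueue, mirroring the lessThan comparator removed above.

import org.apache.lucene.util.BytesRef;
import org.apache.lucene.util.PriorityQueue;

// Minimal stand-in for the per-segment cursor state that the removed Leaf class tracked.
record SegmentCursor(BytesRef tsidHash, long timestamp) {}

public class TsidMergeOrderSketch {
    public static void main(String[] args) {
        // Same ordering as the removed lessThan(): tsid hash ascending, then timestamp descending.
        PriorityQueue<SegmentCursor> queue = new PriorityQueue<>(4) {
            @Override
            protected boolean lessThan(SegmentCursor a, SegmentCursor b) {
                int cmp = a.tsidHash().compareTo(b.tsidHash());
                if (cmp == 0) {
                    cmp = -Long.compare(a.timestamp(), b.timestamp());
                }
                return cmp < 0;
            }
        };
        queue.add(new SegmentCursor(new BytesRef("ts-b"), 100L));
        queue.add(new SegmentCursor(new BytesRef("ts-a"), 100L));
        queue.add(new SegmentCursor(new BytesRef("ts-a"), 200L));
        while (queue.size() > 0) {
            SegmentCursor top = queue.pop();
            System.out.println(top.tsidHash().utf8ToString() + " @ " + top.timestamp());
        }
        // Prints ts-a @ 200, then ts-a @ 100, then ts-b @ 100.
    }
}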

+ 0 - 397
x-pack/plugin/esql/compute/src/test/java/org/elasticsearch/compute/lucene/TimeSeriesSortedSourceOperatorTests.java

@@ -1,397 +0,0 @@
-/*
- * Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
- * or more contributor license agreements. Licensed under the Elastic License
- * 2.0; you may not use this file except in compliance with the Elastic License
- * 2.0.
- */
-
-package org.elasticsearch.compute.lucene;
-
-import org.apache.lucene.document.DoubleDocValuesField;
-import org.apache.lucene.document.FloatDocValuesField;
-import org.apache.lucene.document.LongField;
-import org.apache.lucene.document.LongPoint;
-import org.apache.lucene.document.NumericDocValuesField;
-import org.apache.lucene.document.SortedDocValuesField;
-import org.apache.lucene.document.SortedNumericDocValuesField;
-import org.apache.lucene.document.SortedSetDocValuesField;
-import org.apache.lucene.index.IndexReader;
-import org.apache.lucene.index.IndexableField;
-import org.apache.lucene.index.NoMergePolicy;
-import org.apache.lucene.search.MatchAllDocsQuery;
-import org.apache.lucene.search.MatchNoDocsQuery;
-import org.apache.lucene.search.Query;
-import org.apache.lucene.search.Sort;
-import org.apache.lucene.search.SortField;
-import org.apache.lucene.search.SortedNumericSortField;
-import org.apache.lucene.store.Directory;
-import org.apache.lucene.tests.index.RandomIndexWriter;
-import org.apache.lucene.util.BytesRef;
-import org.elasticsearch.common.Randomness;
-import org.elasticsearch.compute.data.BytesRefVector;
-import org.elasticsearch.compute.data.DocVector;
-import org.elasticsearch.compute.data.ElementType;
-import org.elasticsearch.compute.data.LongVector;
-import org.elasticsearch.compute.data.Page;
-import org.elasticsearch.compute.lucene.read.ValuesSourceReaderOperatorTests;
-import org.elasticsearch.compute.operator.Driver;
-import org.elasticsearch.compute.operator.DriverContext;
-import org.elasticsearch.compute.operator.Operator;
-import org.elasticsearch.compute.test.AnyOperatorTestCase;
-import org.elasticsearch.compute.test.OperatorTestCase;
-import org.elasticsearch.compute.test.TestResultPageSinkOperator;
-import org.elasticsearch.core.CheckedFunction;
-import org.elasticsearch.core.IOUtils;
-import org.elasticsearch.index.mapper.DataStreamTimestampFieldMapper;
-import org.elasticsearch.index.mapper.DateFieldMapper;
-import org.elasticsearch.index.mapper.KeywordFieldMapper;
-import org.elasticsearch.index.mapper.NumberFieldMapper;
-import org.elasticsearch.index.mapper.RoutingPathFields;
-import org.elasticsearch.index.mapper.TimeSeriesIdFieldMapper;
-import org.hamcrest.Matcher;
-import org.junit.After;
-
-import java.io.IOException;
-import java.io.UncheckedIOException;
-import java.util.ArrayList;
-import java.util.Comparator;
-import java.util.HashMap;
-import java.util.List;
-import java.util.Locale;
-import java.util.Map;
-import java.util.function.Consumer;
-import java.util.function.Function;
-
-import static org.hamcrest.Matchers.empty;
-import static org.hamcrest.Matchers.equalTo;
-import static org.hamcrest.Matchers.hasSize;
-import static org.hamcrest.Matchers.lessThanOrEqualTo;
-
-public class TimeSeriesSortedSourceOperatorTests extends AnyOperatorTestCase {
-
-    private IndexReader reader;
-    private final Directory directory = newDirectory();
-
-    @After
-    public void cleanup() throws IOException {
-        IOUtils.close(reader, directory);
-    }
-
-    public void testSimple() {
-        int numTimeSeries = 3;
-        int numSamplesPerTS = 10;
-        long timestampStart = DateFieldMapper.DEFAULT_DATE_TIME_FORMATTER.parseMillis("2024-01-01T00:00:00Z");
-        int maxPageSize = between(1, 1024);
-        List<Page> results = runDriver(1024, maxPageSize, randomBoolean(), numTimeSeries, numSamplesPerTS, timestampStart);
-        // for now we emit at most one time series each page
-        int offset = 0;
-        for (Page page : results) {
-            assertThat(page.getBlockCount(), equalTo(5));
-            DocVector docVector = (DocVector) page.getBlock(0).asVector();
-            BytesRefVector tsidVector = (BytesRefVector) page.getBlock(1).asVector();
-            LongVector timestampVector = (LongVector) page.getBlock(2).asVector();
-            LongVector voltageVector = (LongVector) page.getBlock(3).asVector();
-            BytesRefVector hostnameVector = (BytesRefVector) page.getBlock(4).asVector();
-            for (int i = 0; i < page.getPositionCount(); i++) {
-                int expectedTsidOrd = offset / numSamplesPerTS;
-                String expectedHostname = String.format(Locale.ROOT, "host-%02d", expectedTsidOrd);
-                long expectedVoltage = 5L + expectedTsidOrd;
-                int sampleIndex = offset - expectedTsidOrd * numSamplesPerTS;
-                long expectedTimestamp = timestampStart + ((numSamplesPerTS - sampleIndex - 1) * 10_000L);
-                assertThat(docVector.shards().getInt(i), equalTo(0));
-                assertThat(voltageVector.getLong(i), equalTo(expectedVoltage));
-                assertThat(hostnameVector.getBytesRef(i, new BytesRef()).utf8ToString(), equalTo(expectedHostname));
-                assertThat(tsidVector.getBytesRef(i, new BytesRef()).utf8ToString(), equalTo("\u0001\bhostnames\u0007" + expectedHostname));
-                assertThat(timestampVector.getLong(i), equalTo(expectedTimestamp));
-                offset++;
-            }
-        }
-    }
-
-    public void testLimit() {
-        int numTimeSeries = 3;
-        int numSamplesPerTS = 10;
-        int limit = 1;
-        long timestampStart = DateFieldMapper.DEFAULT_DATE_TIME_FORMATTER.parseMillis("2024-01-01T00:00:00Z");
-        List<Page> results = runDriver(limit, randomIntBetween(1, 1024), randomBoolean(), numTimeSeries, numSamplesPerTS, timestampStart);
-        assertThat(results, hasSize(1));
-        Page page = results.get(0);
-        assertThat(page.getBlockCount(), equalTo(5));
-
-        DocVector docVector = (DocVector) page.getBlock(0).asVector();
-        assertThat(docVector.getPositionCount(), equalTo(limit));
-
-        BytesRefVector tsidVector = (BytesRefVector) page.getBlock(1).asVector();
-        assertThat(tsidVector.getPositionCount(), equalTo(limit));
-
-        LongVector timestampVector = (LongVector) page.getBlock(2).asVector();
-        assertThat(timestampVector.getPositionCount(), equalTo(limit));
-
-        LongVector voltageVector = (LongVector) page.getBlock(3).asVector();
-        assertThat(voltageVector.getPositionCount(), equalTo(limit));
-
-        BytesRefVector hostnameVector = (BytesRefVector) page.getBlock(4).asVector();
-        assertThat(hostnameVector.getPositionCount(), equalTo(limit));
-
-        assertThat(docVector.shards().getInt(0), equalTo(0));
-        assertThat(voltageVector.getLong(0), equalTo(5L));
-        assertThat(hostnameVector.getBytesRef(0, new BytesRef()).utf8ToString(), equalTo("host-00"));
-        assertThat(tsidVector.getBytesRef(0, new BytesRef()).utf8ToString(), equalTo("\u0001\bhostnames\u0007host-00")); // legacy tsid
-        assertThat(timestampVector.getLong(0), equalTo(timestampStart + ((numSamplesPerTS - 1) * 10_000L)));
-    }
-
-    public void testRandom() {
-        record Doc(int host, long timestamp, long metric) {}
-        int numDocs = between(1, 5000);
-        List<Doc> docs = new ArrayList<>();
-        Map<Integer, Long> timestamps = new HashMap<>();
-        long t0 = DateFieldMapper.DEFAULT_DATE_TIME_FORMATTER.parseMillis("2024-01-01T00:00:00Z");
-        for (int i = 0; i < numDocs; i++) {
-            int tsid = randomIntBetween(0, 9);
-            long timestamp = timestamps.compute(tsid, (k, curr) -> {
-                long t = curr != null ? curr : t0;
-                return t + randomIntBetween(1, 5000);
-            });
-            docs.add(new Doc(tsid, timestamp, randomIntBetween(1, 10000)));
-        }
-        int maxPageSize = between(1, 1024);
-        int limit = randomBoolean() ? between(1, 100000) : Integer.MAX_VALUE;
-        var timeSeriesFactory = createTimeSeriesSourceOperator(
-            directory,
-            r -> this.reader = r,
-            limit,
-            maxPageSize,
-            randomBoolean(),
-            writer -> {
-                Randomness.shuffle(docs);
-                for (Doc doc : docs) {
-                    writeTS(writer, doc.timestamp, new Object[] { "hostname", "h" + doc.host }, new Object[] { "metric", doc.metric });
-                }
-                return docs.size();
-            }
-        );
-        DriverContext driverContext = driverContext();
-        List<Page> results = new ArrayList<>();
-        var metricField = new NumberFieldMapper.NumberFieldType("metric", NumberFieldMapper.NumberType.LONG);
-        OperatorTestCase.runDriver(
-            new Driver(
-                "test",
-                driverContext,
-                timeSeriesFactory.get(driverContext),
-                List.of(ValuesSourceReaderOperatorTests.factory(reader, metricField, ElementType.LONG).get(driverContext)),
-                new TestResultPageSinkOperator(results::add),
-                () -> {}
-            )
-        );
-        docs.sort(Comparator.comparing(Doc::host).thenComparing(Comparator.comparingLong(Doc::timestamp).reversed()));
-        Map<Integer, Integer> hostToTsidOrd = new HashMap<>();
-        timestamps.keySet().stream().sorted().forEach(n -> hostToTsidOrd.put(n, hostToTsidOrd.size()));
-        int offset = 0;
-        for (int p = 0; p < results.size(); p++) {
-            Page page = results.get(p);
-            if (p < results.size() - 1) {
-                assertThat(page.getPositionCount(), equalTo(maxPageSize));
-            } else {
-                assertThat(page.getPositionCount(), lessThanOrEqualTo(limit));
-                assertThat(page.getPositionCount(), lessThanOrEqualTo(maxPageSize));
-            }
-            assertThat(page.getBlockCount(), equalTo(4));
-            DocVector docVector = (DocVector) page.getBlock(0).asVector();
-            BytesRefVector tsidVector = (BytesRefVector) page.getBlock(1).asVector();
-            LongVector timestampVector = (LongVector) page.getBlock(2).asVector();
-            LongVector metricVector = (LongVector) page.getBlock(3).asVector();
-            for (int i = 0; i < page.getPositionCount(); i++) {
-                Doc doc = docs.get(offset);
-                offset++;
-                assertThat(docVector.shards().getInt(0), equalTo(0));
-                assertThat(tsidVector.getBytesRef(i, new BytesRef()).utf8ToString(), equalTo("\u0001\bhostnames\u0002h" + doc.host));
-                assertThat(timestampVector.getLong(i), equalTo(doc.timestamp));
-                assertThat(metricVector.getLong(i), equalTo(doc.metric));
-            }
-        }
-        assertThat(offset, equalTo(Math.min(limit, numDocs)));
-    }
-
-    public void testMatchNone() throws Exception {
-        long t0 = DateFieldMapper.DEFAULT_DATE_TIME_FORMATTER.parseMillis("2024-01-01T00:00:00Z");
-        Sort sort = new Sort(
-            new SortField(TimeSeriesIdFieldMapper.NAME, SortField.Type.STRING, false),
-            new SortedNumericSortField(DataStreamTimestampFieldMapper.DEFAULT_PATH, SortField.Type.LONG, true)
-        );
-        try (
-            var directory = newDirectory();
-            RandomIndexWriter writer = new RandomIndexWriter(
-                random(),
-                directory,
-                newIndexWriterConfig().setIndexSort(sort).setMergePolicy(NoMergePolicy.INSTANCE)
-            )
-        ) {
-            int numDocs = between(1, 100);
-            long timestamp = t0;
-            int metrics = randomIntBetween(1, 3);
-            for (int i = 0; i < numDocs; i++) {
-                timestamp += between(1, 1000);
-                for (int j = 0; j < metrics; j++) {
-                    String hostname = String.format(Locale.ROOT, "sensor-%02d", j);
-                    writeTS(writer, timestamp, new Object[] { "sensor", hostname }, new Object[] { "voltage", j + 5 });
-                }
-            }
-            try (var reader = writer.getReader()) {
-                var ctx = new LuceneSourceOperatorTests.MockShardContext(reader, 0);
-                Query query = randomFrom(LongField.newRangeQuery("@timestamp", 0, t0), new MatchNoDocsQuery());
-                var timeSeriesFactory = TimeSeriesSortedSourceOperatorFactory.create(
-                    Integer.MAX_VALUE,
-                    randomIntBetween(1, 1024),
-                    1,
-                    List.of(ctx),
-                    unused -> List.of(new LuceneSliceQueue.QueryAndTags(query, List.of()))
-                );
-                var driverContext = driverContext();
-                List<Page> results = new ArrayList<>();
-                OperatorTestCase.runDriver(
-                    new Driver(
-                        "test",
-                        driverContext,
-                        timeSeriesFactory.get(driverContext),
-                        List.of(),
-                        new TestResultPageSinkOperator(results::add),
-                        () -> {}
-                    )
-                );
-                assertThat(results, empty());
-            }
-        }
-    }
-
-    @Override
-    protected Operator.OperatorFactory simple() {
-        return createTimeSeriesSourceOperator(directory, r -> this.reader = r, 1, 1, false, writer -> {
-            long timestamp = DateFieldMapper.DEFAULT_DATE_TIME_FORMATTER.parseMillis("2024-01-01T00:00:00Z");
-            writeTS(writer, timestamp, new Object[] { "hostname", "host-01" }, new Object[] { "voltage", 2 });
-            return 1;
-        });
-    }
-
-    @Override
-    protected Matcher<String> expectedDescriptionOfSimple() {
-        return equalTo("TimeSeriesSortedSourceOperator[maxPageSize = 1, limit = 1]");
-    }
-
-    @Override
-    protected Matcher<String> expectedToStringOfSimple() {
-        return equalTo("Impl[maxPageSize=1, remainingDocs=1]");
-    }
-
-    List<Page> runDriver(int limit, int maxPageSize, boolean forceMerge, int numTimeSeries, int numSamplesPerTS, long timestampStart) {
-        var ctx = driverContext();
-        var timeSeriesFactory = createTimeSeriesSourceOperator(
-            directory,
-            indexReader -> this.reader = indexReader,
-            limit,
-            maxPageSize,
-            forceMerge,
-            writer -> {
-                long timestamp = timestampStart;
-                for (int i = 0; i < numSamplesPerTS; i++) {
-                    for (int j = 0; j < numTimeSeries; j++) {
-                        String hostname = String.format(Locale.ROOT, "host-%02d", j);
-                        writeTS(writer, timestamp, new Object[] { "hostname", hostname }, new Object[] { "voltage", j + 5 });
-                    }
-                    timestamp += 10_000;
-                    writer.commit();
-                }
-                return numTimeSeries * numSamplesPerTS;
-            }
-        );
-
-        List<Page> results = new ArrayList<>();
-        var voltageField = new NumberFieldMapper.NumberFieldType("voltage", NumberFieldMapper.NumberType.LONG);
-        var hostnameField = new KeywordFieldMapper.KeywordFieldType("hostname");
-        OperatorTestCase.runDriver(
-            new Driver(
-                "test",
-                ctx,
-                timeSeriesFactory.get(ctx),
-                List.of(
-                    ValuesSourceReaderOperatorTests.factory(reader, voltageField, ElementType.LONG).get(ctx),
-                    ValuesSourceReaderOperatorTests.factory(reader, hostnameField, ElementType.BYTES_REF).get(ctx)
-                ),
-                new TestResultPageSinkOperator(results::add),
-                () -> {}
-            )
-        );
-        OperatorTestCase.assertDriverContext(ctx);
-        for (Page result : results) {
-            assertThat(result.getPositionCount(), lessThanOrEqualTo(maxPageSize));
-            assertThat(result.getPositionCount(), lessThanOrEqualTo(limit));
-        }
-        return results;
-    }
-
-    public static TimeSeriesSortedSourceOperatorFactory createTimeSeriesSourceOperator(
-        Directory directory,
-        Consumer<IndexReader> readerConsumer,
-        int limit,
-        int maxPageSize,
-        boolean forceMerge,
-        CheckedFunction<RandomIndexWriter, Integer, IOException> indexingLogic
-    ) {
-        Sort sort = new Sort(
-            new SortField(TimeSeriesIdFieldMapper.NAME, SortField.Type.STRING, false),
-            new SortedNumericSortField(DataStreamTimestampFieldMapper.DEFAULT_PATH, SortField.Type.LONG, true)
-        );
-        IndexReader reader;
-        try (
-            RandomIndexWriter writer = new RandomIndexWriter(
-                random(),
-                directory,
-                newIndexWriterConfig().setIndexSort(sort).setMergePolicy(NoMergePolicy.INSTANCE)
-            )
-        ) {
-
-            int numDocs = indexingLogic.apply(writer);
-            if (forceMerge) {
-                writer.forceMerge(1);
-            }
-            reader = writer.getReader();
-            readerConsumer.accept(reader);
-            assertThat(reader.numDocs(), equalTo(numDocs));
-        } catch (IOException e) {
-            throw new UncheckedIOException(e);
-        }
-        var ctx = new LuceneSourceOperatorTests.MockShardContext(reader, 0);
-        Function<ShardContext, List<LuceneSliceQueue.QueryAndTags>> queryFunction = c -> List.of(
-            new LuceneSliceQueue.QueryAndTags(new MatchAllDocsQuery(), List.of())
-        );
-        return TimeSeriesSortedSourceOperatorFactory.create(limit, maxPageSize, 1, List.of(ctx), queryFunction);
-    }
-
-    public static void writeTS(RandomIndexWriter iw, long timestamp, Object[] dimensions, Object[] metrics) throws IOException {
-        final List<IndexableField> fields = new ArrayList<>();
-        fields.add(new SortedNumericDocValuesField(DataStreamTimestampFieldMapper.DEFAULT_PATH, timestamp));
-        fields.add(new LongPoint(DataStreamTimestampFieldMapper.DEFAULT_PATH, timestamp));
-        var routingPathFields = new RoutingPathFields(null);
-        for (int i = 0; i < dimensions.length; i += 2) {
-            if (dimensions[i + 1] instanceof Number n) {
-                routingPathFields.addLong(dimensions[i].toString(), n.longValue());
-            } else {
-                routingPathFields.addString(dimensions[i].toString(), dimensions[i + 1].toString());
-                fields.add(new SortedSetDocValuesField(dimensions[i].toString(), new BytesRef(dimensions[i + 1].toString())));
-            }
-        }
-        for (int i = 0; i < metrics.length; i += 2) {
-            if (metrics[i + 1] instanceof Integer || metrics[i + 1] instanceof Long) {
-                fields.add(new NumericDocValuesField(metrics[i].toString(), ((Number) metrics[i + 1]).longValue()));
-            } else if (metrics[i + 1] instanceof Float) {
-                fields.add(new FloatDocValuesField(metrics[i].toString(), (float) metrics[i + 1]));
-            } else if (metrics[i + 1] instanceof Double) {
-                fields.add(new DoubleDocValuesField(metrics[i].toString(), (double) metrics[i + 1]));
-            }
-        }
-        // Use legacy tsid to make tests easier to understand:
-        fields.add(
-            new SortedDocValuesField(TimeSeriesIdFieldMapper.NAME, TimeSeriesIdFieldMapper.buildLegacyTsid(routingPathFields).toBytesRef())
-        );
-        iw.addDocument(fields);
-    }
-}

+ 0 - 345
x-pack/plugin/esql/compute/src/test/java/org/elasticsearch/compute/operator/TimeSeriesAggregationOperatorTests.java

@@ -1,345 +0,0 @@
-/*
- * Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
- * or more contributor license agreements. Licensed under the Elastic License
- * 2.0; you may not use this file except in compliance with the Elastic License
- * 2.0.
- */
-
-package org.elasticsearch.compute.operator;
-
-import org.apache.lucene.index.IndexReader;
-import org.apache.lucene.store.Directory;
-import org.apache.lucene.util.BytesRef;
-import org.elasticsearch.common.Randomness;
-import org.elasticsearch.common.Rounding;
-import org.elasticsearch.common.util.CollectionUtils;
-import org.elasticsearch.compute.aggregation.RateLongAggregatorFunctionSupplier;
-import org.elasticsearch.compute.aggregation.SumDoubleAggregatorFunctionSupplier;
-import org.elasticsearch.compute.aggregation.blockhash.BlockHash;
-import org.elasticsearch.compute.data.Block;
-import org.elasticsearch.compute.data.BlockFactory;
-import org.elasticsearch.compute.data.BlockUtils;
-import org.elasticsearch.compute.data.ElementType;
-import org.elasticsearch.compute.data.LongBlock;
-import org.elasticsearch.compute.data.LongVector;
-import org.elasticsearch.compute.data.Page;
-import org.elasticsearch.compute.lucene.read.ValuesSourceReaderOperatorTests;
-import org.elasticsearch.compute.test.ComputeTestCase;
-import org.elasticsearch.compute.test.OperatorTestCase;
-import org.elasticsearch.compute.test.TestResultPageSinkOperator;
-import org.elasticsearch.core.IOUtils;
-import org.elasticsearch.core.TimeValue;
-import org.elasticsearch.index.mapper.KeywordFieldMapper;
-import org.elasticsearch.index.mapper.NumberFieldMapper;
-import org.junit.After;
-
-import java.io.IOException;
-import java.time.ZoneOffset;
-import java.util.ArrayList;
-import java.util.List;
-import java.util.stream.IntStream;
-
-import static org.elasticsearch.compute.lucene.TimeSeriesSortedSourceOperatorTests.createTimeSeriesSourceOperator;
-import static org.elasticsearch.compute.lucene.TimeSeriesSortedSourceOperatorTests.writeTS;
-import static org.elasticsearch.compute.operator.TimeSeriesAggregationOperatorFactories.SupplierWithChannels;
-import static org.hamcrest.Matchers.equalTo;
-
-public class TimeSeriesAggregationOperatorTests extends ComputeTestCase {
-
-    private IndexReader reader = null;
-    private Directory directory = null;
-
-    @After
-    public void cleanup() throws IOException {
-        IOUtils.close(reader, directory);
-    }
-
-    /**
-     * A {@link DriverContext} with a nonBreakingBigArrays.
-     */
-    protected DriverContext driverContext() { // TODO make this final once all operators support memory tracking
-        BlockFactory blockFactory = blockFactory();
-        return new DriverContext(blockFactory.bigArrays(), blockFactory);
-    }
-
-    public void testBasicRate() throws Exception {
-        long[] v1 = { 1, 1, 3, 0, 2, 9, 21, 3, 7, 7, 9, 12 };
-        long[] t1 = { 1, 5, 11, 20, 21, 59, 88, 91, 92, 97, 99, 112 };
-
-        long[] v2 = { 7, 2, 0, 11, 24, 0, 4, 1, 10, 2 };
-        long[] t2 = { 1, 2, 4, 5, 6, 8, 10, 11, 12, 14 };
-
-        long[] v3 = { 0, 1, 0, 1, 1, 4, 2, 2, 2, 2, 3, 5, 5 };
-        long[] t3 = { 2, 3, 5, 7, 8, 9, 10, 12, 14, 15, 18, 20, 22 };
-        List<Pod> pods = List.of(
-            new Pod("p1", "cluster_1", new Interval(2100, t1, v1)),
-            new Pod("p2", "cluster_1", new Interval(600, t2, v2)),
-            new Pod("p3", "cluster_2", new Interval(1100, t3, v3))
-        );
-        long unit = between(1, 5);
-        {
-            List<List<Object>> actual = runRateTest(
-                pods,
-                List.of("cluster"),
-                TimeValue.timeValueMillis(unit),
-                TimeValue.timeValueMillis(500)
-            );
-            List<List<Object>> expected = List.of(
-                List.of(new BytesRef("cluster_1"), 35.0 * unit / 111.0 + 42.0 * unit / 13.0),
-                List.of(new BytesRef("cluster_2"), 10.0 * unit / 20.0)
-            );
-            assertThat(actual, equalTo(expected));
-        }
-        {
-            List<List<Object>> actual = runRateTest(pods, List.of("pod"), TimeValue.timeValueMillis(unit), TimeValue.timeValueMillis(500));
-            List<List<Object>> expected = List.of(
-                List.of(new BytesRef("p1"), 35.0 * unit / 111.0),
-                List.of(new BytesRef("p2"), 42.0 * unit / 13.0),
-                List.of(new BytesRef("p3"), 10.0 * unit / 20.0)
-            );
-            assertThat(actual, equalTo(expected));
-        }
-        {
-            List<List<Object>> actual = runRateTest(
-                pods,
-                List.of("cluster", "bucket"),
-                TimeValue.timeValueMillis(unit),
-                TimeValue.timeValueMillis(500)
-            );
-            List<List<Object>> expected = List.of(
-                List.of(new BytesRef("cluster_1"), 2000L, 35.0 * unit / 111.0),
-                List.of(new BytesRef("cluster_1"), 500L, 42.0 * unit / 13.0),
-                List.of(new BytesRef("cluster_2"), 1000L, 10.0 * unit / 20.0)
-            );
-            assertThat(actual, equalTo(expected));
-        }
-    }
-
-    public void testRateWithInterval() throws Exception {
-        long[] v1 = { 1, 2, 3, 0, 1, 2, 3, 4, 5, 0, 1, 2, 3 };
-        long[] t1 = { 0, 10_000, 20_000, 30_000, 40_000, 50_000, 60_000, 70_000, 80_000, 90_000, 100_000, 110_000, 120_000 };
-
-        long[] v2 = { 2, 2, 2, 2, 2, 2, 2, 2, 2, 2, 2, 2, 2 };
-        long[] t2 = { 0, 10_000, 20_000, 30_000, 40_000, 50_000, 60_000, 70_000, 80_000, 90_000, 100_000, 110_000, 120_000 };
-
-        long[] v3 = { 2, 4, 8, 16, 32, 64, 128, 256, 512, 1024, 2048, 4096, 8192 };
-        long[] t3 = { 0, 10_000, 20_000, 30_000, 40_000, 50_000, 60_000, 70_000, 80_000, 90_000, 100_000, 110_000, 120_000 };
-        List<Pod> pods = List.of(
-            new Pod("p1", "cluster_1", new Interval(0, t1, v1)),
-            new Pod("p2", "cluster_2", new Interval(0, t2, v2)),
-            new Pod("p3", "cluster_2", new Interval(0, t3, v3))
-        );
-        List<List<Object>> actual = runRateTest(
-            pods,
-            List.of("pod", "bucket"),
-            TimeValue.timeValueMillis(1),
-            TimeValue.timeValueMinutes(1)
-        );
-        List<List<Object>> expected = List.of(
-            List.of(new BytesRef("p1"), 120_000L, 0.0D),
-            List.of(new BytesRef("p1"), 60_000L, 8.0E-5D),
-            List.of(new BytesRef("p1"), 0L, 8.0E-5D),
-            List.of(new BytesRef("p2"), 120_000L, 0.0D),
-            List.of(new BytesRef("p2"), 60_000L, 0.0D),
-            List.of(new BytesRef("p2"), 0L, 0.0D),
-            List.of(new BytesRef("p3"), 120_000L, 0.0D),
-            List.of(new BytesRef("p3"), 60_000L, 0.07936D),
-            List.of(new BytesRef("p3"), 0L, 0.00124D)
-        );
-    }
-
-    public void testRandomRate() throws Exception {
-        int numPods = between(1, 10);
-        List<Pod> pods = new ArrayList<>();
-        TimeValue unit = TimeValue.timeValueSeconds(1);
-        List<List<Object>> expected = new ArrayList<>();
-        for (int p = 0; p < numPods; p++) {
-            int numIntervals = randomIntBetween(1, 3);
-            Interval[] intervals = new Interval[numIntervals];
-            long startTimeInHours = between(10, 100);
-            String podName = "p" + p;
-            for (int interval = 0; interval < numIntervals; interval++) {
-                final long startInterval = TimeValue.timeValueHours(--startTimeInHours).millis();
-                int numValues = between(2, 100);
-                long[] values = new long[numValues];
-                long[] times = new long[numValues];
-                long delta = 0;
-                for (int i = 0; i < numValues; i++) {
-                    values[i] = randomIntBetween(0, 100);
-                    delta += TimeValue.timeValueSeconds(between(1, 10)).millis();
-                    times[i] = delta;
-                }
-                intervals[interval] = new Interval(startInterval, times, values);
-                if (numValues == 1) {
-                    expected.add(List.of(new BytesRef(podName), startInterval, null));
-                } else {
-                    expected.add(List.of(new BytesRef(podName), startInterval, intervals[interval].expectedRate(unit)));
-                }
-            }
-            Pod pod = new Pod(podName, "cluster", intervals);
-            pods.add(pod);
-        }
-        List<List<Object>> actual = runRateTest(pods, List.of("pod", "bucket"), unit, TimeValue.timeValueHours(1));
-        assertThat(actual, equalTo(expected));
-    }
-
-    record Interval(long offset, long[] times, long[] values) {
-        double expectedRate(TimeValue unit) {
-            double dv = 0;
-            for (int v = 0; v < values.length - 1; v++) {
-                if (values[v + 1] < values[v]) {
-                    dv += values[v];
-                }
-            }
-            dv += (values[values.length - 1] - values[0]);
-            long dt = times[times.length - 1] - times[0];
-            return (dv * unit.millis()) / dt;
-        }
-    }
-
-    record Pod(String name, String cluster, Interval... intervals) {}
-
-    List<List<Object>> runRateTest(List<Pod> pods, List<String> groupings, TimeValue unit, TimeValue bucketInterval) throws IOException {
-        cleanup();
-        directory = newDirectory();
-        long unitInMillis = unit.millis();
-        record Doc(String pod, String cluster, long timestamp, long requests) {
-
-        }
-        var sourceOperatorFactory = createTimeSeriesSourceOperator(
-            directory,
-            r -> this.reader = r,
-            Integer.MAX_VALUE,
-            between(1, 100),
-            randomBoolean(),
-            writer -> {
-                List<Doc> docs = new ArrayList<>();
-                for (Pod pod : pods) {
-                    for (Interval interval : pod.intervals) {
-                        for (int i = 0; i < interval.times.length; i++) {
-                            docs.add(new Doc(pod.name, pod.cluster, interval.offset + interval.times[i], interval.values[i]));
-                        }
-                    }
-                }
-                Randomness.shuffle(docs);
-                for (Doc doc : docs) {
-                    writeTS(
-                        writer,
-                        doc.timestamp,
-                        new Object[] { "pod", doc.pod, "cluster", doc.cluster },
-                        new Object[] { "requests", doc.requests }
-                    );
-                }
-                return docs.size();
-            }
-        );
-        var ctx = driverContext();
-
-        List<Operator> intermediateOperators = new ArrayList<>();
-        final Rounding.Prepared rounding = new Rounding.Builder(bucketInterval).timeZone(ZoneOffset.UTC).build().prepareForUnknown();
-        var timeBucket = new EvalOperator(ctx.blockFactory(), new EvalOperator.ExpressionEvaluator() {
-            @Override
-            public Block eval(Page page) {
-                LongBlock timestampsBlock = page.getBlock(2);
-                LongVector timestamps = timestampsBlock.asVector();
-                try (var builder = blockFactory().newLongVectorFixedBuilder(timestamps.getPositionCount())) {
-                    for (int i = 0; i < timestamps.getPositionCount(); i++) {
-                        builder.appendLong(rounding.round(timestampsBlock.getLong(i)));
-                    }
-                    return builder.build().asBlock();
-                }
-            }
-
-            @Override
-            public void close() {
-
-            }
-        });
-        intermediateOperators.add(timeBucket);
-        var rateField = new NumberFieldMapper.NumberFieldType("requests", NumberFieldMapper.NumberType.LONG);
-        Operator extractRate = (ValuesSourceReaderOperatorTests.factory(reader, rateField, ElementType.LONG).get(ctx));
-        intermediateOperators.add(extractRate);
-        List<String> nonBucketGroupings = new ArrayList<>(groupings);
-        nonBucketGroupings.remove("bucket");
-        for (String grouping : nonBucketGroupings) {
-            var groupingField = new KeywordFieldMapper.KeywordFieldType(grouping);
-            intermediateOperators.add(ValuesSourceReaderOperatorTests.factory(reader, groupingField, ElementType.BYTES_REF).get(ctx));
-        }
-        // _doc, tsid, timestamp, bucket, requests, grouping1, grouping2
-        Operator intialAgg = new TimeSeriesAggregationOperatorFactories.Initial(
-            1,
-            3,
-            IntStream.range(0, nonBucketGroupings.size()).mapToObj(n -> new BlockHash.GroupSpec(5 + n, ElementType.BYTES_REF)).toList(),
-            List.of(new SupplierWithChannels(new RateLongAggregatorFunctionSupplier(unitInMillis), List.of(4, 2))),
-            List.of(),
-            between(1, 100)
-        ).get(ctx);
-
-        // tsid, bucket, rate[0][0],rate[0][1],rate[0][2], grouping1, grouping2
-        Operator intermediateAgg = new TimeSeriesAggregationOperatorFactories.Intermediate(
-            0,
-            1,
-            IntStream.range(0, nonBucketGroupings.size()).mapToObj(n -> new BlockHash.GroupSpec(5 + n, ElementType.BYTES_REF)).toList(),
-            List.of(new SupplierWithChannels(new RateLongAggregatorFunctionSupplier(unitInMillis), List.of(2, 3, 4))),
-            List.of(),
-            between(1, 100)
-        ).get(ctx);
-        // tsid, bucket, rate, grouping1, grouping2
-        List<BlockHash.GroupSpec> finalGroups = new ArrayList<>();
-        int groupChannel = 3;
-        for (String grouping : groupings) {
-            if (grouping.equals("bucket")) {
-                finalGroups.add(new BlockHash.GroupSpec(1, ElementType.LONG));
-            } else {
-                finalGroups.add(new BlockHash.GroupSpec(groupChannel++, ElementType.BYTES_REF));
-            }
-        }
-        Operator finalAgg = new TimeSeriesAggregationOperatorFactories.Final(
-            finalGroups,
-            List.of(new SupplierWithChannels(new SumDoubleAggregatorFunctionSupplier(), List.of(2))),
-            List.of(),
-            between(1, 100)
-        ).get(ctx);
-
-        List<Page> results = new ArrayList<>();
-        OperatorTestCase.runDriver(
-            new Driver(
-                "test",
-                ctx,
-                sourceOperatorFactory.get(ctx),
-                CollectionUtils.concatLists(intermediateOperators, List.of(intialAgg, intermediateAgg, finalAgg)),
-                new TestResultPageSinkOperator(results::add),
-                () -> {}
-            )
-        );
-        List<List<Object>> values = new ArrayList<>();
-        for (Page result : results) {
-            for (int p = 0; p < result.getPositionCount(); p++) {
-                int blockCount = result.getBlockCount();
-                List<Object> row = new ArrayList<>();
-                for (int b = 0; b < blockCount; b++) {
-                    row.add(BlockUtils.toJavaObject(result.getBlock(b), p));
-                }
-                values.add(row);
-            }
-            result.releaseBlocks();
-        }
-        values.sort((v1, v2) -> {
-            for (int i = 0; i < v1.size(); i++) {
-                if (v1.get(i) instanceof BytesRef b1) {
-                    int cmp = b1.compareTo((BytesRef) v2.get(i));
-                    if (cmp != 0) {
-                        return cmp;
-                    }
-                } else if (v1.get(i) instanceof Long b1) {
-                    int cmp = b1.compareTo((Long) v2.get(i));
-                    if (cmp != 0) {
-                        return -cmp;
-                    }
-                }
-            }
-            return 0;
-        });
-        return values;
-    }
-}
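
The removed aggregation tests encode the counter-rate semantics in Interval.expectedRate above: whenever the counter resets (the next sample is smaller than the current one), the pre-reset value is added back before the overall delta is taken, and the result is scaled from the elapsed time to the requested unit. A minimal sketch of that arithmetic (hypothetical class and method names), replaying the p3 series from testBasicRate:

public class CounterRateSketch {
    // Mirrors the removed Interval.expectedRate(): add back the pre-reset value at each counter
    // reset, then divide the overall increase by the elapsed time, scaled to the unit.
    static double rate(long[] times, long[] values, long unitMillis) {
        double dv = 0;
        for (int i = 0; i < values.length - 1; i++) {
            if (values[i + 1] < values[i]) {
                dv += values[i]; // counter reset: keep what had accumulated before the reset
            }
        }
        dv += values[values.length - 1] - values[0];
        long dt = times[times.length - 1] - times[0];
        return dv * unitMillis / dt;
    }

    public static void main(String[] args) {
        long[] t3 = { 2, 3, 5, 7, 8, 9, 10, 12, 14, 15, 18, 20, 22 };
        long[] v3 = { 0, 1, 0, 1, 1, 4, 2, 2, 2, 2, 3, 5, 5 };
        // Two resets (1 -> 0 and 4 -> 2) contribute 1 + 4 = 5, the overall delta is 5 - 0 = 5,
        // and dt = 20, matching the expected 10.0 * unit / 20.0 for cluster_2/p3 in testBasicRate.
        System.out.println(rate(t3, v3, 1)); // 0.5
    }
}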

+ 0 - 102
x-pack/plugin/esql/qa/server/single-node/src/javaRestTest/java/org/elasticsearch/xpack/esql/qa/single_node/TSDBRestEsqlIT.java

@@ -1,102 +0,0 @@
-/*
- * Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
- * or more contributor license agreements. Licensed under the Elastic License
- * 2.0; you may not use this file except in compliance with the Elastic License
- * 2.0.
- */
-package org.elasticsearch.xpack.esql.qa.single_node;
-
-import com.carrotsearch.randomizedtesting.annotations.ThreadLeakFilters;
-
-import org.apache.http.util.EntityUtils;
-import org.elasticsearch.Build;
-import org.elasticsearch.client.Request;
-import org.elasticsearch.client.Response;
-import org.elasticsearch.common.settings.Settings;
-import org.elasticsearch.test.TestClustersThreadFilter;
-import org.elasticsearch.test.cluster.ElasticsearchCluster;
-import org.elasticsearch.test.rest.ESRestTestCase;
-import org.elasticsearch.xpack.esql.CsvTestsDataLoader;
-import org.elasticsearch.xpack.esql.qa.rest.RestEsqlTestCase;
-import org.junit.ClassRule;
-
-import java.io.IOException;
-import java.nio.charset.StandardCharsets;
-import java.util.List;
-import java.util.Map;
-
-import static org.elasticsearch.xpack.esql.qa.rest.RestEsqlTestCase.runEsqlSync;
-
-/**
- * A dedicated test suite for testing time series esql functionality.
- * This suite exists while the functionality is gated behind a query pragma.
- */
-@ThreadLeakFilters(filters = TestClustersThreadFilter.class)
-public class TSDBRestEsqlIT extends ESRestTestCase {
-    @ClassRule
-    public static ElasticsearchCluster cluster = Clusters.testCluster();
-
-    @Override
-    protected String getTestRestCluster() {
-        return cluster.getHttpAddresses();
-    }
-
-    public void testTimeSeriesQuerying() throws IOException {
-        assumeTrue("time series querying relies on query pragma", Build.current().isSnapshot());
-        var settings = Settings.builder()
-            .loadFromStream("tsdb-settings.json", TSDBRestEsqlIT.class.getResourceAsStream("/tsdb-settings.json"), false)
-            .build();
-        String mapping = CsvTestsDataLoader.readTextFile(TSDBRestEsqlIT.class.getResource("/tsdb-k8s-mapping.json"));
-        createIndex("k8s", settings, mapping);
-
-        Request bulk = new Request("POST", "/k8s/_bulk");
-        bulk.addParameter("refresh", "true");
-        bulk.addParameter("filter_path", "errors");
-
-        String bulkBody = new String(
-            TSDBRestEsqlIT.class.getResourceAsStream("/tsdb-bulk-request.txt").readAllBytes(),
-            StandardCharsets.UTF_8
-        );
-        bulk.setJsonEntity(bulkBody);
-        Response response = client().performRequest(bulk);
-        assertEquals("{\"errors\":false}", EntityUtils.toString(response.getEntity(), StandardCharsets.UTF_8));
-
-        RestEsqlTestCase.RequestObjectBuilder builder = RestEsqlTestCase.requestObjectBuilder()
-            .query("FROM k8s | KEEP k8s.pod.name, @timestamp | SORT @timestamp, k8s.pod.name");
-        builder.pragmas(Settings.builder().put("time_series", true).build());
-        Map<String, Object> result = runEsqlSync(builder);
-        @SuppressWarnings("unchecked")
-        List<Map<?, ?>> columns = (List<Map<?, ?>>) result.get("columns");
-        assertEquals(2, columns.size());
-        assertEquals("k8s.pod.name", columns.get(0).get("name"));
-        assertEquals("@timestamp", columns.get(1).get("name"));
-
-        // Note that _tsid is a hashed value, so the tsid is no longer sorted lexicographically.
-        @SuppressWarnings("unchecked")
-        List<List<?>> values = (List<List<?>>) result.get("values");
-        assertEquals(8, values.size());
-        assertEquals("2021-04-29T17:29:12.470Z", values.get(0).get(1));
-        assertEquals("cat", values.get(0).get(0));
-
-        assertEquals("2021-04-29T17:29:12.470Z", values.get(0).get(1));
-        assertEquals("cow", values.get(1).get(0));
-
-        assertEquals("2021-04-29T17:29:12.470Z", values.get(0).get(1));
-        assertEquals("hamster", values.get(2).get(0));
-
-        assertEquals("2021-04-29T17:29:12.470Z", values.get(0).get(1));
-        assertEquals("rat", values.get(3).get(0));
-
-        assertEquals("2021-04-29T17:29:22.470Z", values.get(4).get(1));
-        assertEquals("cat", values.get(4).get(0));
-
-        assertEquals("2021-04-29T17:29:22.470Z", values.get(4).get(1));
-        assertEquals("cow", values.get(5).get(0));
-
-        assertEquals("2021-04-29T17:29:22.470Z", values.get(4).get(1));
-        assertEquals("hamster", values.get(6).get(0));
-
-        assertEquals("2021-04-29T17:29:22.470Z", values.get(4).get(1));
-        assertEquals("rat", values.get(7).get(0));
-    }
-}
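
The deleted REST test also documents how the feature was reached in 8.19: only through the snapshot-only time_series query pragma. A minimal sketch of that request construction (no longer functional after this change), assuming the same RestEsqlTestCase helpers the deleted test imported:

import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.xpack.esql.qa.rest.RestEsqlTestCase;

import java.io.IOException;
import java.util.Map;

import static org.elasticsearch.xpack.esql.qa.rest.RestEsqlTestCase.runEsqlSync;

public class TimeSeriesPragmaSketch {
    // Builds the same kind of request the deleted test sent: a plain ES|QL query with the
    // snapshot-only "time_series" pragma enabled, run synchronously against the test cluster.
    static Map<String, Object> queryWithTimeSeriesPragma(String query) throws IOException {
        RestEsqlTestCase.RequestObjectBuilder builder = RestEsqlTestCase.requestObjectBuilder().query(query);
        builder.pragmas(Settings.builder().put("time_series", true).build());
        return runEsqlSync(builder);
    }
}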

+ 0 - 17
x-pack/plugin/esql/qa/server/single-node/src/javaRestTest/resources/tsdb-bulk-request.txt

@@ -1,17 +0,0 @@
-{"create": {}}
-{"@timestamp": "2021-04-29T17:29:12.470Z", "metricset": "pod", "k8s": {"pod": {"name": "cat", "uid":"947e4ced-1786-4e53-9e0c-5c447e959507", "network": {"tx": 2001818691, "rx": 802133794},"cpu": {"limit": 0.3787411612903226, "nanocores": 35222928, "node": 0.048845732}}}}
-{"create": {}}
-{"@timestamp": "2021-04-29T17:29:12.470Z", "metricset": "pod", "k8s": {"pod": {"name": "hamster", "uid":"947e4ced-1786-4e53-9e0c-5c447e959508", "network": {"tx": 2005177954, "rx": 801479970},"cpu": {"limit": 0.5786461612903226, "nanocores": 25222928, "node": 0.505805732}}}}
-{"create": {}}
-{"@timestamp": "2021-04-29T17:29:12.470Z", "metricset": "pod", "k8s": {"pod": {"name": "cow", "uid":"947e4ced-1786-4e53-9e0c-5c447e959509", "network": {"tx": 2006223737, "rx": 802337279},"cpu": {"limit": 0.5787451612903226, "nanocores": 55252928, "node": 0.606805732}}}}
-{"create": {}}
-{"@timestamp": "2021-04-29T17:29:12.470Z", "metricset": "pod", "k8s": {"pod": {"name": "rat", "uid":"947e4ced-1786-4e53-9e0c-5c447e959510", "network": {"tx": 2012916202, "rx": 803685721},"cpu": {"limit": 0.6786461612903226, "nanocores": 75227928, "node": 0.058855732}}}}
-{"create": {}}
-{"@timestamp": "2021-04-29T17:29:22.470Z", "metricset": "pod", "k8s": {"pod": {"name": "rat", "uid":"947e4ced-1786-4e53-9e0c-5c447e959510", "network": {"tx": 1434521831, "rx": 530575198},"cpu": {"limit": 0.7787411712903226, "nanocores": 75727928, "node": 0.068865732}}}}
-{"create": {}}
-{"@timestamp": "2021-04-29T17:29:22.470Z", "metricset": "pod", "k8s": {"pod": {"name": "cow", "uid":"947e4ced-1786-4e53-9e0c-5c447e959509", "network": {"tx": 1434577921, "rx": 530600088},"cpu": {"limit": 0.2782412612903226, "nanocores": 25222228, "node": 0.078875732}}}}
-{"create": {}}
-{"@timestamp": "2021-04-29T17:29:22.470Z", "metricset": "pod", "k8s": {"pod": {"name": "hamster", "uid":"947e4ced-1786-4e53-9e0c-5c447e959508", "network": {"tx": 1434587694, "rx": 530604797},"cpu": {"limit": 0.1717411612903226, "nanocores": 15121928, "node": 0.808805732}}}}
-{"create": {}}
-{"@timestamp": "2021-04-29T17:29:22.470Z", "metricset": "pod", "k8s": {"pod": {"name": "cat", "uid":"947e4ced-1786-4e53-9e0c-5c447e959507", "network": {"tx": 1434595272, "rx": 530605511},"cpu": {"limit": 0.8787481682903226, "nanocores": 95292928, "node": 0.908905732}}}}
-

+ 0 - 56
x-pack/plugin/esql/qa/server/single-node/src/javaRestTest/resources/tsdb-k8s-mapping.json

@@ -1,56 +0,0 @@
-{
-  "properties": {
-    "@timestamp": {
-      "type": "date"
-    },
-    "metricset": {
-      "type": "keyword",
-      "time_series_dimension": true
-    },
-    "k8s": {
-      "properties": {
-        "pod": {
-          "properties": {
-            "uid": {
-              "type": "keyword",
-              "time_series_dimension": true
-            },
-            "name": {
-              "type": "keyword"
-            },
-            "cpu": {
-              "properties": {
-                "limit": {
-                  "type": "scaled_float",
-                  "scaling_factor": 1000.0,
-                  "time_series_metric": "gauge"
-                },
-                "nanocores": {
-                  "type": "long",
-                  "time_series_metric": "gauge"
-                },
-                "node": {
-                  "type": "scaled_float",
-                  "scaling_factor": 1000.0,
-                  "time_series_metric": "gauge"
-                }
-              }
-            },
-            "network": {
-              "properties": {
-                "rx": {
-                  "type": "long",
-                  "time_series_metric": "gauge"
-                },
-                "tx": {
-                  "type": "long",
-                  "time_series_metric": "gauge"
-                }
-              }
-            }
-          }
-        }
-      }
-    }
-  }
-}
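
The mapping removed above marks metricset and k8s.pod.uid as time series dimensions and the cpu/network fields as gauge metrics; together with the tsdb-settings.json resource (referenced by TSDBRestEsqlIT but not shown in this diff) it made the k8s test index a TSDB index. As a rough sketch only, borrowing the index-creation style of the TimeSeriesIT test removed further down, an equivalent index could be created like this (abbreviated field list; the settings keys are an assumption about what tsdb-settings.json contained):

    Settings settings = Settings.builder()
        .put("mode", "time_series")
        .putList("routing_path", List.of("metricset", "k8s.pod.uid"))
        .build();
    client().admin()
        .indices()
        .prepareCreate("k8s")
        .setSettings(settings)
        .setMapping(
            "@timestamp", "type=date",
            "metricset", "type=keyword,time_series_dimension=true",
            "k8s.pod.uid", "type=keyword,time_series_dimension=true",
            "k8s.pod.name", "type=keyword",
            "k8s.pod.network.tx", "type=long,time_series_metric=gauge"
        )
        .get();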

+ 0 - 129
x-pack/plugin/esql/qa/testFixtures/src/main/resources/k8s-metrics.csv-spec

@@ -1,129 +0,0 @@
-metricsWithoutAggs
-required_capability: metrics_syntax
-METRICS k8s | sort @timestamp DESC, cluster, pod | keep @timestamp,cluster,pod,network.bytes_in,network.cost | limit 5;
-
-@timestamp:datetime      | cluster:keyword | pod: keyword| network.bytes_in:long | network.cost:double
-2024-05-10T00:22:59.000Z | qa              | one         | 206                   | 6.25
-2024-05-10T00:22:54.000Z | qa              | three       | 972                   | 10.875
-2024-05-10T00:22:53.000Z | prod            | two         | 812                   | 10.75
-2024-05-10T00:22:53.000Z | staging         | one         | 238                   | 4.625
-2024-05-10T00:22:49.000Z | staging         | two         | 3                     | 1.75
-;
-
-metricsWithAggsAndSourceQuoting
-required_capability: metrics_syntax
-required_capability: double_quotes_source_enclosing
-METRICS "k8s" max_bytes=max(to_long(network.total_bytes_in)) BY cluster | SORT max_bytes DESC;
-
-max_bytes:long | cluster: keyword
-10797          | qa        
-10277          | prod
-7403           | staging
-;
-
-maxRateAndSourceTripleQuoting
-required_capability: metrics_syntax
-required_capability: double_quotes_source_enclosing
-METRICS """k8s""" max(rate(network.total_bytes_in, 1minute));
-
-max(rate(network.total_bytes_in, 1minute)): double
-790.4235090751945
-;
-
-maxCost
-required_capability: metrics_syntax
-METRICS k8s max_cost=max(rate(network.total_cost));
-
-max_cost: double
-0.16151685393258428
-;
-
-maxRateAndBytes
-required_capability: metrics_syntax
-METRICS k8s max(rate(network.total_bytes_in, 1minute)), max(network.bytes_in);
-
-max(rate(network.total_bytes_in, 1minute)): double | max(network.bytes_in): long
-790.4235090751945                                  | 1021
-;
-
-maxRateAndMarkupBytes
-required_capability: metrics_syntax
-METRICS k8s max(rate(network.total_bytes_in, 1minute)), max(network.bytes_in * 1.05);
-
-max(rate(network.total_bytes_in, 1minute)): double | max(network.bytes_in * 1.05): double
-790.4235090751945                                  | 1072.05
-;
-
-maxRateAndBytesAndCost
-required_capability: metrics_syntax
-METRICS k8s max(rate(network.total_bytes_in, 1minute)), max(network.bytes_in), max(rate(network.total_cost));
-
-max(rate(network.total_bytes_in, 1minute)): double| max(network.bytes_in): long| max(rate(network.total_cost)): double
-790.4235090751945                                 | 1021                       | 0.16151685393258428
-;
-
-sumRate
-required_capability: metrics_syntax
-METRICS k8s bytes=sum(rate(network.total_bytes_in)), sum(rate(network.total_cost)) BY cluster | SORT cluster;
-
-bytes: double       | sum(rate(network.total_cost)): double | cluster: keyword
-24.49149357711476   | 0.3018995503437827                    | prod
-33.308519044441084  | 0.4474920369252062                    | qa
-18.610708062016123  | 0.24387090901805775                   | staging
-;
-
-oneRateWithBucket
-required_capability: metrics_syntax
-METRICS k8s max(rate(network.total_bytes_in)) BY time_bucket = bucket(@timestamp,5minute) | SORT time_bucket DESC | LIMIT 2;
-
-max(rate(network.total_bytes_in)): double | time_bucket:date
-10.594594594594595                        | 2024-05-10T00:20:00.000Z
-23.702205882352942                        | 2024-05-10T00:15:00.000Z
-;
-
-twoRatesWithBucket
-required_capability: metrics_syntax
-METRICS k8s max(rate(network.total_bytes_in)), sum(rate(network.total_bytes_in)) BY time_bucket = bucket(@timestamp,5minute) | SORT time_bucket DESC | LIMIT 3;
-
-max(rate(network.total_bytes_in)): double | sum(rate(network.total_bytes_in)): double | time_bucket:date
-10.594594594594595                        | 42.70864495221802                         | 2024-05-10T00:20:00.000Z
-23.702205882352942                        | 112.36715680313907                        | 2024-05-10T00:15:00.000Z
-17.90625                                  | 85.18387414067914                         | 2024-05-10T00:10:00.000Z
-;
-
-
-oneRateWithBucketAndCluster
-required_capability: metrics_syntax
-METRICS k8s max(rate(network.total_bytes_in)) BY time_bucket = bucket(@timestamp,5minute), cluster | SORT time_bucket DESC, cluster | LIMIT 6;
-
-max(rate(network.total_bytes_in)): double | time_bucket:date          | cluster: keyword
-10.594594594594595                        | 2024-05-10T00:20:00.000Z  | prod
-5.586206896551724                         | 2024-05-10T00:20:00.000Z  | qa
-5.37037037037037                          | 2024-05-10T00:20:00.000Z  | staging
-15.913978494623656                        | 2024-05-10T00:15:00.000Z  | prod
-23.702205882352942                        | 2024-05-10T00:15:00.000Z  | qa
-9.823232323232324                         | 2024-05-10T00:15:00.000Z  | staging
-;
-
-BytesAndCostByBucketAndCluster
-required_capability: metrics_syntax
-METRICS k8s max(rate(network.total_bytes_in)), max(network.cost) BY time_bucket = bucket(@timestamp,5minute), cluster | SORT time_bucket DESC, cluster | LIMIT 6;
-
-max(rate(network.total_bytes_in)): double | max(network.cost): double | time_bucket:date         | cluster: keyword
-10.594594594594595                        | 10.75                     | 2024-05-10T00:20:00.000Z | prod
-5.586206896551724                         | 11.875                    | 2024-05-10T00:20:00.000Z | qa
-5.37037037037037                          | 9.5                       | 2024-05-10T00:20:00.000Z | staging
-15.913978494623656                        | 12.375                    | 2024-05-10T00:15:00.000Z | prod
-23.702205882352942                        | 12.125                    | 2024-05-10T00:15:00.000Z | qa
-9.823232323232324                         | 11.5                      | 2024-05-10T00:15:00.000Z | staging
-;
-
-oneRateWithBucketAndClusterThenFilter
-required_capability: metrics_syntax
-METRICS k8s max(rate(network.total_bytes_in)) BY time_bucket = bucket(@timestamp,5minute), cluster | WHERE cluster=="prod" | SORT time_bucket DESC | LIMIT 3;
-
-max(rate(network.total_bytes_in)): double | time_bucket:date          | cluster: keyword
-10.594594594594595                        | 2024-05-10T00:20:00.000Z  | prod
-15.913978494623656                        | 2024-05-10T00:15:00.000Z  | prod
-11.562737642585551                        | 2024-05-10T00:10:00.000Z  | prod
-;
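
The specs above exercise rate() both with and without a unit argument; the unit only rescales the default per-second rate (the removed TimeSeriesIT below asserts exactly avg * 60.0 for the 1minute variant). A tiny illustration with made-up numbers:

    double perSecondRate = 2.5;                   // rate(network.total_bytes_in)
    double perMinuteRate = perSecondRate * 60.0;  // rate(network.total_bytes_in, 1minute)
    assert perMinuteRate == 150.0;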

+ 0 - 798
x-pack/plugin/esql/src/internalClusterTest/java/org/elasticsearch/xpack/esql/action/TimeSeriesIT.java

@@ -1,798 +0,0 @@
-/*
- * Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
- * or more contributor license agreements. Licensed under the Elastic License
- * 2.0; you may not use this file except in compliance with the Elastic License
- * 2.0.
- */
-
-package org.elasticsearch.xpack.esql.action;
-
-import org.elasticsearch.Build;
-import org.elasticsearch.common.Randomness;
-import org.elasticsearch.common.Rounding;
-import org.elasticsearch.common.settings.Settings;
-import org.elasticsearch.core.TimeValue;
-import org.elasticsearch.xpack.esql.EsqlTestUtils;
-import org.elasticsearch.xpack.esql.core.type.DataType;
-import org.junit.Before;
-
-import java.time.ZoneOffset;
-import java.util.ArrayList;
-import java.util.Comparator;
-import java.util.HashMap;
-import java.util.List;
-import java.util.Map;
-import java.util.Objects;
-
-import static org.elasticsearch.index.mapper.DateFieldMapper.DEFAULT_DATE_TIME_FORMATTER;
-import static org.hamcrest.Matchers.closeTo;
-import static org.hamcrest.Matchers.equalTo;
-import static org.hamcrest.Matchers.hasSize;
-
-public class TimeSeriesIT extends AbstractEsqlIntegTestCase {
-
-    @Override
-    protected EsqlQueryResponse run(EsqlQueryRequest request) {
-        assumeTrue("time series available in snapshot builds only", Build.current().isSnapshot());
-        return super.run(request);
-    }
-
-    public void testEmpty() {
-        Settings settings = Settings.builder().put("mode", "time_series").putList("routing_path", List.of("host")).build();
-        client().admin()
-            .indices()
-            .prepareCreate("empty_index")
-            .setSettings(settings)
-            .setMapping(
-                "@timestamp",
-                "type=date",
-                "host",
-                "type=keyword,time_series_dimension=true",
-                "cpu",
-                "type=long,time_series_metric=gauge"
-            )
-            .get();
-        run("METRICS empty_index | LIMIT 1").close();
-    }
-
-    record Doc(String host, String cluster, long timestamp, int requestCount, double cpu) {}
-
-    final List<Doc> docs = new ArrayList<>();
-
-    record RequestCounter(long timestamp, long count) {
-
-    }
-
-    static Double computeRate(List<RequestCounter> values) {
-        List<RequestCounter> sorted = values.stream().sorted(Comparator.comparingLong(RequestCounter::timestamp)).toList();
-        if (sorted.size() < 2) {
-            return null;
-        }
-        long resets = 0;
-        for (int i = 0; i < sorted.size() - 1; i++) {
-            if (sorted.get(i).count > sorted.get(i + 1).count) {
-                resets += sorted.get(i).count;
-            }
-        }
-        RequestCounter last = sorted.get(sorted.size() - 1);
-        RequestCounter first = sorted.get(0);
-        double dv = resets + last.count - first.count;
-        double dt = last.timestamp - first.timestamp;
-        return dv * 1000 / dt;
-    }
-
-    @Before
-    public void populateIndex() {
-        // this can be expensive, so only do it once
-        Settings settings = Settings.builder().put("mode", "time_series").putList("routing_path", List.of("host", "cluster")).build();
-        client().admin()
-            .indices()
-            .prepareCreate("hosts")
-            .setSettings(settings)
-            .setMapping(
-                "@timestamp",
-                "type=date",
-                "host",
-                "type=keyword,time_series_dimension=true",
-                "cluster",
-                "type=keyword,time_series_dimension=true",
-                "cpu",
-                "type=double,time_series_metric=gauge",
-                "request_count",
-                "type=integer,time_series_metric=counter"
-            )
-            .get();
-        Map<String, String> hostToClusters = new HashMap<>();
-        for (int i = 0; i < 5; i++) {
-            hostToClusters.put("p" + i, randomFrom("qa", "prod"));
-        }
-        long timestamp = DEFAULT_DATE_TIME_FORMATTER.parseMillis("2024-04-15T00:00:00Z");
-        int numDocs = between(20, 100);
-        docs.clear();
-        Map<String, Integer> requestCounts = new HashMap<>();
-        for (int i = 0; i < numDocs; i++) {
-            List<String> hosts = randomSubsetOf(between(1, hostToClusters.size()), hostToClusters.keySet());
-            timestamp += between(1, 10) * 1000L;
-            for (String host : hosts) {
-                var requestCount = requestCounts.compute(host, (k, curr) -> {
-                    if (curr == null || randomInt(100) <= 20) {
-                        return randomIntBetween(0, 10);
-                    } else {
-                        return curr + randomIntBetween(1, 10);
-                    }
-                });
-                int cpu = randomIntBetween(0, 100);
-                docs.add(new Doc(host, hostToClusters.get(host), timestamp, requestCount, cpu));
-            }
-        }
-        Randomness.shuffle(docs);
-        for (Doc doc : docs) {
-            client().prepareIndex("hosts")
-                .setSource(
-                    "@timestamp",
-                    doc.timestamp,
-                    "host",
-                    doc.host,
-                    "cluster",
-                    doc.cluster,
-                    "cpu",
-                    doc.cpu,
-                    "request_count",
-                    doc.requestCount
-                )
-                .get();
-        }
-        client().admin().indices().prepareRefresh("hosts").get();
-    }
-
-    public void testSimpleMetrics() {
-        List<String> sortedGroups = docs.stream().map(d -> d.host).distinct().sorted().toList();
-        client().admin().indices().prepareRefresh("hosts").get();
-        try (EsqlQueryResponse resp = run("METRICS hosts load=avg(cpu) BY host | SORT host")) {
-            List<List<Object>> rows = EsqlTestUtils.getValuesList(resp);
-            assertThat(rows, hasSize(sortedGroups.size()));
-            for (int i = 0; i < rows.size(); i++) {
-                List<Object> r = rows.get(i);
-                String pod = (String) r.get(1);
-                assertThat(pod, equalTo(sortedGroups.get(i)));
-                List<Double> values = docs.stream().filter(d -> d.host.equals(pod)).map(d -> d.cpu).toList();
-                double avg = values.stream().mapToDouble(n -> n).sum() / values.size();
-                assertThat((double) r.get(0), equalTo(avg));
-            }
-        }
-        try (EsqlQueryResponse resp = run("METRICS hosts | SORT @timestamp DESC, host | KEEP @timestamp, host, cpu | LIMIT 5")) {
-            List<List<Object>> rows = EsqlTestUtils.getValuesList(resp);
-            List<Doc> topDocs = docs.stream()
-                .sorted(Comparator.comparingLong(Doc::timestamp).reversed().thenComparing(Doc::host))
-                .limit(5)
-                .toList();
-            assertThat(rows, hasSize(topDocs.size()));
-            for (int i = 0; i < rows.size(); i++) {
-                List<Object> r = rows.get(i);
-                long timestamp = DEFAULT_DATE_TIME_FORMATTER.parseMillis((String) r.get(0));
-                String pod = (String) r.get(1);
-                double cpu = (Double) r.get(2);
-                assertThat(topDocs.get(i).timestamp, equalTo(timestamp));
-                assertThat(topDocs.get(i).host, equalTo(pod));
-                assertThat(topDocs.get(i).cpu, equalTo(cpu));
-            }
-        }
-    }
-
-    public void testRateWithoutGrouping() {
-        record RateKey(String cluster, String host) {
-
-        }
-        Map<RateKey, List<RequestCounter>> groups = new HashMap<>();
-        for (Doc doc : docs) {
-            RateKey key = new RateKey(doc.cluster, doc.host);
-            groups.computeIfAbsent(key, k -> new ArrayList<>()).add(new RequestCounter(doc.timestamp, doc.requestCount));
-        }
-        List<Double> rates = new ArrayList<>();
-        for (List<RequestCounter> group : groups.values()) {
-            Double v = computeRate(group);
-            if (v != null) {
-                rates.add(v);
-            }
-        }
-        try (var resp = run("METRICS hosts sum(rate(request_count, 1second))")) {
-            assertThat(resp.columns(), equalTo(List.of(new ColumnInfoImpl("sum(rate(request_count, 1second))", "double", null))));
-            List<List<Object>> values = EsqlTestUtils.getValuesList(resp);
-            assertThat(values, hasSize(1));
-            assertThat(values.get(0), hasSize(1));
-            assertThat((double) values.get(0).get(0), closeTo(rates.stream().mapToDouble(d -> d).sum(), 0.1));
-        }
-        try (var resp = run("METRICS hosts max(rate(request_count)), min(rate(request_count))")) {
-            assertThat(
-                resp.columns(),
-                equalTo(
-                    List.of(
-                        new ColumnInfoImpl("max(rate(request_count))", "double", null),
-                        new ColumnInfoImpl("min(rate(request_count))", "double", null)
-                    )
-                )
-            );
-            List<List<Object>> values = EsqlTestUtils.getValuesList(resp);
-            assertThat(values, hasSize(1));
-            assertThat(values.get(0), hasSize(2));
-            assertThat((double) values.get(0).get(0), closeTo(rates.stream().mapToDouble(d -> d).max().orElse(0.0), 0.1));
-            assertThat((double) values.get(0).get(1), closeTo(rates.stream().mapToDouble(d -> d).min().orElse(0.0), 0.1));
-        }
-        try (var resp = run("METRICS hosts max(rate(request_count)), avg(rate(request_count)), max(rate(request_count, 1minute))")) {
-            assertThat(
-                resp.columns(),
-                equalTo(
-                    List.of(
-                        new ColumnInfoImpl("max(rate(request_count))", "double", null),
-                        new ColumnInfoImpl("avg(rate(request_count))", "double", null),
-                        new ColumnInfoImpl("max(rate(request_count, 1minute))", "double", null)
-                    )
-                )
-            );
-            List<List<Object>> values = EsqlTestUtils.getValuesList(resp);
-            assertThat(values, hasSize(1));
-            assertThat(values.get(0), hasSize(3));
-            assertThat((double) values.get(0).get(0), closeTo(rates.stream().mapToDouble(d -> d).max().orElse(0.0), 0.1));
-            final double avg = rates.isEmpty() ? 0.0 : rates.stream().mapToDouble(d -> d).sum() / rates.size();
-            assertThat((double) values.get(0).get(1), closeTo(avg, 0.1));
-            assertThat((double) values.get(0).get(2), closeTo(rates.stream().mapToDouble(d -> d * 60.0).max().orElse(0.0), 0.1));
-        }
-        try (var resp = run("METRICS hosts avg(rate(request_count)), avg(rate(request_count, 1second))")) {
-            assertThat(
-                resp.columns(),
-                equalTo(
-                    List.of(
-                        new ColumnInfoImpl("avg(rate(request_count))", "double", null),
-                        new ColumnInfoImpl("avg(rate(request_count, 1second))", "double", null)
-                    )
-                )
-            );
-            List<List<Object>> values = EsqlTestUtils.getValuesList(resp);
-            assertThat(values, hasSize(1));
-            assertThat(values.get(0), hasSize(2));
-            final double avg = rates.isEmpty() ? 0.0 : rates.stream().mapToDouble(d -> d).sum() / rates.size();
-            assertThat((double) values.get(0).get(0), closeTo(avg, 0.1));
-            assertThat((double) values.get(0).get(1), closeTo(avg, 0.1));
-        }
-        try (var resp = run("METRICS hosts max(rate(request_count)), min(rate(request_count)), min(cpu), max(cpu)")) {
-            assertThat(
-                resp.columns(),
-                equalTo(
-                    List.of(
-                        new ColumnInfoImpl("max(rate(request_count))", "double", null),
-                        new ColumnInfoImpl("min(rate(request_count))", "double", null),
-                        new ColumnInfoImpl("min(cpu)", "double", null),
-                        new ColumnInfoImpl("max(cpu)", "double", null)
-                    )
-                )
-            );
-            List<List<Object>> values = EsqlTestUtils.getValuesList(resp);
-            assertThat(values, hasSize(1));
-            assertThat(values.get(0), hasSize(4));
-            assertThat((double) values.get(0).get(0), closeTo(rates.stream().mapToDouble(d -> d).max().orElse(0.0), 0.1));
-            assertThat((double) values.get(0).get(1), closeTo(rates.stream().mapToDouble(d -> d).min().orElse(0.0), 0.1));
-            double minCpu = docs.stream().mapToDouble(d -> d.cpu).min().orElse(Long.MAX_VALUE);
-            double maxCpu = docs.stream().mapToDouble(d -> d.cpu).max().orElse(Long.MIN_VALUE);
-            assertThat((double) values.get(0).get(2), closeTo(minCpu, 0.1));
-            assertThat((double) values.get(0).get(3), closeTo(maxCpu, 0.1));
-        }
-    }
-
-    public void testRateGroupedByCluster() {
-        record RateKey(String cluster, String host) {
-
-        }
-        Map<RateKey, List<RequestCounter>> groups = new HashMap<>();
-        for (Doc doc : docs) {
-            RateKey key = new RateKey(doc.cluster, doc.host);
-            groups.computeIfAbsent(key, k -> new ArrayList<>()).add(new RequestCounter(doc.timestamp, doc.requestCount));
-        }
-        Map<String, List<Double>> bucketToRates = new HashMap<>();
-        for (Map.Entry<RateKey, List<RequestCounter>> e : groups.entrySet()) {
-            List<Double> values = bucketToRates.computeIfAbsent(e.getKey().cluster, k -> new ArrayList<>());
-            Double rate = computeRate(e.getValue());
-            values.add(Objects.requireNonNullElse(rate, 0.0));
-        }
-        List<String> sortedKeys = bucketToRates.keySet().stream().sorted().toList();
-        try (var resp = run("METRICS hosts sum(rate(request_count)) BY cluster | SORT cluster")) {
-            assertThat(
-                resp.columns(),
-                equalTo(
-                    List.of(new ColumnInfoImpl("sum(rate(request_count))", "double", null), new ColumnInfoImpl("cluster", "keyword", null))
-                )
-            );
-            List<List<Object>> values = EsqlTestUtils.getValuesList(resp);
-            assertThat(values, hasSize(bucketToRates.size()));
-            for (int i = 0; i < bucketToRates.size(); i++) {
-                List<Object> row = values.get(i);
-                assertThat(row, hasSize(2));
-                String key = sortedKeys.get(i);
-                assertThat(row.get(1), equalTo(key));
-                assertThat((double) row.get(0), closeTo(bucketToRates.get(key).stream().mapToDouble(d -> d).sum(), 0.1));
-            }
-        }
-        try (var resp = run("METRICS hosts avg(rate(request_count)) BY cluster | SORT cluster")) {
-            assertThat(
-                resp.columns(),
-                equalTo(
-                    List.of(new ColumnInfoImpl("avg(rate(request_count))", "double", null), new ColumnInfoImpl("cluster", "keyword", null))
-                )
-            );
-            List<List<Object>> values = EsqlTestUtils.getValuesList(resp);
-            assertThat(values, hasSize(bucketToRates.size()));
-            for (int i = 0; i < bucketToRates.size(); i++) {
-                List<Object> row = values.get(i);
-                assertThat(row, hasSize(2));
-                String key = sortedKeys.get(i);
-                assertThat(row.get(1), equalTo(key));
-                List<Double> rates = bucketToRates.get(key);
-                if (rates.isEmpty()) {
-                    assertThat(row.get(0), equalTo(0.0));
-                } else {
-                    double avg = rates.stream().mapToDouble(d -> d).sum() / rates.size();
-                    assertThat((double) row.get(0), closeTo(avg, 0.1));
-                }
-            }
-        }
-        try (var resp = run("METRICS hosts avg(rate(request_count, 1minute)), avg(rate(request_count)) BY cluster | SORT cluster")) {
-            assertThat(
-                resp.columns(),
-                equalTo(
-                    List.of(
-                        new ColumnInfoImpl("avg(rate(request_count, 1minute))", "double", null),
-                        new ColumnInfoImpl("avg(rate(request_count))", "double", null),
-                        new ColumnInfoImpl("cluster", "keyword", null)
-                    )
-                )
-            );
-            List<List<Object>> values = EsqlTestUtils.getValuesList(resp);
-            assertThat(values, hasSize(bucketToRates.size()));
-            for (int i = 0; i < bucketToRates.size(); i++) {
-                List<Object> row = values.get(i);
-                assertThat(row, hasSize(3));
-                String key = sortedKeys.get(i);
-                assertThat(row.get(2), equalTo(key));
-                List<Double> rates = bucketToRates.get(key);
-                if (rates.isEmpty()) {
-                    assertThat(row.get(0), equalTo(0.0));
-                    assertThat(row.get(1), equalTo(0.0));
-                } else {
-                    double avg = rates.stream().mapToDouble(d -> d).sum() / rates.size();
-                    assertThat((double) row.get(0), closeTo(avg * 60.0f, 0.1));
-                    assertThat((double) row.get(1), closeTo(avg, 0.1));
-                }
-            }
-        }
-    }
-
-    public void testRateWithTimeBucket() {
-        var rounding = new Rounding.Builder(TimeValue.timeValueSeconds(60)).timeZone(ZoneOffset.UTC).build().prepareForUnknown();
-        record RateKey(String host, String cluster, long interval) {}
-        Map<RateKey, List<RequestCounter>> groups = new HashMap<>();
-        for (Doc doc : docs) {
-            RateKey key = new RateKey(doc.host, doc.cluster, rounding.round(doc.timestamp));
-            groups.computeIfAbsent(key, k -> new ArrayList<>()).add(new RequestCounter(doc.timestamp, doc.requestCount));
-        }
-        Map<Long, List<Double>> bucketToRates = new HashMap<>();
-        for (Map.Entry<RateKey, List<RequestCounter>> e : groups.entrySet()) {
-            List<Double> values = bucketToRates.computeIfAbsent(e.getKey().interval, k -> new ArrayList<>());
-            Double rate = computeRate(e.getValue());
-            if (rate != null) {
-                values.add(rate);
-            }
-        }
-        List<Long> sortedKeys = bucketToRates.keySet().stream().sorted().limit(5).toList();
-        try (var resp = run("METRICS hosts sum(rate(request_count)) BY ts=bucket(@timestamp, 1 minute) | SORT ts | LIMIT 5")) {
-            assertThat(
-                resp.columns(),
-                equalTo(List.of(new ColumnInfoImpl("sum(rate(request_count))", "double", null), new ColumnInfoImpl("ts", "date", null)))
-            );
-            List<List<Object>> values = EsqlTestUtils.getValuesList(resp);
-            assertThat(values, hasSize(sortedKeys.size()));
-            for (int i = 0; i < sortedKeys.size(); i++) {
-                List<Object> row = values.get(i);
-                assertThat(row, hasSize(2));
-                long key = sortedKeys.get(i);
-                assertThat(row.get(1), equalTo(DEFAULT_DATE_TIME_FORMATTER.formatMillis(key)));
-                List<Double> bucketValues = bucketToRates.get(key);
-                if (bucketValues.isEmpty()) {
-                    assertNull(row.get(0));
-                } else {
-                    assertThat((double) row.get(0), closeTo(bucketValues.stream().mapToDouble(d -> d).sum(), 0.1));
-                }
-            }
-        }
-        try (var resp = run("METRICS hosts avg(rate(request_count)) BY ts=bucket(@timestamp, 1minute) | SORT ts | LIMIT 5")) {
-            assertThat(
-                resp.columns(),
-                equalTo(List.of(new ColumnInfoImpl("avg(rate(request_count))", "double", null), new ColumnInfoImpl("ts", "date", null)))
-            );
-            List<List<Object>> values = EsqlTestUtils.getValuesList(resp);
-            assertThat(values, hasSize(sortedKeys.size()));
-            for (int i = 0; i < sortedKeys.size(); i++) {
-                List<Object> row = values.get(i);
-                assertThat(row, hasSize(2));
-                long key = sortedKeys.get(i);
-                assertThat(row.get(1), equalTo(DEFAULT_DATE_TIME_FORMATTER.formatMillis(key)));
-                List<Double> bucketValues = bucketToRates.get(key);
-                if (bucketValues.isEmpty()) {
-                    assertNull(row.get(0));
-                } else {
-                    double avg = bucketValues.stream().mapToDouble(d -> d).sum() / bucketValues.size();
-                    assertThat((double) row.get(0), closeTo(avg, 0.1));
-                }
-            }
-        }
-        try (var resp = run("""
-            METRICS hosts avg(rate(request_count, 1minute)), avg(rate(request_count)) BY ts=bucket(@timestamp, 1minute)
-            | SORT ts
-            | LIMIT 5
-            """)) {
-            assertThat(
-                resp.columns(),
-                equalTo(
-                    List.of(
-                        new ColumnInfoImpl("avg(rate(request_count, 1minute))", "double", null),
-                        new ColumnInfoImpl("avg(rate(request_count))", "double", null),
-                        new ColumnInfoImpl("ts", "date", null)
-                    )
-                )
-            );
-            List<List<Object>> values = EsqlTestUtils.getValuesList(resp);
-            assertThat(values, hasSize(sortedKeys.size()));
-            for (int i = 0; i < sortedKeys.size(); i++) {
-                List<Object> row = values.get(i);
-                assertThat(row, hasSize(3));
-                long key = sortedKeys.get(i);
-                assertThat(row.get(2), equalTo(DEFAULT_DATE_TIME_FORMATTER.formatMillis(key)));
-                List<Double> bucketValues = bucketToRates.get(key);
-                if (bucketValues.isEmpty()) {
-                    assertNull(row.get(0));
-                    assertNull(row.get(1));
-                } else {
-                    double avg = bucketValues.stream().mapToDouble(d -> d).sum() / bucketValues.size();
-                    assertThat((double) row.get(0), closeTo(avg * 60.0f, 0.1));
-                    assertThat((double) row.get(1), closeTo(avg, 0.1));
-                }
-            }
-        }
-    }
-
-    public void testRateWithTimeBucketAndCluster() {
-        var rounding = new Rounding.Builder(TimeValue.timeValueSeconds(60)).timeZone(ZoneOffset.UTC).build().prepareForUnknown();
-        record RateKey(String host, String cluster, long interval) {}
-        Map<RateKey, List<RequestCounter>> groups = new HashMap<>();
-        for (Doc doc : docs) {
-            RateKey key = new RateKey(doc.host, doc.cluster, rounding.round(doc.timestamp));
-            groups.computeIfAbsent(key, k -> new ArrayList<>()).add(new RequestCounter(doc.timestamp, doc.requestCount));
-        }
-        record GroupKey(String cluster, long interval) {}
-        Map<GroupKey, List<Double>> rateBuckets = new HashMap<>();
-        for (Map.Entry<RateKey, List<RequestCounter>> e : groups.entrySet()) {
-            RateKey key = e.getKey();
-            List<Double> values = rateBuckets.computeIfAbsent(new GroupKey(key.cluster, key.interval), k -> new ArrayList<>());
-            Double rate = computeRate(e.getValue());
-            if (rate != null) {
-                values.add(rate);
-            }
-        }
-        Map<GroupKey, List<Double>> cpuBuckets = new HashMap<>();
-        for (Doc doc : docs) {
-            GroupKey key = new GroupKey(doc.cluster, rounding.round(doc.timestamp));
-            cpuBuckets.computeIfAbsent(key, k -> new ArrayList<>()).add(doc.cpu);
-        }
-        List<GroupKey> sortedKeys = rateBuckets.keySet()
-            .stream()
-            .sorted(Comparator.comparing(GroupKey::interval).thenComparing(GroupKey::cluster))
-            .limit(5)
-            .toList();
-        try (var resp = run("""
-            METRICS hosts sum(rate(request_count)) BY ts=bucket(@timestamp, 1 minute), cluster
-            | SORT ts, cluster
-            | LIMIT 5""")) {
-            assertThat(
-                resp.columns(),
-                equalTo(
-                    List.of(
-                        new ColumnInfoImpl("sum(rate(request_count))", "double", null),
-                        new ColumnInfoImpl("ts", "date", null),
-                        new ColumnInfoImpl("cluster", "keyword", null)
-                    )
-                )
-            );
-            List<List<Object>> values = EsqlTestUtils.getValuesList(resp);
-            assertThat(values, hasSize(sortedKeys.size()));
-            for (int i = 0; i < sortedKeys.size(); i++) {
-                List<Object> row = values.get(i);
-                assertThat(row, hasSize(3));
-                var key = sortedKeys.get(i);
-                assertThat(row.get(1), equalTo(DEFAULT_DATE_TIME_FORMATTER.formatMillis(key.interval)));
-                assertThat(row.get(2), equalTo(key.cluster));
-                List<Double> bucketValues = rateBuckets.get(key);
-                if (bucketValues.isEmpty()) {
-                    assertNull(row.get(0));
-                } else {
-                    assertThat((double) row.get(0), closeTo(bucketValues.stream().mapToDouble(d -> d).sum(), 0.1));
-                }
-            }
-        }
-        try (var resp = run("""
-            METRICS hosts avg(rate(request_count)) BY ts=bucket(@timestamp, 1minute), cluster
-            | SORT ts, cluster
-            | LIMIT 5""")) {
-            assertThat(
-                resp.columns(),
-                equalTo(
-                    List.of(
-                        new ColumnInfoImpl("avg(rate(request_count))", "double", null),
-                        new ColumnInfoImpl("ts", "date", null),
-                        new ColumnInfoImpl("cluster", "keyword", null)
-                    )
-                )
-            );
-            List<List<Object>> values = EsqlTestUtils.getValuesList(resp);
-            assertThat(values, hasSize(sortedKeys.size()));
-            for (int i = 0; i < sortedKeys.size(); i++) {
-                List<Object> row = values.get(i);
-                assertThat(row, hasSize(3));
-                var key = sortedKeys.get(i);
-                assertThat(row.get(1), equalTo(DEFAULT_DATE_TIME_FORMATTER.formatMillis(key.interval)));
-                assertThat(row.get(2), equalTo(key.cluster));
-                List<Double> bucketValues = rateBuckets.get(key);
-                if (bucketValues.isEmpty()) {
-                    assertNull(row.get(0));
-                } else {
-                    double avg = bucketValues.stream().mapToDouble(d -> d).sum() / bucketValues.size();
-                    assertThat((double) row.get(0), closeTo(avg, 0.1));
-                }
-            }
-        }
-        try (var resp = run("""
-            METRICS hosts avg(rate(request_count, 1minute)), avg(rate(request_count)) BY ts=bucket(@timestamp, 1minute), cluster
-            | SORT ts, cluster
-            | LIMIT 5""")) {
-            assertThat(
-                resp.columns(),
-                equalTo(
-                    List.of(
-                        new ColumnInfoImpl("avg(rate(request_count, 1minute))", "double", null),
-                        new ColumnInfoImpl("avg(rate(request_count))", "double", null),
-                        new ColumnInfoImpl("ts", "date", null),
-                        new ColumnInfoImpl("cluster", "keyword", null)
-                    )
-                )
-            );
-            List<List<Object>> values = EsqlTestUtils.getValuesList(resp);
-            assertThat(values, hasSize(sortedKeys.size()));
-            for (int i = 0; i < sortedKeys.size(); i++) {
-                List<Object> row = values.get(i);
-                assertThat(row, hasSize(4));
-                var key = sortedKeys.get(i);
-                assertThat(row.get(2), equalTo(DEFAULT_DATE_TIME_FORMATTER.formatMillis(key.interval)));
-                assertThat(row.get(3), equalTo(key.cluster));
-                List<Double> bucketValues = rateBuckets.get(key);
-                if (bucketValues.isEmpty()) {
-                    assertNull(row.get(0));
-                    assertNull(row.get(1));
-                } else {
-                    double avg = bucketValues.stream().mapToDouble(d -> d).sum() / bucketValues.size();
-                    assertThat((double) row.get(0), closeTo(avg * 60.0f, 0.1));
-                    assertThat((double) row.get(1), closeTo(avg, 0.1));
-                }
-            }
-        }
-        try (var resp = run("""
-            METRICS hosts
-                    s = sum(rate(request_count)),
-                    c = count(rate(request_count)),
-                    max(rate(request_count)),
-                    avg(rate(request_count))
-            BY ts=bucket(@timestamp, 1minute), cluster
-            | SORT ts, cluster
-            | LIMIT 5
-            | EVAL avg_rate= s/c
-            | KEEP avg_rate, `max(rate(request_count))`, `avg(rate(request_count))`, ts, cluster
-            """)) {
-            assertThat(
-                resp.columns(),
-                equalTo(
-                    List.of(
-                        new ColumnInfoImpl("avg_rate", "double", null),
-                        new ColumnInfoImpl("max(rate(request_count))", "double", null),
-                        new ColumnInfoImpl("avg(rate(request_count))", "double", null),
-                        new ColumnInfoImpl("ts", "date", null),
-                        new ColumnInfoImpl("cluster", "keyword", null)
-                    )
-                )
-            );
-            List<List<Object>> values = EsqlTestUtils.getValuesList(resp);
-            assertThat(values, hasSize(sortedKeys.size()));
-            for (int i = 0; i < sortedKeys.size(); i++) {
-                List<Object> row = values.get(i);
-                assertThat(row, hasSize(5));
-                var key = sortedKeys.get(i);
-                assertThat(row.get(3), equalTo(DEFAULT_DATE_TIME_FORMATTER.formatMillis(key.interval)));
-                assertThat(row.get(4), equalTo(key.cluster));
-                List<Double> bucketValues = rateBuckets.get(key);
-                if (bucketValues.isEmpty()) {
-                    assertNull(row.get(0));
-                    assertNull(row.get(1));
-                } else {
-                    double avg = bucketValues.stream().mapToDouble(d -> d).sum() / bucketValues.size();
-                    assertThat((double) row.get(0), closeTo(avg, 0.1));
-                    double max = bucketValues.stream().mapToDouble(d -> d).max().orElse(0.0);
-                    assertThat((double) row.get(1), closeTo(max, 0.1));
-                }
-                assertEquals(row.get(0), row.get(2));
-            }
-        }
-        try (var resp = run("""
-            METRICS hosts sum(rate(request_count)), max(cpu) BY ts=bucket(@timestamp, 1 minute), cluster
-            | SORT ts, cluster
-            | LIMIT 5""")) {
-            assertThat(
-                resp.columns(),
-                equalTo(
-                    List.of(
-                        new ColumnInfoImpl("sum(rate(request_count))", "double", null),
-                        new ColumnInfoImpl("max(cpu)", "double", null),
-                        new ColumnInfoImpl("ts", "date", null),
-                        new ColumnInfoImpl("cluster", "keyword", null)
-                    )
-                )
-            );
-            List<List<Object>> values = EsqlTestUtils.getValuesList(resp);
-            assertThat(values, hasSize(sortedKeys.size()));
-            for (int i = 0; i < sortedKeys.size(); i++) {
-                List<Object> row = values.get(i);
-                assertThat(row, hasSize(4));
-                var key = sortedKeys.get(i);
-                assertThat(row.get(2), equalTo(DEFAULT_DATE_TIME_FORMATTER.formatMillis(key.interval)));
-                assertThat(row.get(3), equalTo(key.cluster));
-                List<Double> rateBucket = rateBuckets.get(key);
-                if (rateBucket.isEmpty()) {
-                    assertNull(row.get(0));
-                } else {
-                    assertThat((double) row.get(0), closeTo(rateBucket.stream().mapToDouble(d -> d).sum(), 0.1));
-                }
-                List<Double> cpuBucket = cpuBuckets.get(key);
-                if (cpuBucket.isEmpty()) {
-                    assertNull(row.get(1));
-                } else {
-                    assertThat((double) row.get(1), closeTo(cpuBucket.stream().mapToDouble(d -> d).max().orElse(0.0), 0.1));
-                }
-            }
-        }
-        try (var resp = run("""
-            METRICS hosts sum(rate(request_count)), avg(cpu) BY ts=bucket(@timestamp, 1 minute), cluster
-            | SORT ts, cluster
-            | LIMIT 5""")) {
-            assertThat(
-                resp.columns(),
-                equalTo(
-                    List.of(
-                        new ColumnInfoImpl("sum(rate(request_count))", "double", null),
-                        new ColumnInfoImpl("avg(cpu)", "double", null),
-                        new ColumnInfoImpl("ts", "date", null),
-                        new ColumnInfoImpl("cluster", "keyword", null)
-                    )
-                )
-            );
-            List<List<Object>> values = EsqlTestUtils.getValuesList(resp);
-            assertThat(values, hasSize(sortedKeys.size()));
-            for (int i = 0; i < sortedKeys.size(); i++) {
-                List<Object> row = values.get(i);
-                assertThat(row, hasSize(4));
-                var key = sortedKeys.get(i);
-                assertThat(row.get(2), equalTo(DEFAULT_DATE_TIME_FORMATTER.formatMillis(key.interval)));
-                assertThat(row.get(3), equalTo(key.cluster));
-                List<Double> rateBucket = rateBuckets.get(key);
-                if (rateBucket.isEmpty()) {
-                    assertNull(row.get(0));
-                } else {
-                    assertThat((double) row.get(0), closeTo(rateBucket.stream().mapToDouble(d -> d).sum(), 0.1));
-                }
-                List<Double> cpuBucket = cpuBuckets.get(key);
-                if (cpuBucket.isEmpty()) {
-                    assertNull(row.get(1));
-                } else {
-                    double avg = cpuBucket.stream().mapToDouble(d -> d).sum() / cpuBucket.size();
-                    assertThat((double) row.get(1), closeTo(avg, 0.1));
-                }
-            }
-        }
-    }
-
-    public void testApplyRateBeforeFinalGrouping() {
-        record RateKey(String cluster, String host) {
-
-        }
-        Map<RateKey, List<RequestCounter>> groups = new HashMap<>();
-        for (Doc doc : docs) {
-            RateKey key = new RateKey(doc.cluster, doc.host);
-            groups.computeIfAbsent(key, k -> new ArrayList<>()).add(new RequestCounter(doc.timestamp, doc.requestCount));
-        }
-        List<Double> rates = new ArrayList<>();
-        for (List<RequestCounter> group : groups.values()) {
-            Double v = computeRate(group);
-            if (v != null) {
-                rates.add(v);
-            }
-        }
-        try (var resp = run("METRICS hosts sum(abs(rate(request_count, 1second)))")) {
-            assertThat(resp.columns(), equalTo(List.of(new ColumnInfoImpl("sum(abs(rate(request_count, 1second)))", "double", null))));
-            List<List<Object>> values = EsqlTestUtils.getValuesList(resp);
-            assertThat(values, hasSize(1));
-            assertThat(values.get(0), hasSize(1));
-            assertThat((double) values.get(0).get(0), closeTo(rates.stream().mapToDouble(d -> d).sum(), 0.1));
-        }
-        try (var resp = run("METRICS hosts sum(10.0 * rate(request_count, 1second))")) {
-            assertThat(resp.columns(), equalTo(List.of(new ColumnInfoImpl("sum(10.0 * rate(request_count, 1second))", "double", null))));
-            List<List<Object>> values = EsqlTestUtils.getValuesList(resp);
-            assertThat(values, hasSize(1));
-            assertThat(values.get(0), hasSize(1));
-            assertThat((double) values.get(0).get(0), closeTo(rates.stream().mapToDouble(d -> d * 10.0).sum(), 0.1));
-        }
-        try (var resp = run("METRICS hosts sum(20 * rate(request_count, 1second) + 10 * floor(rate(request_count, 1second)))")) {
-            assertThat(
-                resp.columns(),
-                equalTo(
-                    List.of(
-                        new ColumnInfoImpl(
-                            "sum(20 * rate(request_count, 1second) + 10 * floor(rate(request_count, 1second)))",
-                            "double",
-                            null
-                        )
-                    )
-                )
-            );
-            List<List<Object>> values = EsqlTestUtils.getValuesList(resp);
-            assertThat(values, hasSize(1));
-            assertThat(values.get(0), hasSize(1));
-            assertThat((double) values.get(0).get(0), closeTo(rates.stream().mapToDouble(d -> 20. * d + 10.0 * Math.floor(d)).sum(), 0.1));
-        }
-    }
-
-    public void testIndexMode() {
-        createIndex("events");
-        int numDocs = between(1, 10);
-        for (int i = 0; i < numDocs; i++) {
-            index("events", Integer.toString(i), Map.of("v", i));
-        }
-        refresh("events");
-        List<ColumnInfoImpl> columns = List.of(
-            new ColumnInfoImpl("_index", DataType.KEYWORD, null),
-            new ColumnInfoImpl("_index_mode", DataType.KEYWORD, null)
-        );
-        try (EsqlQueryResponse resp = run("""
-            FROM events,hosts METADATA _index_mode, _index
-            | WHERE _index_mode == "time_series"
-            | STATS BY _index, _index_mode
-            """)) {
-            assertThat(resp.columns(), equalTo(columns));
-            List<List<Object>> values = EsqlTestUtils.getValuesList(resp);
-            assertThat(values, hasSize(1));
-            assertThat(values, equalTo(List.of(List.of("hosts", "time_series"))));
-        }
-        try (EsqlQueryResponse resp = run("""
-            FROM events,hosts METADATA _index_mode, _index
-            | WHERE _index_mode == "standard"
-            | STATS BY _index, _index_mode
-            """)) {
-            assertThat(resp.columns(), equalTo(columns));
-            List<List<Object>> values = EsqlTestUtils.getValuesList(resp);
-            assertThat(values, hasSize(1));
-            assertThat(values, equalTo(List.of(List.of("events", "standard"))));
-        }
-        try (EsqlQueryResponse resp = run("""
-            FROM events,hosts METADATA _index_mode, _index
-            | STATS BY _index, _index_mode
-            | SORT _index
-            """)) {
-            assertThat(resp.columns(), equalTo(columns));
-            List<List<Object>> values = EsqlTestUtils.getValuesList(resp);
-            assertThat(values, hasSize(2));
-            assertThat(values, equalTo(List.of(List.of("events", "standard"), List.of("hosts", "time_series"))));
-        }
-    }
-}
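
The computeRate helper in the test removed above mirrors counter-rate semantics: when the counter resets within the window, the value observed just before the reset is added back before dividing by the elapsed time. A small worked example, written as if inside that deleted class (hypothetical sample values):

    // One series with a reset between t=1000ms and t=2000ms.
    List<RequestCounter> samples = List.of(
        new RequestCounter(0, 10),
        new RequestCounter(1000, 25),
        new RequestCounter(2000, 5),
        new RequestCounter(3000, 12)
    );
    // resets = 25, so dv = 25 + 12 - 10 = 27 over dt = 3000ms -> 27 * 1000 / 3000 = 9.0 per second
    assertEquals(9.0, computeRate(samples), 1e-9);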

+ 1 - 8
x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/planner/EsPhysicalOperationProviders.java

@@ -23,7 +23,6 @@ import org.elasticsearch.compute.lucene.LuceneOperator;
 import org.elasticsearch.compute.lucene.LuceneSliceQueue;
 import org.elasticsearch.compute.lucene.LuceneSourceOperator;
 import org.elasticsearch.compute.lucene.LuceneTopNSourceOperator;
-import org.elasticsearch.compute.lucene.TimeSeriesSortedSourceOperatorFactory;
 import org.elasticsearch.compute.lucene.read.ValuesSourceReaderOperator;
 import org.elasticsearch.compute.operator.Operator;
 import org.elasticsearch.compute.operator.OrdinalsGroupingOperator;
@@ -213,13 +212,7 @@ public class EsPhysicalOperationProviders extends AbstractPhysicalOperationProvi
             );
         } else {
             if (esQueryExec.indexMode() == IndexMode.TIME_SERIES) {
-                luceneFactory = TimeSeriesSortedSourceOperatorFactory.create(
-                    limit,
-                    context.pageSize(rowEstimatedSize),
-                    context.queryPragmas().taskConcurrency(),
-                    shardContexts,
-                    querySupplier(esQueryExec.query())
-                );
+                throw new IllegalStateException("time-series in ES|QL is not supported");
             } else {
                 luceneFactory = new LuceneSourceOperator.Factory(
                     shardContexts,