
Add support for in index order execution mode to aggregations (#82129)

Adds a check for the presence of time-series-based aggregations and switches the
aggregation framework into in-index-order execution mode when they are found.
Introduces the basic time_series aggregation that triggers that mode.

Relates to #74660
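
For context only (not part of the diff): a minimal sketch of how the new aggregation can be requested and consumed from the Java API, based on the AggregationBuilders.timeSeries(...) factory added below and the usage patterns in TimeSeriesAggregationsIT. The index name "tsdb", the "val" field, and the sum sub-aggregation are assumptions; the client() helper is the ESIntegTestCase-style test client.

    // Hypothetical usage sketch, mirroring TimeSeriesAggregationsIT; names are illustrative.
    SearchResponse response = client().prepareSearch("tsdb")
        .setSize(0)
        .addAggregation(
            AggregationBuilders.timeSeries("by_ts")                            // new in this commit
                .subAggregation(AggregationBuilders.sum("total_val").field("val"))
        )
        .get();
    TimeSeries byTs = response.getAggregations().get("by_ts");
    for (TimeSeries.Bucket bucket : byTs.getBuckets()) {
        // The bucket key is the decoded _tsid: a map of dimension name to dimension value.
        System.out.println(bucket.getKey() + " -> " + bucket.getDocCount());
    }

The presence of a time_series aggregation anywhere in the request is what switches the whole aggregation tree into the in-index-order execution path described above.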
Igor Motov, 3 years ago
commit d9589cb312
33 changed files with 1870 additions and 34 deletions
  1. +5 -0 benchmarks/src/main/java/org/elasticsearch/benchmark/search/aggregations/AggConstructionContentionBenchmark.java
  2. +3 -0 client/rest-high-level/src/main/java/org/elasticsearch/client/RestHighLevelClient.java
  3. +81 -0 rest-api-spec/src/yamlRestTest/resources/rest-api-spec/test/search.aggregation/450_time_series.yml
  4. +522 -0 server/src/internalClusterTest/java/org/elasticsearch/search/aggregations/TimeSeriesAggregationsIT.java
  5. +30 -16 server/src/main/java/org/elasticsearch/index/mapper/TimeSeriesIdFieldMapper.java
  6. +3 -2 server/src/main/java/org/elasticsearch/search/DocValueFormat.java
  7. +13 -0 server/src/main/java/org/elasticsearch/search/SearchModule.java
  8. +2 -1 server/src/main/java/org/elasticsearch/search/SearchService.java
  9. +12 -0 server/src/main/java/org/elasticsearch/search/aggregations/AggregationBuilder.java
  10. +9 -0 server/src/main/java/org/elasticsearch/search/aggregations/AggregationBuilders.java
  11. +16 -4 server/src/main/java/org/elasticsearch/search/aggregations/AggregationPhase.java
  12. +12 -0 server/src/main/java/org/elasticsearch/search/aggregations/AggregatorFactories.java
  13. +3 -0 server/src/main/java/org/elasticsearch/search/aggregations/bucket/global/GlobalAggregatorFactory.java
  14. +4 -1 server/src/main/java/org/elasticsearch/search/aggregations/bucket/terms/TermsAggregator.java
  15. +4 -1 server/src/main/java/org/elasticsearch/search/aggregations/bucket/terms/TermsAggregatorFactory.java
  16. +4 -0 server/src/main/java/org/elasticsearch/search/aggregations/metrics/TopHitsAggregatorFactory.java
  17. +19 -1 server/src/main/java/org/elasticsearch/search/aggregations/support/AggregationContext.java
  18. +267 -0 server/src/main/java/org/elasticsearch/search/aggregations/timeseries/InternalTimeSeries.java
  19. +84 -0 server/src/main/java/org/elasticsearch/search/aggregations/timeseries/ParsedTimeSeries.java
  20. +29 -0 server/src/main/java/org/elasticsearch/search/aggregations/timeseries/TimeSeries.java
  21. +133 -0 server/src/main/java/org/elasticsearch/search/aggregations/timeseries/TimeSeriesAggregationBuilder.java
  22. +41 -0 server/src/main/java/org/elasticsearch/search/aggregations/timeseries/TimeSeriesAggregationFactory.java
  23. +123 -0 server/src/main/java/org/elasticsearch/search/aggregations/timeseries/TimeSeriesAggregator.java
  24. +123 -0 server/src/main/java/org/elasticsearch/search/aggregations/timeseries/TimeSeriesIndexSearcher.java
  25. +3 -1 server/src/test/java/org/elasticsearch/search/aggregations/AggregationsTests.java
  26. +89 -0 server/src/test/java/org/elasticsearch/search/aggregations/timeseries/InternalTimeSeriesTests.java
  27. +59 -0 server/src/test/java/org/elasticsearch/search/aggregations/timeseries/TimeSeriesAggregationBuilderTests.java
  28. +126 -0 server/src/test/java/org/elasticsearch/search/aggregations/timeseries/TimeSeriesAggregatorTests.java
  29. +5 -0 test/framework/src/main/java/org/elasticsearch/index/mapper/MapperServiceTestCase.java
  30. +40 -6 test/framework/src/main/java/org/elasticsearch/search/aggregations/AggregatorTestCase.java
  31. +3 -0 test/framework/src/main/java/org/elasticsearch/test/InternalAggregationTestCase.java
  32. +1 -0 x-pack/plugin/analytics/src/test/java/org/elasticsearch/xpack/analytics/topmetrics/TopMetricsAggregatorTests.java
  33. +2 -1 x-pack/plugin/transform/src/main/java/org/elasticsearch/xpack/transform/transforms/pivot/TransformAggregations.java

+ 5 - 0
benchmarks/src/main/java/org/elasticsearch/benchmark/search/aggregations/AggConstructionContentionBenchmark.java

@@ -346,6 +346,11 @@ public class AggConstructionContentionBenchmark {
             return true;
         }
 
+        @Override
+        public boolean isInSortOrderExecutionRequired() {
+            return false;
+        }
+
         @Override
         public void close() {
             List<Releasable> releaseMe = new ArrayList<>(this.releaseMe);

+ 3 - 0
client/rest-high-level/src/main/java/org/elasticsearch/client/RestHighLevelClient.java

@@ -187,6 +187,8 @@ import org.elasticsearch.search.aggregations.pipeline.ParsedSimpleValue;
 import org.elasticsearch.search.aggregations.pipeline.ParsedStatsBucket;
 import org.elasticsearch.search.aggregations.pipeline.PercentilesBucketPipelineAggregationBuilder;
 import org.elasticsearch.search.aggregations.pipeline.StatsBucketPipelineAggregationBuilder;
+import org.elasticsearch.search.aggregations.timeseries.ParsedTimeSeries;
+import org.elasticsearch.search.aggregations.timeseries.TimeSeriesAggregationBuilder;
 import org.elasticsearch.search.suggest.Suggest;
 import org.elasticsearch.search.suggest.completion.CompletionSuggestion;
 import org.elasticsearch.search.suggest.completion.CompletionSuggestionBuilder;
@@ -2893,6 +2895,7 @@ public class RestHighLevelClient implements Closeable {
         map.put(StringStatsAggregationBuilder.NAME, (p, c) -> ParsedStringStats.PARSER.parse(p, (String) c));
         map.put(TopMetricsAggregationBuilder.NAME, (p, c) -> ParsedTopMetrics.PARSER.parse(p, (String) c));
         map.put(InferencePipelineAggregationBuilder.NAME, (p, c) -> ParsedInference.fromXContent(p, (String) (c)));
+        map.put(TimeSeriesAggregationBuilder.NAME, (p, c) -> ParsedTimeSeries.fromXContent(p, (String) (c)));
         List<NamedXContentRegistry.Entry> entries = map.entrySet()
             .stream()
             .map(entry -> new NamedXContentRegistry.Entry(Aggregation.class, new ParseField(entry.getKey()), entry.getValue()))

+ 81 - 0
rest-api-spec/src/yamlRestTest/resources/rest-api-spec/test/search.aggregation/450_time_series.yml

@@ -0,0 +1,81 @@
+setup:
+  - skip:
+        version: " - 8.1.0"
+        reason: Support for time_series aggs was added in 8.1.0
+
+  - do:
+      indices.create:
+        index: tsdb
+        body:
+          settings:
+            number_of_replicas: 0
+            mode: time_series
+            routing_path: [key]
+            time_series:
+              start_time: "2021-01-01T00:00:00Z"
+              end_time: "2022-01-01T00:00:00Z"
+          mappings:
+            properties:
+              key:
+                type: keyword
+                time_series_dimension: true
+              "@timestamp":
+                type: date
+
+  - do:
+      cluster.health:
+        wait_for_status: green
+
+  - do:
+      bulk:
+        index: tsdb
+        refresh: true
+        body:
+          - '{ "index": {} }'
+          - '{ "key": "bar", "val": 2, "@timestamp": "2021-01-01T00:00:10Z" }'
+          - '{ "index": {}}'
+          - '{ "key": "bar", "val": 10, "@timestamp": "2021-01-01T00:00:00Z" }'
+          - '{ "index": {}}'
+          - '{ "key": "bar", "val": 50, "@timestamp": "2021-01-01T00:00:30Z" }'
+          - '{ "index": {}}'
+          - '{ "key": "bar", "val": 40, "@timestamp": "2021-01-01T00:00:20Z" }'
+
+  # Let's try to create another segment to make things a bit more interesting
+  - do:
+      bulk:
+        index: tsdb
+        refresh: true
+        body:
+          - '{ "index": {} }'
+          - '{ "key": "foo", "val": 20, "@timestamp": "2021-01-01T00:00:00Z" }'
+          - '{ "create": {} }'
+          - '{ "key": "foo", "val": 30, "@timestamp": "2021-01-01T00:10:00Z" }'
+          - '{ "index": {} }'
+          - '{ "key": "baz", "val": 20, "@timestamp": "2021-01-01T00:00:00Z" }'
+          - '{ "index": {} }'
+          - '{ "key": "baz", "val": 20, "@timestamp": "2021-01-01T00:00:00" }'
+
+---
+"Basic test":
+  - do:
+      search:
+        index: tsdb
+        body:
+          query:
+            range:
+              "@timestamp":
+                gte: "2021-01-01T00:10:00Z"
+          size: 0
+          aggs:
+            ts:
+              time_series:
+                keyed: false
+
+
+
+  - match: { hits.total.value: 1 }
+  - length: { aggregations.ts.buckets: 1 }
+
+  - match: { aggregations.ts.buckets.0.key: { "key": "foo" } }
+  - match: { aggregations.ts.buckets.0.doc_count: 1 }
+

+ 522 - 0
server/src/internalClusterTest/java/org/elasticsearch/search/aggregations/TimeSeriesAggregationsIT.java

@@ -0,0 +1,522 @@
+/*
+ * Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
+ * or more contributor license agreements. Licensed under the Elastic License
+ * 2.0 and the Server Side Public License, v 1; you may not use this file except
+ * in compliance with, at your election, the Elastic License 2.0 or the Server
+ * Side Public License, v 1.
+ */
+
+package org.elasticsearch.search.aggregations;
+
+import org.elasticsearch.ElasticsearchException;
+import org.elasticsearch.action.DocWriteRequest;
+import org.elasticsearch.action.admin.indices.alias.Alias;
+import org.elasticsearch.action.index.IndexRequestBuilder;
+import org.elasticsearch.action.search.SearchResponse;
+import org.elasticsearch.action.support.WriteRequest;
+import org.elasticsearch.common.settings.Settings;
+import org.elasticsearch.index.mapper.DateFieldMapper;
+import org.elasticsearch.index.query.QueryBuilder;
+import org.elasticsearch.index.query.QueryBuilders;
+import org.elasticsearch.index.query.RangeQueryBuilder;
+import org.elasticsearch.search.aggregations.bucket.MultiBucketsAggregation;
+import org.elasticsearch.search.aggregations.bucket.global.Global;
+import org.elasticsearch.search.aggregations.bucket.histogram.DateHistogramInterval;
+import org.elasticsearch.search.aggregations.bucket.histogram.Histogram;
+import org.elasticsearch.search.aggregations.bucket.terms.Terms;
+import org.elasticsearch.search.aggregations.metrics.CompensatedSum;
+import org.elasticsearch.search.aggregations.metrics.Stats;
+import org.elasticsearch.search.aggregations.metrics.Sum;
+import org.elasticsearch.search.aggregations.pipeline.SimpleValue;
+import org.elasticsearch.search.aggregations.timeseries.TimeSeries;
+import org.elasticsearch.search.sort.SortOrder;
+import org.elasticsearch.test.ESIntegTestCase;
+import org.elasticsearch.xcontent.XContentBuilder;
+import org.elasticsearch.xcontent.XContentFactory;
+
+import java.time.ZonedDateTime;
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+import java.util.stream.Collectors;
+import java.util.stream.IntStream;
+
+import static org.elasticsearch.search.aggregations.AggregationBuilders.dateHistogram;
+import static org.elasticsearch.search.aggregations.AggregationBuilders.global;
+import static org.elasticsearch.search.aggregations.AggregationBuilders.stats;
+import static org.elasticsearch.search.aggregations.AggregationBuilders.sum;
+import static org.elasticsearch.search.aggregations.AggregationBuilders.terms;
+import static org.elasticsearch.search.aggregations.AggregationBuilders.timeSeries;
+import static org.elasticsearch.search.aggregations.AggregationBuilders.topHits;
+import static org.elasticsearch.test.hamcrest.ElasticsearchAssertions.assertAcked;
+import static org.elasticsearch.test.hamcrest.ElasticsearchAssertions.assertSearchResponse;
+import static org.hamcrest.Matchers.closeTo;
+import static org.hamcrest.Matchers.containsString;
+import static org.hamcrest.Matchers.equalTo;
+import static org.hamcrest.Matchers.greaterThanOrEqualTo;
+import static org.hamcrest.Matchers.lessThan;
+import static org.hamcrest.Matchers.lessThanOrEqualTo;
+
+@ESIntegTestCase.SuiteScopeTestCase
+public class TimeSeriesAggregationsIT extends ESIntegTestCase {
+
+    private static final Map<Map<String, String>, Map<Long, Map<String, Double>>> data = new HashMap<>();
+    private static int numberOfDimensions;
+    private static int numberOfMetrics;
+    private static String[][] dimensions;
+    private static Long[] boundaries;
+
+    @Override
+    public void setupSuiteScopeCluster() throws Exception {
+        int numberOfIndices = randomIntBetween(1, 3);
+        numberOfDimensions = randomIntBetween(1, 5);
+        numberOfMetrics = randomIntBetween(1, 10);
+        String[] routingKeys = randomSubsetOf(
+            randomIntBetween(1, numberOfDimensions),
+            IntStream.rangeClosed(0, numberOfDimensions - 1).boxed().toArray(Integer[]::new)
+        ).stream().map(k -> "dim_" + k).toArray(String[]::new);
+        dimensions = new String[numberOfDimensions][];
+        int dimCardinality = 1;
+        for (int i = 0; i < dimensions.length; i++) {
+            dimensions[i] = randomUnique(() -> randomAlphaOfLength(10), randomIntBetween(1, 30 / numberOfMetrics)).toArray(new String[0]);
+            dimCardinality *= dimensions[i].length;
+        }
+
+        XContentBuilder builder = XContentFactory.jsonBuilder();
+        builder.startObject();
+        builder.startObject("properties");
+        for (int i = 0; i < dimensions.length; i++) {
+            builder.startObject("dim_" + i);
+            builder.field("type", "keyword");
+            builder.field("time_series_dimension", true);
+            builder.endObject();
+        }
+        for (int i = 0; i < numberOfMetrics; i++) {
+            builder.startObject("metric_" + i);
+            builder.field("type", "double");
+            builder.endObject();
+        }
+        builder.endObject(); // properties
+        builder.endObject();
+        String start = "2021-01-01T00:00:00Z";
+        String end = "2022-01-01T00:00:00Z";
+        long startMillis = DateFieldMapper.DEFAULT_DATE_TIME_FORMATTER.parseMillis(start);
+        long endMillis = DateFieldMapper.DEFAULT_DATE_TIME_FORMATTER.parseMillis(end);
+        Set<Long> possibleBoundaries = randomUnique(() -> randomLongBetween(startMillis + 1, endMillis - 1), numberOfIndices - 1);
+        possibleBoundaries.add(startMillis);
+        possibleBoundaries.add(endMillis);
+        boundaries = possibleBoundaries.stream().sorted().toArray(Long[]::new);
+        for (int i = 0; i < numberOfIndices; i++) {
+            assertAcked(
+                prepareCreate("index" + i).setSettings(
+                    Settings.builder()
+                        .put("mode", "time_series")
+                        .put("routing_path", String.join(",", routingKeys))
+                        .put("index.number_of_shards", randomIntBetween(1, 10))
+                        .put("time_series.start_time", boundaries[i])
+                        .put("time_series.end_time", boundaries[i + 1])
+                        .build()
+                ).setMapping(builder).addAlias(new Alias("index")).get()
+            );
+        }
+
+        int numberOfDocs = randomIntBetween(dimCardinality, dimCardinality * 5);
+        logger.info(
+            "Dimensions: "
+                + numberOfDimensions
+                + " metrics: "
+                + numberOfMetrics
+                + " documents "
+                + numberOfDocs
+                + " cardinality "
+                + dimCardinality
+        );
+
+        List<IndexRequestBuilder> docs = new ArrayList<>(numberOfDocs);
+        for (int i = 0; i < numberOfDocs; i++) {
+            XContentBuilder docSource = XContentFactory.jsonBuilder();
+            docSource.startObject();
+            Map<String, String> key = new HashMap<>();
+            for (int d = 0; d < numberOfDimensions; d++) {
+                String dim = randomFrom(dimensions[d]);
+                docSource.field("dim_" + d, dim);
+                key.put("dim_" + d, dim);
+            }
+            Map<String, Double> metrics = new HashMap<>();
+            for (int m = 0; m < numberOfMetrics; m++) {
+                Double val = randomDoubleBetween(0.0, 10000.0, true);
+                docSource.field("metric_" + m, val);
+                metrics.put("metric_" + m, val);
+            }
+            Map<Long, Map<String, Double>> tsValues = data.get(key);
+            long timestamp;
+            if (tsValues == null) {
+                timestamp = randomLongBetween(startMillis, endMillis - 1);
+                tsValues = new HashMap<>();
+                data.put(key, tsValues);
+            } else {
+                timestamp = randomValueOtherThanMany(tsValues::containsKey, () -> randomLongBetween(startMillis, endMillis - 1));
+            }
+            tsValues.put(timestamp, metrics);
+            docSource.field("@timestamp", timestamp);
+            docSource.endObject();
+            docs.add(client().prepareIndex("index" + findIndex(timestamp)).setOpType(DocWriteRequest.OpType.CREATE).setSource(docSource));
+        }
+        indexRandom(true, false, docs);
+    }
+
+    public void testStandAloneTimeSeriesAgg() {
+        SearchResponse response = client().prepareSearch("index").setSize(0).addAggregation(timeSeries("by_ts")).get();
+        assertSearchResponse(response);
+        Aggregations aggregations = response.getAggregations();
+        assertNotNull(aggregations);
+        TimeSeries timeSeries = aggregations.get("by_ts");
+        assertThat(
+            timeSeries.getBuckets().stream().map(MultiBucketsAggregation.Bucket::getKey).collect(Collectors.toSet()),
+            equalTo(data.keySet())
+        );
+        for (TimeSeries.Bucket bucket : timeSeries.getBuckets()) {
+            @SuppressWarnings("unchecked")
+            Map<String, String> key = (Map<String, String>) bucket.getKey();
+            assertThat((long) data.get(key).size(), equalTo(bucket.getDocCount()));
+        }
+    }
+
+    public void testTimeSeriesGroupedByADimension() {
+        String groupBy = "dim_" + randomIntBetween(0, numberOfDimensions - 1);
+        SearchResponse response = client().prepareSearch("index")
+            .setSize(0)
+            .addAggregation(
+                terms("by_dim").field(groupBy)
+                    .size(data.size())
+                    .collectMode(randomFrom(Aggregator.SubAggCollectionMode.values()))
+                    .subAggregation(timeSeries("by_ts"))
+            )
+            .get();
+        assertSearchResponse(response);
+        Aggregations aggregations = response.getAggregations();
+        assertNotNull(aggregations);
+        Terms terms = aggregations.get("by_dim");
+        Set<Map<String, String>> keys = new HashSet<>();
+        for (Terms.Bucket term : terms.getBuckets()) {
+            TimeSeries timeSeries = term.getAggregations().get("by_ts");
+            for (TimeSeries.Bucket bucket : timeSeries.getBuckets()) {
+                @SuppressWarnings("unchecked")
+                Map<String, String> key = (Map<String, String>) bucket.getKey();
+                assertThat((long) data.get(key).size(), equalTo(bucket.getDocCount()));
+                assertTrue("key is not unique", keys.add(key));
+                assertThat("time series doesn't contain dimensions we grouped by", key.get(groupBy), equalTo(term.getKeyAsString()));
+            }
+        }
+        assertThat(keys, equalTo(data.keySet()));
+    }
+
+    public void testTimeSeriesGroupedByDateHistogram() {
+        DateHistogramInterval fixedInterval = DateHistogramInterval.days(randomIntBetween(10, 100));
+        SearchResponse response = client().prepareSearch("index")
+            .setSize(0)
+            .addAggregation(
+                dateHistogram("by_time").field("@timestamp")
+                    .fixedInterval(fixedInterval)
+                    .subAggregation(timeSeries("by_ts").subAggregation(stats("timestamp").field("@timestamp")))
+            )
+            .get();
+        assertSearchResponse(response);
+        Aggregations aggregations = response.getAggregations();
+        assertNotNull(aggregations);
+        Histogram histogram = aggregations.get("by_time");
+        Map<Map<String, String>, Long> keys = new HashMap<>();
+        for (Histogram.Bucket interval : histogram.getBuckets()) {
+            long intervalStart = ((ZonedDateTime) interval.getKey()).toEpochSecond() * 1000;
+            long intervalEnd = intervalStart + fixedInterval.estimateMillis();
+            TimeSeries timeSeries = interval.getAggregations().get("by_ts");
+            for (TimeSeries.Bucket bucket : timeSeries.getBuckets()) {
+                @SuppressWarnings("unchecked")
+                Map<String, String> key = (Map<String, String>) bucket.getKey();
+                keys.compute(key, (k, v) -> (v == null ? 0 : v) + bucket.getDocCount());
+                assertThat(bucket.getDocCount(), lessThanOrEqualTo((long) data.get(key).size()));
+                Stats stats = bucket.getAggregations().get("timestamp");
+                long minTimestamp = DateFieldMapper.DEFAULT_DATE_TIME_FORMATTER.parseMillis(stats.getMinAsString());
+                long maxTimestamp = DateFieldMapper.DEFAULT_DATE_TIME_FORMATTER.parseMillis(stats.getMaxAsString());
+                assertThat(minTimestamp, greaterThanOrEqualTo(intervalStart));
+                assertThat(maxTimestamp, lessThan(intervalEnd));
+            }
+        }
+        assertThat(keys.keySet(), equalTo(data.keySet()));
+        for (Map.Entry<Map<String, String>, Long> entry : keys.entrySet()) {
+            assertThat(entry.getValue(), equalTo((long) data.get(entry.getKey()).size()));
+        }
+    }
+
+    public void testStandAloneTimeSeriesAggWithDimFilter() {
+        boolean include = randomBoolean();
+        int dim = randomIntBetween(0, numberOfDimensions - 1);
+        String val = randomFrom(dimensions[dim]);
+        QueryBuilder queryBuilder = QueryBuilders.termQuery("dim_" + dim, val);
+        if (include == false) {
+            queryBuilder = QueryBuilders.boolQuery().mustNot(queryBuilder);
+        }
+        SearchResponse response = client().prepareSearch("index")
+            .setQuery(queryBuilder)
+            .setSize(0)
+            .addAggregation(timeSeries("by_ts"))
+            .get();
+        assertSearchResponse(response);
+        Aggregations aggregations = response.getAggregations();
+        assertNotNull(aggregations);
+        TimeSeries timeSeries = aggregations.get("by_ts");
+        Map<Map<String, String>, Map<Long, Map<String, Double>>> filteredData = dataFilteredByDimension("dim_" + dim, val, include);
+        assertThat(
+            timeSeries.getBuckets().stream().map(MultiBucketsAggregation.Bucket::getKey).collect(Collectors.toSet()),
+            equalTo(filteredData.keySet())
+        );
+        for (TimeSeries.Bucket bucket : timeSeries.getBuckets()) {
+            @SuppressWarnings("unchecked")
+            Map<String, String> key = (Map<String, String>) bucket.getKey();
+            assertThat(bucket.getDocCount(), equalTo((long) filteredData.get(key).size()));
+        }
+    }
+
+    public void testStandAloneTimeSeriesAggWithGlobalAggregation() {
+        boolean include = randomBoolean();
+        int dim = randomIntBetween(0, numberOfDimensions - 1);
+        int metric = randomIntBetween(0, numberOfMetrics - 1);
+        String val = randomFrom(dimensions[dim]);
+        QueryBuilder queryBuilder = QueryBuilders.termQuery("dim_" + dim, val);
+        if (include == false) {
+            queryBuilder = QueryBuilders.boolQuery().mustNot(queryBuilder);
+        }
+        SearchResponse response = client().prepareSearch("index")
+            .setQuery(queryBuilder)
+            .setSize(0)
+            .addAggregation(timeSeries("by_ts").subAggregation(sum("filter_sum").field("metric_" + metric)))
+            .addAggregation(global("everything").subAggregation(sum("all_sum").field("metric_" + metric)))
+            .addAggregation(PipelineAggregatorBuilders.sumBucket("total_filter_sum", "by_ts>filter_sum"))
+            .get();
+        assertSearchResponse(response);
+        Aggregations aggregations = response.getAggregations();
+        assertNotNull(aggregations);
+        TimeSeries timeSeries = aggregations.get("by_ts");
+        Map<Map<String, String>, Map<Long, Map<String, Double>>> filteredData = dataFilteredByDimension("dim_" + dim, val, include);
+        assertThat(
+            timeSeries.getBuckets().stream().map(MultiBucketsAggregation.Bucket::getKey).collect(Collectors.toSet()),
+            equalTo(filteredData.keySet())
+        );
+        for (TimeSeries.Bucket bucket : timeSeries.getBuckets()) {
+            @SuppressWarnings("unchecked")
+            Map<String, String> key = (Map<String, String>) bucket.getKey();
+            assertThat(bucket.getDocCount(), equalTo((long) filteredData.get(key).size()));
+        }
+        SimpleValue obj = aggregations.get("total_filter_sum");
+        assertThat(obj.value(), closeTo(sumByMetric(filteredData, "metric_" + metric), obj.value() * 0.0001));
+
+        Global global = aggregations.get("everything");
+        Sum allSum = global.getAggregations().get("all_sum");
+        assertThat(allSum.getValue(), closeTo(sumByMetric(data, "metric_" + metric), allSum.getValue() * 0.0001));
+
+        ElasticsearchException e = expectThrows(
+            ElasticsearchException.class,
+            () -> client().prepareSearch("index")
+                .setQuery(QueryBuilders.termQuery("dim_" + dim, val))
+                .setSize(0)
+                .addAggregation(global("everything").subAggregation(timeSeries("by_ts")))
+                .get()
+        );
+        assertThat(e.getRootCause().getMessage(), containsString("Time series aggregations cannot be used inside global aggregation."));
+    }
+
+    public void testStandAloneTimeSeriesAggWithMetricFilter() {
+        boolean above = randomBoolean();
+        int metric = randomIntBetween(0, numberOfMetrics - 1);
+        double val = randomDoubleBetween(0, 100000, true);
+        RangeQueryBuilder queryBuilder = QueryBuilders.rangeQuery("metric_" + metric);
+        if (above) {
+            queryBuilder.gt(val);
+        } else {
+            queryBuilder.lte(val);
+        }
+        SearchResponse response = client().prepareSearch("index")
+            .setQuery(queryBuilder)
+            .setSize(0)
+            .addAggregation(timeSeries("by_ts"))
+            .get();
+        assertSearchResponse(response);
+        Aggregations aggregations = response.getAggregations();
+        assertNotNull(aggregations);
+        TimeSeries timeSeries = aggregations.get("by_ts");
+        Map<Map<String, String>, Map<Long, Map<String, Double>>> filteredData = dataFilteredByMetric(data, "metric_" + metric, val, above);
+        assertThat(
+            timeSeries.getBuckets().stream().map(MultiBucketsAggregation.Bucket::getKey).collect(Collectors.toSet()),
+            equalTo(filteredData.keySet())
+        );
+        for (TimeSeries.Bucket bucket : timeSeries.getBuckets()) {
+            @SuppressWarnings("unchecked")
+            Map<String, String> key = (Map<String, String>) bucket.getKey();
+            assertThat(bucket.getDocCount(), equalTo((long) filteredData.get(key).size()));
+        }
+    }
+
+    public void testRetrievingHits() {
+        Map.Entry<String, Double> filterMetric = randomMetricAndValue(data);
+        double lowerVal = filterMetric.getValue() - randomDoubleBetween(0, 100000, true);
+        double upperVal = filterMetric.getValue() + randomDoubleBetween(0, 100000, true);
+        Map<Map<String, String>, Map<Long, Map<String, Double>>> filteredData = dataFilteredByMetric(
+            dataFilteredByMetric(data, filterMetric.getKey(), upperVal, false),
+            filterMetric.getKey(),
+            lowerVal,
+            true
+        );
+        QueryBuilder queryBuilder = QueryBuilders.rangeQuery(filterMetric.getKey()).gt(lowerVal).lte(upperVal);
+        int expectedSize = count(filteredData);
+        ElasticsearchException e = expectThrows(
+            ElasticsearchException.class,
+            () -> client().prepareSearch("index")
+                .setQuery(queryBuilder)
+                .setSize(expectedSize * 2)
+                .addAggregation(timeSeries("by_ts").subAggregation(topHits("hits").size(100)))
+                .addAggregation(topHits("top_hits").size(100)) // top level top hits
+                .get()
+        );
+        assertThat(e.getDetailedMessage(), containsString("Top hits aggregations cannot be used together with time series aggregations"));
+        // TODO: Fix the top hits aggregation
+    }
+
+    /**
+     * Filters the test data by only including or excluding certain results
+     * @param dimension name of the dimension to be filtered
+     * @param value value of the dimension to filter by
+     * @param include true if all records with this dimension should be included, false otherwise
+     * @return filtered map
+     */
+    private static Map<Map<String, String>, Map<Long, Map<String, Double>>> dataFilteredByDimension(
+        String dimension,
+        String value,
+        boolean include
+    ) {
+        Map<Map<String, String>, Map<Long, Map<String, Double>>> newMap = new HashMap<>();
+        for (Map.Entry<Map<String, String>, Map<Long, Map<String, Double>>> entry : data.entrySet()) {
+            if (value.equals(entry.getKey().get(dimension)) == include) {
+                newMap.put(entry.getKey(), entry.getValue());
+            }
+        }
+        return newMap;
+    }
+
+    /**
+     * Filters the test data by only including or excluding certain results
+     * @param data data to be filtered
+     * @param metric name of the metric the records should be filtered by
+     * @param value value of the metric
+     * @param above true if all records above the value should be included, false otherwise
+     * @return filtered map
+     */
+    private static Map<Map<String, String>, Map<Long, Map<String, Double>>> dataFilteredByMetric(
+        Map<Map<String, String>, Map<Long, Map<String, Double>>> data,
+        String metric,
+        double value,
+        boolean above
+    ) {
+        Map<Map<String, String>, Map<Long, Map<String, Double>>> newMap = new HashMap<>();
+        for (Map.Entry<Map<String, String>, Map<Long, Map<String, Double>>> entry : data.entrySet()) {
+            Map<Long, Map<String, Double>> values = new HashMap<>();
+            for (Map.Entry<Long, Map<String, Double>> doc : entry.getValue().entrySet()) {
+                Double docVal = doc.getValue().get(metric);
+                if (docVal != null && (docVal > value == above)) {
+                    values.put(doc.getKey(), doc.getValue());
+                }
+            }
+            if (values.isEmpty() == false) {
+                newMap.put(entry.getKey(), values);
+            }
+        }
+        return newMap;
+    }
+
+    private static Double sumByMetric(Map<Map<String, String>, Map<Long, Map<String, Double>>> data, String metric) {
+        final CompensatedSum kahanSummation = new CompensatedSum(0, 0);
+        for (Map.Entry<Map<String, String>, Map<Long, Map<String, Double>>> entry : data.entrySet()) {
+            for (Map.Entry<Long, Map<String, Double>> doc : entry.getValue().entrySet()) {
+                Double docVal = doc.getValue().get(metric);
+                if (docVal != null) {
+                    kahanSummation.add(docVal);
+                }
+            }
+        }
+        return kahanSummation.value();
+    }
+
+    private static int count(Map<Map<String, String>, Map<Long, Map<String, Double>>> data) {
+        int size = 0;
+        for (Map.Entry<Map<String, String>, Map<Long, Map<String, Double>>> entry : data.entrySet()) {
+            size += entry.getValue().entrySet().size();
+        }
+        return size;
+    }
+
+    private static int findIndex(long timestamp) {
+        for (int i = 0; i < boundaries.length - 1; i++) {
+            if (timestamp < boundaries[i + 1]) {
+                return i;
+            }
+        }
+        throw new IllegalArgumentException("Cannot find index for timestamp " + timestamp);
+    }
+
+    private static Map.Entry<String, Double> randomMetricAndValue(Map<Map<String, String>, Map<Long, Map<String, Double>>> data) {
+        return randomFrom(
+            randomFrom(randomFrom(data.entrySet().stream().toList()).getValue().entrySet().stream().toList()).getValue()
+                .entrySet()
+                .stream()
+                .toList()
+        );
+    }
+
+    public void testGetHitsFailure() throws Exception {
+        assertAcked(
+            prepareCreate("test").setSettings(
+                Settings.builder()
+                    .put("mode", "time_series")
+                    .put("routing_path", "key")
+                    .put("time_series.start_time", "2021-01-01T00:00:00Z")
+                    .put("time_series.end_time", "2022-01-01T00:00:00Z")
+                    .put("number_of_shards", 1)
+                    .build()
+            ).setMapping("key", "type=keyword,time_series_dimension=true", "val", "type=double").get()
+        );
+
+        client().prepareBulk()
+            .add(client().prepareIndex("test").setId("2").setSource("key", "bar", "val", 2, "@timestamp", "2021-01-01T00:00:10Z"))
+            .add(client().prepareIndex("test").setId("1").setSource("key", "bar", "val", 10, "@timestamp", "2021-01-01T00:00:00Z"))
+            .setRefreshPolicy(WriteRequest.RefreshPolicy.IMMEDIATE)
+            .get();
+        client().prepareBulk()
+            .add(client().prepareIndex("test").setId("4").setSource("key", "bar", "val", 50, "@timestamp", "2021-01-01T00:00:30Z"))
+            .add(client().prepareIndex("test").setId("3").setSource("key", "bar", "val", 40, "@timestamp", "2021-01-01T00:00:20Z"))
+            .setRefreshPolicy(WriteRequest.RefreshPolicy.IMMEDIATE)
+            .get();
+        client().prepareBulk()
+            .add(client().prepareIndex("test").setId("7").setSource("key", "foo", "val", 20, "@timestamp", "2021-01-01T00:00:00Z"))
+            .add(client().prepareIndex("test").setId("8").setSource("key", "foo", "val", 30, "@timestamp", "2021-01-01T00:10:00Z"))
+            .add(client().prepareIndex("test").setId("5").setSource("key", "baz", "val", 20, "@timestamp", "2021-01-01T00:00:00Z"))
+            .add(client().prepareIndex("test").setId("6").setSource("key", "baz", "val", 30, "@timestamp", "2021-01-01T00:10:00Z"))
+            .setRefreshPolicy(WriteRequest.RefreshPolicy.IMMEDIATE)
+            .get();
+
+        QueryBuilder queryBuilder = QueryBuilders.rangeQuery("@timestamp").lte("2021-01-01T00:10:00Z");
+        SearchResponse response = client().prepareSearch("test")
+            .setQuery(queryBuilder)
+            .setSize(10)
+            .addSort("key", SortOrder.ASC)
+            .addSort("@timestamp", SortOrder.ASC)
+            .get();
+        assertSearchResponse(response);
+
+        response = client().prepareSearch("test").setQuery(queryBuilder).setSize(10).addAggregation(timeSeries("by_ts")).get();
+        assertSearchResponse(response);
+
+    }
+
+}

+ 30 - 16
server/src/main/java/org/elasticsearch/index/mapper/TimeSeriesIdFieldMapper.java

@@ -12,9 +12,11 @@ import org.apache.lucene.document.SortedSetDocValuesField;
 import org.apache.lucene.search.Query;
 import org.apache.lucene.util.ByteBlockPool;
 import org.apache.lucene.util.BytesRef;
+import org.elasticsearch.common.bytes.BytesArray;
 import org.elasticsearch.common.bytes.BytesReference;
 import org.elasticsearch.common.io.stream.BytesStreamOutput;
 import org.elasticsearch.common.io.stream.StreamInput;
+import org.elasticsearch.common.io.stream.StreamOutput;
 import org.elasticsearch.index.IndexMode;
 import org.elasticsearch.index.fielddata.FieldData;
 import org.elasticsearch.index.fielddata.IndexFieldData;
@@ -144,27 +146,13 @@ public class TimeSeriesIdFieldMapper extends MetadataFieldMapper {
         context.doc().add(new SortedSetDocValuesField(fieldType().name(), timeSeriesId.toBytesRef()));
     }
 
-    public static BytesReference buildTsidField(Map<String, BytesReference> dimensionFields) throws IOException {
+    public static BytesReference buildTsidField(SortedMap<String, BytesReference> dimensionFields) throws IOException {
         if (dimensionFields == null || dimensionFields.isEmpty()) {
             throw new IllegalArgumentException("Dimension fields are missing.");
         }
 
         try (BytesStreamOutput out = new BytesStreamOutput()) {
-            out.writeVInt(dimensionFields.size());
-            for (Map.Entry<String, BytesReference> entry : dimensionFields.entrySet()) {
-                String fieldName = entry.getKey();
-                BytesRef fieldNameBytes = new BytesRef(fieldName);
-                int len = fieldNameBytes.length;
-                if (len > DIMENSION_NAME_LIMIT) {
-                    throw new IllegalArgumentException(
-                        "Dimension name must be less than [" + DIMENSION_NAME_LIMIT + "] bytes but [" + fieldName + "] was [" + len + "]."
-                    );
-                }
-                // Write field name in utf-8 instead of writeString's utf-16-ish thing
-                out.writeBytesRef(fieldNameBytes);
-                entry.getValue().writeTo(out);
-            }
-
+            encodeTsid(out, dimensionFields);
             BytesReference timeSeriesId = out.bytes();
             if (timeSeriesId.length() > LIMIT) {
                 throw new IllegalArgumentException(NAME + " longer than [" + LIMIT + "] bytes [" + timeSeriesId.length() + "].");
@@ -178,6 +166,24 @@ public class TimeSeriesIdFieldMapper extends MetadataFieldMapper {
         return CONTENT_TYPE;
     }
 
+    public static void encodeTsid(StreamOutput out, SortedMap<String, BytesReference> dimensionFields) throws IOException {
+        out.writeVInt(dimensionFields.size());
+        for (Map.Entry<String, BytesReference> entry : dimensionFields.entrySet()) {
+            String fieldName = entry.getKey();
+            BytesRef fieldNameBytes = new BytesRef(fieldName);
+            int len = fieldNameBytes.length;
+            if (len > DIMENSION_NAME_LIMIT) {
+                throw new IllegalArgumentException(
+                    "Dimension name must be less than [" + DIMENSION_NAME_LIMIT + "] bytes but [" + fieldName + "] was [" + len + "]."
+                );
+            }
+            // Write field name in utf-8 instead of writeString's utf-16-ish thing
+            out.writeBytesRef(fieldNameBytes);
+            entry.getValue().writeTo(out);
+        }
+
+    }
+
     /**
      * Decode the {@code _tsid} into a human readable map.
      */
@@ -208,6 +214,14 @@ public class TimeSeriesIdFieldMapper extends MetadataFieldMapper {
         }
     }
 
+    public static Map<String, Object> decodeTsid(BytesRef bytesRef) {
+        try (StreamInput input = new BytesArray(bytesRef).streamInput()) {
+            return decodeTsid(input);
+        } catch (IOException ex) {
+            throw new IllegalArgumentException("Dimension field cannot be deserialized.", ex);
+        }
+    }
+
     public static BytesReference encodeTsidValue(String value) {
         try (BytesStreamOutput out = new BytesStreamOutput()) {
             out.write((byte) 's');

+ 3 - 2
server/src/main/java/org/elasticsearch/search/DocValueFormat.java

@@ -35,10 +35,11 @@ import java.text.ParseException;
 import java.time.ZoneId;
 import java.util.Arrays;
 import java.util.Base64;
-import java.util.LinkedHashMap;
 import java.util.Locale;
 import java.util.Map;
 import java.util.Objects;
+import java.util.SortedMap;
+import java.util.TreeMap;
 import java.util.function.LongSupplier;
 
 /** A formatter for values as returned by the fielddata/doc-values APIs. */
@@ -705,7 +706,7 @@ public interface DocValueFormat extends NamedWriteable {
             }
 
             Map<?, ?> m = (Map<?, ?>) value;
-            Map<String, BytesReference> dimensionFields = new LinkedHashMap<>(m.size());
+            SortedMap<String, BytesReference> dimensionFields = new TreeMap<>();
             for (Map.Entry<?, ?> entry : m.entrySet()) {
                 String k = (String) entry.getKey();
                 Object v = entry.getValue();

+ 13 - 0
server/src/main/java/org/elasticsearch/search/SearchModule.java

@@ -18,6 +18,7 @@ import org.elasticsearch.common.settings.Setting;
 import org.elasticsearch.common.settings.Settings;
 import org.elasticsearch.core.Nullable;
 import org.elasticsearch.core.RestApiVersion;
+import org.elasticsearch.index.IndexSettings;
 import org.elasticsearch.index.query.BoolQueryBuilder;
 import org.elasticsearch.index.query.BoostingQueryBuilder;
 import org.elasticsearch.index.query.CombinedFieldsQueryBuilder;
@@ -208,6 +209,8 @@ import org.elasticsearch.search.aggregations.pipeline.SerialDiffPipelineAggregat
 import org.elasticsearch.search.aggregations.pipeline.StatsBucketPipelineAggregationBuilder;
 import org.elasticsearch.search.aggregations.pipeline.SumBucketPipelineAggregationBuilder;
 import org.elasticsearch.search.aggregations.support.ValuesSourceRegistry;
+import org.elasticsearch.search.aggregations.timeseries.InternalTimeSeries;
+import org.elasticsearch.search.aggregations.timeseries.TimeSeriesAggregationBuilder;
 import org.elasticsearch.search.fetch.FetchPhase;
 import org.elasticsearch.search.fetch.FetchSubPhase;
 import org.elasticsearch.search.fetch.subphase.ExplainPhase;
@@ -633,6 +636,16 @@ public class SearchModule {
                 .setAggregatorRegistrar(CompositeAggregationBuilder::registerAggregators),
             builder
         );
+        if (IndexSettings.isTimeSeriesModeEnabled()) {
+            registerAggregation(
+                new AggregationSpec(
+                    TimeSeriesAggregationBuilder.NAME,
+                    TimeSeriesAggregationBuilder::new,
+                    TimeSeriesAggregationBuilder.PARSER
+                ).addResultReader(InternalTimeSeries::new),
+                builder
+            );
+        }
 
         if (RestApiVersion.minimumSupported() == RestApiVersion.V_7) {
             registerQuery(

+ 2 - 1
server/src/main/java/org/elasticsearch/search/SearchService.java

@@ -1215,7 +1215,8 @@ public class SearchService extends AbstractLifecycleComponent implements IndexEv
                 context::getRelativeTimeInMillis,
                 context::isCancelled,
                 context::buildFilteredQuery,
-                enableRewriteAggsToFilterByFilter
+                enableRewriteAggsToFilterByFilter,
+                IndexSettings.isTimeSeriesModeEnabled() && source.aggregations().isInSortOrderExecutionRequired()
             );
             context.addReleasable(aggContext);
             try {

+ 12 - 0
server/src/main/java/org/elasticsearch/search/aggregations/AggregationBuilder.java

@@ -189,4 +189,16 @@ public abstract class AggregationBuilder
     public String toString() {
         return Strings.toString(this);
     }
+
+    /**
+     * Return true if any of the child aggregations is a time-series aggregation that requires an in-order execution
+     */
+    public boolean isInSortOrderExecutionRequired() {
+        for (AggregationBuilder builder : factoriesBuilder.getAggregatorFactories()) {
+            if (builder.isInSortOrderExecutionRequired()) {
+                return true;
+            }
+        }
+        return false;
+    }
 }
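
The recursive check above only returns true if some descendant builder opts in. Presumably the new TimeSeriesAggregationBuilder (not included in this excerpt) overrides the hook along these lines; a hedged sketch:

    // Sketch only; the actual TimeSeriesAggregationBuilder implementation is not shown in this excerpt.
    @Override
    public boolean isInSortOrderExecutionRequired() {
        return true; // forces the in-index-order (TimeSeriesIndexSearcher) execution path
    }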

+ 9 - 0
server/src/main/java/org/elasticsearch/search/aggregations/AggregationBuilders.java

@@ -78,6 +78,7 @@ import org.elasticsearch.search.aggregations.metrics.TopHitsAggregationBuilder;
 import org.elasticsearch.search.aggregations.metrics.ValueCount;
 import org.elasticsearch.search.aggregations.metrics.ValueCountAggregationBuilder;
 import org.elasticsearch.search.aggregations.metrics.WeightedAvgAggregationBuilder;
+import org.elasticsearch.search.aggregations.timeseries.TimeSeriesAggregationBuilder;
 
 import java.util.List;
 import java.util.Map;
@@ -364,4 +365,12 @@ public class AggregationBuilders {
     public static CompositeAggregationBuilder composite(String name, List<CompositeValuesSourceBuilder<?>> sources) {
         return new CompositeAggregationBuilder(name, sources);
     }
+
+    /**
+     * Create a new {@link TimeSeriesAggregationBuilder} aggregation with the given name.
+     */
+    public static TimeSeriesAggregationBuilder timeSeries(String name) {
+        return new TimeSeriesAggregationBuilder(name);
+    }
+
 }
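
Since timeSeries(...) returns a regular AggregationBuilder, it composes with existing bucket aggregations; a sketch of the grouping used in testTimeSeriesGroupedByADimension below, with "dim_0" standing in for an arbitrary dimension field:

    // Illustrative nesting of the new aggregation under a terms aggregation (index and field names assumed).
    SearchResponse response = client().prepareSearch("index")
        .setSize(0)
        .addAggregation(
            AggregationBuilders.terms("by_dim").field("dim_0")
                .subAggregation(AggregationBuilders.timeSeries("by_ts"))
        )
        .get();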

+ 16 - 4
server/src/main/java/org/elasticsearch/search/aggregations/AggregationPhase.java

@@ -9,6 +9,7 @@ package org.elasticsearch.search.aggregations;
 
 import org.apache.lucene.search.Collector;
 import org.elasticsearch.common.inject.Inject;
+import org.elasticsearch.search.aggregations.timeseries.TimeSeriesIndexSearcher;
 import org.elasticsearch.search.internal.SearchContext;
 import org.elasticsearch.search.profile.query.CollectorResult;
 import org.elasticsearch.search.profile.query.InternalProfileCollector;
@@ -37,10 +38,21 @@ public class AggregationPhase {
         } catch (IOException e) {
             throw new AggregationInitializationException("Could not initialize aggregators", e);
         }
-        Collector collector = context.getProfilers() == null
-            ? bucketCollector
-            : new InternalProfileCollector(bucketCollector, CollectorResult.REASON_AGGREGATION, List.of());
-        context.queryCollectors().put(AggregationPhase.class, collector);
+        if (context.aggregations().factories().context() != null
+            && context.aggregations().factories().context().isInSortOrderExecutionRequired()) {
+            TimeSeriesIndexSearcher searcher = new TimeSeriesIndexSearcher(context.searcher());
+            try {
+                searcher.search(context.rewrittenQuery(), bucketCollector);
+            } catch (IOException e) {
+                throw new AggregationExecutionException("Could not perform time series aggregation", e);
+            }
+            context.queryCollectors().put(AggregationPhase.class, BucketCollector.NO_OP_COLLECTOR);
+        } else {
+            Collector collector = context.getProfilers() == null
+                ? bucketCollector
+                : new InternalProfileCollector(bucketCollector, CollectorResult.REASON_AGGREGATION, List.of());
+            context.queryCollectors().put(AggregationPhase.class, collector);
+        }
     }
 
     public void execute(SearchContext context) {

+ 12 - 0
server/src/main/java/org/elasticsearch/search/aggregations/AggregatorFactories.java

@@ -327,6 +327,18 @@ public class AggregatorFactories {
             return false;
         }
 
+        /**
+         * Return true if any of the factories can build a time-series aggregation that requires an in-order execution
+         */
+        public boolean isInSortOrderExecutionRequired() {
+            for (AggregationBuilder builder : aggregationBuilders) {
+                if (builder.isInSortOrderExecutionRequired()) {
+                    return true;
+                }
+            }
+            return false;
+        }
+
         public Builder addAggregator(AggregationBuilder factory) {
             if (names.add(factory.name) == false) {
                 throw new IllegalArgumentException("Two sibling aggregations cannot have the same name: [" + factory.name + "]");

+ 3 - 0
server/src/main/java/org/elasticsearch/search/aggregations/bucket/global/GlobalAggregatorFactory.java

@@ -28,6 +28,9 @@ public class GlobalAggregatorFactory extends AggregatorFactory {
         Map<String, Object> metadata
     ) throws IOException {
         super(name, context, parent, subFactories, metadata);
+        if (subFactories.isInSortOrderExecutionRequired()) {
+            throw new AggregationExecutionException("Time series aggregations cannot be used inside global aggregation.");
+        }
     }
 
     @Override

+ 4 - 1
server/src/main/java/org/elasticsearch/search/aggregations/bucket/terms/TermsAggregator.java

@@ -195,11 +195,14 @@ public abstract class TermsAggregator extends DeferableBucketAggregator {
         this.order = order;
         partiallyBuiltBucketComparator = order == null ? null : order.partiallyBuiltBucketComparator(b -> b.bucketOrd, this);
         this.format = format;
-        if (subAggsNeedScore() && descendsFromNestedAggregator(parent)) {
+        if ((subAggsNeedScore() && descendsFromNestedAggregator(parent)) || context.isInSortOrderExecutionRequired()) {
             /**
              * Force the execution to depth_first because we need to access the score of
              * nested documents in a sub-aggregation and we are not able to generate this score
              * while replaying deferred documents.
+             *
+             * We also force depth_first for time-series aggs executions since they need to be visited in a particular order (index
+             * sort order) which might be changed by the breadth_first execution.
              */
             this.collectMode = SubAggCollectionMode.DEPTH_FIRST;
         } else {

+ 4 - 1
server/src/main/java/org/elasticsearch/search/aggregations/bucket/terms/TermsAggregatorFactory.java

@@ -410,7 +410,10 @@ public class TermsAggregatorFactory extends ValuesSourceAggregatorFactory {
                     .getValuesSource();
                 SortedSetDocValues values = globalOrdsValues(context, ordinalsValuesSource);
                 long maxOrd = values.getValueCount();
-                if (maxOrd > 0 && maxOrd <= MAX_ORDS_TO_TRY_FILTERS && context.enableRewriteToFilterByFilter()) {
+                if (maxOrd > 0
+                    && maxOrd <= MAX_ORDS_TO_TRY_FILTERS
+                    && context.enableRewriteToFilterByFilter()
+                    && false == context.isInSortOrderExecutionRequired()) {
                     StringTermsAggregatorFromFilters adapted = StringTermsAggregatorFromFilters.adaptIntoFiltersOrNull(
                         name,
                         factories,

+ 4 - 0
server/src/main/java/org/elasticsearch/search/aggregations/metrics/TopHitsAggregatorFactory.java

@@ -8,6 +8,7 @@
 
 package org.elasticsearch.search.aggregations.metrics;
 
+import org.elasticsearch.search.aggregations.AggregationExecutionException;
 import org.elasticsearch.search.aggregations.Aggregator;
 import org.elasticsearch.search.aggregations.AggregatorFactories;
 import org.elasticsearch.search.aggregations.AggregatorFactory;
@@ -65,6 +66,9 @@ class TopHitsAggregatorFactory extends AggregatorFactory {
         Map<String, Object> metadata
     ) throws IOException {
         super(name, context, parent, subFactories, metadata);
+        if (context.isInSortOrderExecutionRequired()) {
+            throw new AggregationExecutionException("Top hits aggregations cannot be used together with time series aggregations");
+        }
         this.from = from;
         this.size = size;
         this.explain = explain;

+ 19 - 1
server/src/main/java/org/elasticsearch/search/aggregations/support/AggregationContext.java

@@ -9,6 +9,7 @@
 package org.elasticsearch.search.aggregations.support;
 
 import org.apache.lucene.analysis.Analyzer;
+import org.apache.lucene.index.LeafReaderContext;
 import org.apache.lucene.search.IndexSearcher;
 import org.apache.lucene.search.Query;
 import org.elasticsearch.common.breaker.CircuitBreaker;
@@ -282,6 +283,15 @@ public abstract class AggregationContext implements Releasable {
      */
     public abstract boolean enableRewriteToFilterByFilter();
 
+    /**
+     * Return true if any of the aggregations in this context is a time-series aggregation that requires an in-sort order execution.
+     *
+     * A side-effect of such execution is that all leaves are walked simultaneously and therefore we can no longer rely on
+     * {@link org.elasticsearch.search.aggregations.BucketCollector#getLeafCollector(LeafReaderContext)} to be called only after the
+     * previous leaf was fully collected.
+     */
+    public abstract boolean isInSortOrderExecutionRequired();
+
     /**
      * Implementation of {@linkplain AggregationContext} for production usage
      * that wraps our ubiquitous {@link SearchExecutionContext} and anything else
@@ -303,6 +313,7 @@ public abstract class AggregationContext implements Releasable {
         private final Supplier<Boolean> isCancelled;
         private final Function<Query, Query> filterQuery;
         private final boolean enableRewriteToFilterByFilter;
+        private final boolean inSortOrderExecutionRequired;
         private final AnalysisRegistry analysisRegistry;
 
         private final List<Aggregator> releaseMe = new ArrayList<>();
@@ -321,7 +332,8 @@ public abstract class AggregationContext implements Releasable {
             LongSupplier relativeTimeInMillis,
             Supplier<Boolean> isCancelled,
             Function<Query, Query> filterQuery,
-            boolean enableRewriteToFilterByFilter
+            boolean enableRewriteToFilterByFilter,
+            boolean inSortOrderExecutionRequired
         ) {
             this.analysisRegistry = analysisRegistry;
             this.context = context;
@@ -354,6 +366,7 @@ public abstract class AggregationContext implements Releasable {
             this.isCancelled = isCancelled;
             this.filterQuery = filterQuery;
             this.enableRewriteToFilterByFilter = enableRewriteToFilterByFilter;
+            this.inSortOrderExecutionRequired = inSortOrderExecutionRequired;
         }
 
         @Override
@@ -531,6 +544,11 @@ public abstract class AggregationContext implements Releasable {
             return enableRewriteToFilterByFilter;
         }
 
+        @Override
+        public boolean isInSortOrderExecutionRequired() {
+            return inSortOrderExecutionRequired;
+        }
+
         @Override
         public void close() {
             /*

+ 267 - 0
server/src/main/java/org/elasticsearch/search/aggregations/timeseries/InternalTimeSeries.java

@@ -0,0 +1,267 @@
+/*
+ * Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
+ * or more contributor license agreements. Licensed under the Elastic License
+ * 2.0 and the Server Side Public License, v 1; you may not use this file except
+ * in compliance with, at your election, the Elastic License 2.0 or the Server
+ * Side Public License, v 1.
+ */
+
+package org.elasticsearch.search.aggregations.timeseries;
+
+import org.elasticsearch.common.io.stream.StreamInput;
+import org.elasticsearch.common.io.stream.StreamOutput;
+import org.elasticsearch.search.aggregations.AggregationReduceContext;
+import org.elasticsearch.search.aggregations.InternalAggregation;
+import org.elasticsearch.search.aggregations.InternalAggregations;
+import org.elasticsearch.search.aggregations.InternalMultiBucketAggregation;
+import org.elasticsearch.xcontent.ObjectParser;
+import org.elasticsearch.xcontent.XContentBuilder;
+
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.Objects;
+
+import static org.elasticsearch.search.aggregations.ParsedMultiBucketAggregation.declareMultiBucketAggregationFields;
+
+public class InternalTimeSeries extends InternalMultiBucketAggregation<InternalTimeSeries, InternalTimeSeries.InternalBucket>
+    implements
+        TimeSeries {
+
+    private static final ObjectParser<ParsedTimeSeries, Void> PARSER = new ObjectParser<>(
+        ParsedTimeSeries.class.getSimpleName(),
+        true,
+        ParsedTimeSeries::new
+    );
+    static {
+        declareMultiBucketAggregationFields(
+            PARSER,
+            parser -> ParsedTimeSeries.ParsedBucket.fromXContent(parser, false),
+            parser -> ParsedTimeSeries.ParsedBucket.fromXContent(parser, true)
+        );
+    }
+
+    public static class InternalBucket extends InternalMultiBucketAggregation.InternalBucket implements TimeSeries.Bucket {
+        protected long bucketOrd;
+        protected final boolean keyed;
+        protected final Map<String, Object> key;
+        protected long docCount;
+        protected InternalAggregations aggregations;
+
+        public InternalBucket(Map<String, Object> key, long docCount, InternalAggregations aggregations, boolean keyed) {
+            this.key = key;
+            this.docCount = docCount;
+            this.aggregations = aggregations;
+            this.keyed = keyed;
+        }
+
+        /**
+         * Read from a stream.
+         */
+        public InternalBucket(StreamInput in, boolean keyed) throws IOException {
+            this.keyed = keyed;
+            key = in.readOrderedMap(StreamInput::readString, StreamInput::readGenericValue);
+            docCount = in.readVLong();
+            aggregations = InternalAggregations.readFrom(in);
+        }
+
+        @Override
+        public void writeTo(StreamOutput out) throws IOException {
+            out.writeMap(key, StreamOutput::writeString, StreamOutput::writeGenericValue);
+            out.writeVLong(docCount);
+            aggregations.writeTo(out);
+        }
+
+        @Override
+        public Map<String, Object> getKey() {
+            return key;
+        }
+
+        @Override
+        public String getKeyAsString() {
+            return key.toString();
+        }
+
+        @Override
+        public long getDocCount() {
+            return docCount;
+        }
+
+        @Override
+        public InternalAggregations getAggregations() {
+            return aggregations;
+        }
+
+        @Override
+        public XContentBuilder toXContent(XContentBuilder builder, Params params) throws IOException {
+            if (keyed) {
+                builder.startObject(getKeyAsString());
+            } else {
+                builder.startObject();
+            }
+            builder.field(CommonFields.KEY.getPreferredName(), key);
+            builder.field(CommonFields.DOC_COUNT.getPreferredName(), docCount);
+            aggregations.toXContentInternal(builder, params);
+            builder.endObject();
+            return builder;
+        }
+
+        @Override
+        public boolean equals(Object other) {
+            if (this == other) {
+                return true;
+            }
+            if (other == null || getClass() != other.getClass()) {
+                return false;
+            }
+            InternalTimeSeries.InternalBucket that = (InternalTimeSeries.InternalBucket) other;
+            return Objects.equals(key, that.key)
+                && Objects.equals(keyed, that.keyed)
+                && Objects.equals(docCount, that.docCount)
+                && Objects.equals(aggregations, that.aggregations);
+        }
+
+        @Override
+        public int hashCode() {
+            return Objects.hash(getClass(), key, keyed, docCount, aggregations);
+        }
+    }
+
+    private final List<InternalTimeSeries.InternalBucket> buckets;
+    private final boolean keyed;
+    // bucketMap gets lazily initialized from buckets in getBucketByKey()
+    private transient Map<String, InternalTimeSeries.InternalBucket> bucketMap;
+
+    public InternalTimeSeries(String name, List<InternalTimeSeries.InternalBucket> buckets, boolean keyed, Map<String, Object> metadata) {
+        super(name, metadata);
+        this.buckets = buckets;
+        this.keyed = keyed;
+    }
+
+    /**
+     * Read from a stream.
+     */
+    public InternalTimeSeries(StreamInput in) throws IOException {
+        super(in);
+        keyed = in.readBoolean();
+        int size = in.readVInt();
+        List<InternalTimeSeries.InternalBucket> buckets = new ArrayList<>(size);
+        for (int i = 0; i < size; i++) {
+            buckets.add(new InternalTimeSeries.InternalBucket(in, keyed));
+        }
+        this.buckets = buckets;
+        this.bucketMap = null;
+    }
+
+    @Override
+    public String getWriteableName() {
+        return TimeSeriesAggregationBuilder.NAME;
+    }
+
+    @Override
+    public XContentBuilder doXContentBody(XContentBuilder builder, Params params) throws IOException {
+        if (keyed) {
+            builder.startObject(CommonFields.BUCKETS.getPreferredName());
+        } else {
+            builder.startArray(CommonFields.BUCKETS.getPreferredName());
+        }
+        for (InternalBucket bucket : buckets) {
+            bucket.toXContent(builder, params);
+        }
+        if (keyed) {
+            builder.endObject();
+        } else {
+            builder.endArray();
+        }
+        return builder;
+    }
+
+    @Override
+    protected void doWriteTo(StreamOutput out) throws IOException {
+        out.writeBoolean(keyed);
+        out.writeVInt(buckets.size());
+        for (InternalTimeSeries.InternalBucket bucket : buckets) {
+            bucket.writeTo(out);
+        }
+    }
+
+    @Override
+    public InternalAggregation reduce(List<InternalAggregation> aggregations, AggregationReduceContext reduceContext) {
+        // We still need to reduce in case we get the same time series from two different indices, but we should be able to
+        // optimize that in the future
+        Map<Map<String, Object>, List<InternalBucket>> bucketsList = null;
+        for (InternalAggregation aggregation : aggregations) {
+            InternalTimeSeries timeSeries = (InternalTimeSeries) aggregation;
+            if (bucketsList != null) {
+                for (InternalBucket bucket : timeSeries.buckets) {
+                    bucketsList.compute(bucket.key, (map, list) -> {
+                        if (list == null) {
+                            list = new ArrayList<>();
+                        }
+                        list.add(bucket);
+                        return list;
+                    });
+                }
+            } else {
+                bucketsList = new HashMap<>(timeSeries.buckets.size());
+                for (InternalTimeSeries.InternalBucket bucket : timeSeries.buckets) {
+                    List<InternalBucket> bucketList = new ArrayList<>();
+                    bucketList.add(bucket);
+                    bucketsList.put(bucket.key, bucketList);
+                }
+            }
+        }
+
+        reduceContext.consumeBucketsAndMaybeBreak(bucketsList.size());
+        InternalTimeSeries reduced = new InternalTimeSeries(name, new ArrayList<>(bucketsList.size()), keyed, getMetadata());
+        for (Map.Entry<Map<String, Object>, List<InternalBucket>> bucketEntry : bucketsList.entrySet()) {
+            reduced.buckets.add(reduceBucket(bucketEntry.getValue(), reduceContext));
+        }
+        return reduced;
+
+    }
+
+    @Override
+    public InternalTimeSeries create(List<InternalBucket> buckets) {
+        return new InternalTimeSeries(name, buckets, keyed, metadata);
+    }
+
+    @Override
+    public InternalBucket createBucket(InternalAggregations aggregations, InternalBucket prototype) {
+        return new InternalBucket(prototype.key, prototype.docCount, aggregations, prototype.keyed);
+    }
+
+    @Override
+    protected InternalBucket reduceBucket(List<InternalBucket> buckets, AggregationReduceContext context) {
+        InternalTimeSeries.InternalBucket reduced = null;
+        List<InternalAggregations> aggregationsList = new ArrayList<>(buckets.size());
+        for (InternalTimeSeries.InternalBucket bucket : buckets) {
+            if (reduced == null) {
+                reduced = new InternalTimeSeries.InternalBucket(bucket.key, bucket.docCount, bucket.aggregations, bucket.keyed);
+            } else {
+                reduced.docCount += bucket.docCount;
+            }
+            aggregationsList.add(bucket.aggregations);
+        }
+        reduced.aggregations = InternalAggregations.reduce(aggregationsList, context);
+        return reduced;
+    }
+
+    @Override
+    public List<InternalBucket> getBuckets() {
+        return buckets;
+    }
+
+    @Override
+    public InternalBucket getBucketByKey(String key) {
+        if (bucketMap == null) {
+            bucketMap = new HashMap<>(buckets.size());
+            for (InternalBucket bucket : buckets) {
+                bucketMap.put(bucket.getKeyAsString(), bucket);
+            }
+        }
+        return bucketMap.get(key);
+    }
+}
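
Since getBucketByKey matches on the toString() form of the ordered dimension map, lookups use keys such as "{dim1=aaa, dim2=xxx}", the same convention the aggregator tests below rely on. A minimal lookup sketch; the dimension names and values are illustrative:

import org.elasticsearch.search.aggregations.timeseries.InternalTimeSeries;

class TimeSeriesBucketLookupSketch {
    // Returns the doc count of one time series, or 0 if that series is absent.
    static long docCountFor(InternalTimeSeries ts) {
        InternalTimeSeries.InternalBucket bucket = ts.getBucketByKey("{dim1=aaa, dim2=xxx}");
        return bucket == null ? 0 : bucket.getDocCount();
    }
}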

+ 84 - 0
server/src/main/java/org/elasticsearch/search/aggregations/timeseries/ParsedTimeSeries.java

@@ -0,0 +1,84 @@
+/*
+ * Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
+ * or more contributor license agreements. Licensed under the Elastic License
+ * 2.0 and the Server Side Public License, v 1; you may not use this file except
+ * in compliance with, at your election, the Elastic License 2.0 or the Server
+ * Side Public License, v 1.
+ */
+
+package org.elasticsearch.search.aggregations.timeseries;
+
+import org.elasticsearch.search.aggregations.ParsedMultiBucketAggregation;
+import org.elasticsearch.xcontent.ObjectParser;
+import org.elasticsearch.xcontent.XContentParser;
+
+import java.io.IOException;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.TreeMap;
+
+public class ParsedTimeSeries extends ParsedMultiBucketAggregation<ParsedTimeSeries.ParsedBucket> implements TimeSeries {
+
+    private transient Map<String, ParsedTimeSeries.ParsedBucket> bucketMap;
+
+    @Override
+    public String getType() {
+        return TimeSeriesAggregationBuilder.NAME;
+    }
+
+    @Override
+    public List<? extends TimeSeries.Bucket> getBuckets() {
+        return buckets;
+    }
+
+    @Override
+    public TimeSeries.Bucket getBucketByKey(String key) {
+        if (bucketMap == null) {
+            bucketMap = new HashMap<>(buckets.size());
+            for (ParsedTimeSeries.ParsedBucket bucket : buckets) {
+                bucketMap.put(bucket.getKeyAsString(), bucket);
+            }
+        }
+        return bucketMap.get(key);
+    }
+
+    private static final ObjectParser<ParsedTimeSeries, Void> PARSER = new ObjectParser<>(
+        ParsedTimeSeries.class.getSimpleName(),
+        true,
+        ParsedTimeSeries::new
+    );
+    static {
+        declareMultiBucketAggregationFields(
+            PARSER,
+            parser -> ParsedTimeSeries.ParsedBucket.fromXContent(parser, false),
+            parser -> ParsedTimeSeries.ParsedBucket.fromXContent(parser, true)
+        );
+    }
+
+    public static ParsedTimeSeries fromXContent(XContentParser parser, String name) throws IOException {
+        ParsedTimeSeries aggregation = PARSER.parse(parser, null);
+        aggregation.setName(name);
+        return aggregation;
+    }
+
+    static class ParsedBucket extends ParsedMultiBucketAggregation.ParsedBucket implements TimeSeries.Bucket {
+
+        private Map<String, Object> key;
+
+        @Override
+        public Object getKey() {
+            return key;
+        }
+
+        @Override
+        public String getKeyAsString() {
+            return key.toString();
+        }
+
+        static ParsedTimeSeries.ParsedBucket fromXContent(XContentParser parser, boolean keyed) throws IOException {
+            return parseXContent(parser, keyed, ParsedTimeSeries.ParsedBucket::new, (p, bucket) -> bucket.key = new TreeMap<>(p.map()));
+        }
+    }
+
+}
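
ParsedTimeSeries is the client-side representation of the aggregation and has to be registered as a named XContent parser under the time_series type, using the same ContextParser shape as the InternalAggregationTestCase change below. A minimal registry-entry sketch; only the wrapping class name is an assumption:

import org.elasticsearch.search.aggregations.Aggregation;
import org.elasticsearch.search.aggregations.timeseries.ParsedTimeSeries;
import org.elasticsearch.search.aggregations.timeseries.TimeSeriesAggregationBuilder;
import org.elasticsearch.xcontent.NamedXContentRegistry;
import org.elasticsearch.xcontent.ParseField;

class ParsedTimeSeriesRegistrationSketch {
    // Registers ParsedTimeSeries as the parser for aggregations of type "time_series".
    static NamedXContentRegistry.Entry entry() {
        return new NamedXContentRegistry.Entry(
            Aggregation.class,
            new ParseField(TimeSeriesAggregationBuilder.NAME),
            (p, c) -> ParsedTimeSeries.fromXContent(p, (String) c)
        );
    }
}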

+ 29 - 0
server/src/main/java/org/elasticsearch/search/aggregations/timeseries/TimeSeries.java

@@ -0,0 +1,29 @@
+/*
+ * Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
+ * or more contributor license agreements. Licensed under the Elastic License
+ * 2.0 and the Server Side Public License, v 1; you may not use this file except
+ * in compliance with, at your election, the Elastic License 2.0 or the Server
+ * Side Public License, v 1.
+ */
+
+package org.elasticsearch.search.aggregations.timeseries;
+
+import org.elasticsearch.search.aggregations.bucket.MultiBucketsAggregation;
+
+import java.util.List;
+
+public interface TimeSeries extends MultiBucketsAggregation {
+
+    /**
+     * A bucket associated with a specific time series (identified by its key)
+     */
+    interface Bucket extends MultiBucketsAggregation.Bucket {}
+
+    /**
+     * The buckets created by this aggregation.
+     */
+    @Override
+    List<? extends TimeSeries.Bucket> getBuckets();
+
+    TimeSeries.Bucket getBucketByKey(String key);
+}
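
The TimeSeries interface is what response consumers see: one bucket per time series, keyed by its tsid dimensions. A minimal consumption sketch, assuming a SearchResponse that contains a time_series aggregation named "ts" (the variable and aggregation names are illustrative):

import org.elasticsearch.action.search.SearchResponse;
import org.elasticsearch.search.aggregations.timeseries.TimeSeries;

class TimeSeriesResponseSketch {
    static void printBuckets(SearchResponse response) {
        TimeSeries ts = response.getAggregations().get("ts");
        for (TimeSeries.Bucket bucket : ts.getBuckets()) {
            // getKey() is the map of dimension names to values identifying the series.
            System.out.println(bucket.getKey() + " -> " + bucket.getDocCount());
        }
    }
}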

+ 133 - 0
server/src/main/java/org/elasticsearch/search/aggregations/timeseries/TimeSeriesAggregationBuilder.java

@@ -0,0 +1,133 @@
+/*
+ * Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
+ * or more contributor license agreements. Licensed under the Elastic License
+ * 2.0 and the Server Side Public License, v 1; you may not use this file except
+ * in compliance with, at your election, the Elastic License 2.0 or the Server
+ * Side Public License, v 1.
+ */
+
+package org.elasticsearch.search.aggregations.timeseries;
+
+import org.elasticsearch.common.io.stream.StreamInput;
+import org.elasticsearch.common.io.stream.StreamOutput;
+import org.elasticsearch.search.aggregations.AbstractAggregationBuilder;
+import org.elasticsearch.search.aggregations.AggregationBuilder;
+import org.elasticsearch.search.aggregations.AggregatorFactories;
+import org.elasticsearch.search.aggregations.AggregatorFactory;
+import org.elasticsearch.search.aggregations.support.AggregationContext;
+import org.elasticsearch.xcontent.InstantiatingObjectParser;
+import org.elasticsearch.xcontent.ParseField;
+import org.elasticsearch.xcontent.ParserConstructor;
+import org.elasticsearch.xcontent.XContentBuilder;
+
+import java.io.IOException;
+import java.util.Map;
+import java.util.Objects;
+
+import static org.elasticsearch.xcontent.ConstructingObjectParser.optionalConstructorArg;
+
+public class TimeSeriesAggregationBuilder extends AbstractAggregationBuilder<TimeSeriesAggregationBuilder> {
+    public static final String NAME = "time_series";
+    public static final ParseField KEYED_FIELD = new ParseField("keyed");
+    public static final InstantiatingObjectParser<TimeSeriesAggregationBuilder, String> PARSER;
+
+    private boolean keyed;
+
+    static {
+        InstantiatingObjectParser.Builder<TimeSeriesAggregationBuilder, String> parser = InstantiatingObjectParser.builder(
+            NAME,
+            false,
+            TimeSeriesAggregationBuilder.class
+        );
+        parser.declareBoolean(optionalConstructorArg(), KEYED_FIELD);
+        PARSER = parser.build();
+    }
+
+    public TimeSeriesAggregationBuilder(String name) {
+        this(name, true);
+    }
+
+    @ParserConstructor
+    public TimeSeriesAggregationBuilder(String name, Boolean keyed) {
+        super(name);
+        this.keyed = keyed != null ? keyed : true;
+    }
+
+    protected TimeSeriesAggregationBuilder(
+        TimeSeriesAggregationBuilder clone,
+        AggregatorFactories.Builder factoriesBuilder,
+        Map<String, Object> metadata
+    ) {
+        super(clone, factoriesBuilder, metadata);
+        this.keyed = clone.keyed;
+    }
+
+    public TimeSeriesAggregationBuilder(StreamInput in) throws IOException {
+        super(in);
+        keyed = in.readBoolean();
+    }
+
+    @Override
+    protected void doWriteTo(StreamOutput out) throws IOException {
+        out.writeBoolean(keyed);
+    }
+
+    @Override
+    protected AggregatorFactory doBuild(
+        AggregationContext context,
+        AggregatorFactory parent,
+        AggregatorFactories.Builder subFactoriesBuilder
+    ) throws IOException {
+        return new TimeSeriesAggregationFactory(name, keyed, context, parent, subFactoriesBuilder, metadata);
+    }
+
+    @Override
+    protected XContentBuilder internalXContent(XContentBuilder builder, Params params) throws IOException {
+        builder.startObject();
+        builder.field(KEYED_FIELD.getPreferredName(), keyed);
+        builder.endObject();
+        return builder;
+    }
+
+    @Override
+    protected AggregationBuilder shallowCopy(AggregatorFactories.Builder factoriesBuilder, Map<String, Object> metadata) {
+        return new TimeSeriesAggregationBuilder(this, factoriesBuilder, metadata);
+    }
+
+    @Override
+    public BucketCardinality bucketCardinality() {
+        return BucketCardinality.MANY;
+    }
+
+    @Override
+    public String getType() {
+        return NAME;
+    }
+
+    @Override
+    public boolean isInSortOrderExecutionRequired() {
+        return true;
+    }
+
+    public boolean isKeyed() {
+        return keyed;
+    }
+
+    public void setKeyed(boolean keyed) {
+        this.keyed = keyed;
+    }
+
+    @Override
+    public boolean equals(Object o) {
+        if (this == o) return true;
+        if (o == null || getClass() != o.getClass()) return false;
+        if (super.equals(o) == false) return false;
+        TimeSeriesAggregationBuilder that = (TimeSeriesAggregationBuilder) o;
+        return keyed == that.keyed;
+    }
+
+    @Override
+    public int hashCode() {
+        return Objects.hash(super.hashCode(), keyed);
+    }
+}
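
Request-side usage is deliberately small: the builder only exposes the keyed flag and carries sub-aggregations. A sketch of a search that groups by time series and sums a metric per series, mirroring TimeSeriesAggregatorTests below; the metric field "val1" and the aggregation names are illustrative:

import org.elasticsearch.search.aggregations.AggregationBuilders;
import org.elasticsearch.search.aggregations.timeseries.TimeSeriesAggregationBuilder;
import org.elasticsearch.search.builder.SearchSourceBuilder;

class TimeSeriesRequestSketch {
    static SearchSourceBuilder build() {
        TimeSeriesAggregationBuilder ts = new TimeSeriesAggregationBuilder("ts")
            .subAggregation(AggregationBuilders.sum("sum").field("val1"));
        return new SearchSourceBuilder().size(0).aggregation(ts);
    }
}

With keyed left at its default of true, the buckets render as an object keyed by the stringified tsid; with keyed set to false they render as an array, as implemented in InternalTimeSeries.doXContentBody above.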

+ 41 - 0
server/src/main/java/org/elasticsearch/search/aggregations/timeseries/TimeSeriesAggregationFactory.java

@@ -0,0 +1,41 @@
+/*
+ * Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
+ * or more contributor license agreements. Licensed under the Elastic License
+ * 2.0 and the Server Side Public License, v 1; you may not use this file except
+ * in compliance with, at your election, the Elastic License 2.0 or the Server
+ * Side Public License, v 1.
+ */
+
+package org.elasticsearch.search.aggregations.timeseries;
+
+import org.elasticsearch.search.aggregations.Aggregator;
+import org.elasticsearch.search.aggregations.AggregatorFactories;
+import org.elasticsearch.search.aggregations.AggregatorFactory;
+import org.elasticsearch.search.aggregations.CardinalityUpperBound;
+import org.elasticsearch.search.aggregations.support.AggregationContext;
+
+import java.io.IOException;
+import java.util.Map;
+
+public class TimeSeriesAggregationFactory extends AggregatorFactory {
+
+    private final boolean keyed;
+
+    public TimeSeriesAggregationFactory(
+        String name,
+        boolean keyed,
+        AggregationContext context,
+        AggregatorFactory parent,
+        AggregatorFactories.Builder subFactoriesBuilder,
+        Map<String, Object> metadata
+    ) throws IOException {
+        super(name, context, parent, subFactoriesBuilder, metadata);
+        this.keyed = keyed;
+    }
+
+    @Override
+    protected Aggregator createInternal(Aggregator parent, CardinalityUpperBound cardinality, Map<String, Object> metadata)
+        throws IOException {
+        return new TimeSeriesAggregator(name, factories, keyed, context, parent, cardinality, metadata);
+    }
+}

+ 123 - 0
server/src/main/java/org/elasticsearch/search/aggregations/timeseries/TimeSeriesAggregator.java

@@ -0,0 +1,123 @@
+/*
+ * Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
+ * or more contributor license agreements. Licensed under the Elastic License
+ * 2.0 and the Server Side Public License, v 1; you may not use this file except
+ * in compliance with, at your election, the Elastic License 2.0 or the Server
+ * Side Public License, v 1.
+ */
+
+package org.elasticsearch.search.aggregations.timeseries;
+
+import org.apache.lucene.index.LeafReaderContext;
+import org.apache.lucene.util.BytesRef;
+import org.elasticsearch.core.Releasables;
+import org.elasticsearch.index.fielddata.IndexFieldData;
+import org.elasticsearch.index.fielddata.SortedBinaryDocValues;
+import org.elasticsearch.index.fielddata.plain.SortedSetBytesLeafFieldData;
+import org.elasticsearch.index.mapper.TimeSeriesIdFieldMapper;
+import org.elasticsearch.search.aggregations.Aggregator;
+import org.elasticsearch.search.aggregations.AggregatorFactories;
+import org.elasticsearch.search.aggregations.CardinalityUpperBound;
+import org.elasticsearch.search.aggregations.InternalAggregation;
+import org.elasticsearch.search.aggregations.LeafBucketCollector;
+import org.elasticsearch.search.aggregations.LeafBucketCollectorBase;
+import org.elasticsearch.search.aggregations.bucket.BucketsAggregator;
+import org.elasticsearch.search.aggregations.bucket.terms.BytesKeyedBucketOrds;
+import org.elasticsearch.search.aggregations.support.AggregationContext;
+
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.List;
+import java.util.Map;
+import java.util.Objects;
+
+public class TimeSeriesAggregator extends BucketsAggregator {
+
+    private final IndexFieldData<SortedSetBytesLeafFieldData> tsidFieldData;
+    protected final BytesKeyedBucketOrds bucketOrds;
+    private final boolean keyed;
+
+    @SuppressWarnings("unchecked")
+    public TimeSeriesAggregator(
+        String name,
+        AggregatorFactories factories,
+        boolean keyed,
+        AggregationContext context,
+        Aggregator parent,
+        CardinalityUpperBound bucketCardinality,
+        Map<String, Object> metadata
+    ) throws IOException {
+        super(name, factories, context, parent, bucketCardinality, metadata);
+        this.keyed = keyed;
+        tsidFieldData = (IndexFieldData<SortedSetBytesLeafFieldData>) Objects.requireNonNull(
+            context.buildFieldContext("_tsid"),
+            "Cannot obtain tsid field"
+        ).indexFieldData();
+        bucketOrds = BytesKeyedBucketOrds.build(bigArrays(), bucketCardinality);
+    }
+
+    @Override
+    public InternalAggregation[] buildAggregations(long[] owningBucketOrds) throws IOException {
+        InternalTimeSeries.InternalBucket[][] allBucketsPerOrd = new InternalTimeSeries.InternalBucket[owningBucketOrds.length][];
+        for (int ordIdx = 0; ordIdx < owningBucketOrds.length; ordIdx++) {
+            BytesRef spareKey = new BytesRef();
+            BytesKeyedBucketOrds.BucketOrdsEnum ordsEnum = bucketOrds.ordsEnum(owningBucketOrds[ordIdx]);
+            List<InternalTimeSeries.InternalBucket> buckets = new ArrayList<>();
+            while (ordsEnum.next()) {
+                long docCount = bucketDocCount(ordsEnum.ord());
+                ordsEnum.readValue(spareKey);
+                InternalTimeSeries.InternalBucket bucket = new InternalTimeSeries.InternalBucket(
+                    TimeSeriesIdFieldMapper.decodeTsid(spareKey),
+                    docCount,
+                    null,
+                    keyed
+                );
+                bucket.bucketOrd = ordsEnum.ord();
+                buckets.add(bucket);
+            }
+            allBucketsPerOrd[ordIdx] = buckets.toArray(new InternalTimeSeries.InternalBucket[0]);
+        }
+        buildSubAggsForAllBuckets(allBucketsPerOrd, b -> b.bucketOrd, (b, a) -> b.aggregations = a);
+
+        InternalAggregation[] result = new InternalAggregation[owningBucketOrds.length];
+        for (int ordIdx = 0; ordIdx < owningBucketOrds.length; ordIdx++) {
+            result[ordIdx] = buildResult(allBucketsPerOrd[ordIdx]);
+        }
+        return result;
+    }
+
+    @Override
+    public InternalAggregation buildEmptyAggregation() {
+        return new InternalTimeSeries(name, new ArrayList<>(), false, metadata());
+    }
+
+    @Override
+    protected void doClose() {
+        Releasables.close(bucketOrds);
+    }
+
+    @Override
+    protected LeafBucketCollector getLeafCollector(LeafReaderContext context, LeafBucketCollector sub) throws IOException {
+        final SortedBinaryDocValues tsids = tsidFieldData.load(context).getBytesValues();
+        return new LeafBucketCollectorBase(sub, null) {
+
+            @Override
+            public void collect(int doc, long bucket) throws IOException {
+                if (tsids.advanceExact(doc)) {
+                    BytesRef newTsid = tsids.nextValue();
+                    long bucketOrdinal = bucketOrds.add(bucket, newTsid);
+                    if (bucketOrdinal < 0) { // already seen
+                        bucketOrdinal = -1 - bucketOrdinal;
+                        collectExistingBucket(sub, doc, bucketOrdinal);
+                    } else {
+                        collectBucket(sub, doc, bucketOrdinal);
+                    }
+                }
+            }
+        };
+    }
+
+    InternalTimeSeries buildResult(InternalTimeSeries.InternalBucket[] topBuckets) {
+        return new InternalTimeSeries(name, List.of(topBuckets), keyed, metadata());
+    }
+}

+ 123 - 0
server/src/main/java/org/elasticsearch/search/aggregations/timeseries/TimeSeriesIndexSearcher.java

@@ -0,0 +1,123 @@
+/*
+ * Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
+ * or more contributor license agreements. Licensed under the Elastic License
+ * 2.0 and the Server Side Public License, v 1; you may not use this file except
+ * in compliance with, at your election, the Elastic License 2.0 or the Server
+ * Side Public License, v 1.
+ */
+
+package org.elasticsearch.search.aggregations.timeseries;
+
+import org.apache.lucene.index.LeafReaderContext;
+import org.apache.lucene.index.SortedNumericDocValues;
+import org.apache.lucene.index.SortedSetDocValues;
+import org.apache.lucene.search.DocIdSetIterator;
+import org.apache.lucene.search.IndexSearcher;
+import org.apache.lucene.search.LeafCollector;
+import org.apache.lucene.search.Query;
+import org.apache.lucene.search.Scorer;
+import org.apache.lucene.search.Weight;
+import org.apache.lucene.util.Bits;
+import org.apache.lucene.util.BytesRef;
+import org.apache.lucene.util.PriorityQueue;
+import org.elasticsearch.cluster.metadata.DataStream;
+import org.elasticsearch.index.mapper.TimeSeriesIdFieldMapper;
+import org.elasticsearch.search.aggregations.BucketCollector;
+import org.elasticsearch.search.aggregations.LeafBucketCollector;
+
+import java.io.IOException;
+
+/**
+ * An IndexSearcher wrapper that executes searches on time-series indices by traversing documents in tsid and timestamp order
+ * TODO: Convert it to use the index sort instead of the hard-coded tsid and timestamp fields
+ */
+public class TimeSeriesIndexSearcher {
+
+    // We delegate to the wrapped searcher instead of extending IndexSearcher and inheriting its default implementations because the
+    // wrapped searcher is usually a ContextIndexSearcher that carries important logic, e.g. for document-level security.
+    private final IndexSearcher searcher;
+
+    public TimeSeriesIndexSearcher(IndexSearcher searcher) {
+        this.searcher = searcher;
+    }
+
+    public void search(Query query, BucketCollector bucketCollector) throws IOException {
+        query = searcher.rewrite(query);
+        Weight weight = searcher.createWeight(query, bucketCollector.scoreMode(), 1);
+        PriorityQueue<LeafWalker> queue = new PriorityQueue<>(searcher.getIndexReader().leaves().size()) {
+            @Override
+            protected boolean lessThan(LeafWalker a, LeafWalker b) {
+                int res = a.tsid.compareTo(b.tsid);
+                if (res == 0) {
+                    return a.timestamp < b.timestamp;
+                } else {
+                    return res < 0;
+                }
+            }
+        };
+        for (LeafReaderContext leaf : searcher.getIndexReader().leaves()) {
+            LeafBucketCollector leafCollector = bucketCollector.getLeafCollector(leaf);
+            Scorer scorer = weight.scorer(leaf);
+            if (scorer != null) {
+                LeafWalker walker = new LeafWalker(leaf, scorer, leafCollector);
+                if (walker.next()) {
+                    queue.add(walker);
+                }
+            }
+        }
+        while (queue.top() != null) {
+            LeafWalker walker = queue.top();
+            walker.collectCurrent();
+            if (walker.next()) {
+                queue.updateTop();
+            } else {
+                queue.pop();
+            }
+        }
+    }
+
+    private static class LeafWalker {
+        private final LeafCollector collector;
+        private final Bits liveDocs;
+        private final DocIdSetIterator iterator;
+        private final SortedSetDocValues tsids;
+        private final SortedNumericDocValues timestamps;
+        final int docBase;
+        int docId;
+        BytesRef tsid;
+        long timestamp;
+
+        LeafWalker(LeafReaderContext context, Scorer scorer, LeafCollector collector) throws IOException {
+            this.collector = collector;
+            liveDocs = context.reader().getLiveDocs();
+            this.collector.setScorer(scorer);
+            iterator = scorer.iterator();
+            docBase = context.docBase;
+            tsids = context.reader().getSortedSetDocValues(TimeSeriesIdFieldMapper.NAME);
+            timestamps = context.reader().getSortedNumericDocValues(DataStream.TimestampField.FIXED_TIMESTAMP_FIELD);
+        }
+
+        void collectCurrent() throws IOException {
+            collector.collect(docId);
+        }
+
+        boolean next() throws IOException {
+            do {
+                docId = iterator.nextDoc();
+                if (docId != DocIdSetIterator.NO_MORE_DOCS && (liveDocs == null || liveDocs.get(docId))) {
+                    if (tsids.advanceExact(docId)) {
+                        BytesRef tsid = tsids.lookupOrd(tsids.nextOrd());
+                        if (timestamps.advanceExact(docId)) {
+                            this.timestamp = timestamps.nextValue();
+                            if (tsid.equals(this.tsid) == false) {
+                                this.tsid = BytesRef.deepCopyOf(tsid);
+                            }
+                            return true;
+                        }
+                    }
+                }
+            } while (docId != DocIdSetIterator.NO_MORE_DOCS);
+            return false;
+        }
+    }
+}

+ 3 - 1
server/src/test/java/org/elasticsearch/search/aggregations/AggregationsTests.java

@@ -63,6 +63,7 @@ import org.elasticsearch.search.aggregations.pipeline.InternalDerivativeTests;
 import org.elasticsearch.search.aggregations.pipeline.InternalExtendedStatsBucketTests;
 import org.elasticsearch.search.aggregations.pipeline.InternalPercentilesBucketTests;
 import org.elasticsearch.search.aggregations.pipeline.InternalSimpleValueTests;
+import org.elasticsearch.search.aggregations.timeseries.InternalTimeSeriesTests;
 import org.elasticsearch.test.ESTestCase;
 import org.elasticsearch.test.InternalAggregationTestCase;
 import org.elasticsearch.test.InternalMultiBucketAggregationTestCase;
@@ -144,7 +145,8 @@ public class AggregationsTests extends ESTestCase {
         new InternalBinaryRangeTests(),
         new InternalTopHitsTests(),
         new InternalCompositeTests(),
-        new InternalMedianAbsoluteDeviationTests()
+        new InternalMedianAbsoluteDeviationTests(),
+        new InternalTimeSeriesTests()
     );
 
     @Override

+ 89 - 0
server/src/test/java/org/elasticsearch/search/aggregations/timeseries/InternalTimeSeriesTests.java

@@ -0,0 +1,89 @@
+/*
+ * Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
+ * or more contributor license agreements. Licensed under the Elastic License
+ * 2.0 and the Server Side Public License, v 1; you may not use this file except
+ * in compliance with, at your election, the Elastic License 2.0 or the Server
+ * Side Public License, v 1.
+ */
+
+package org.elasticsearch.search.aggregations.timeseries;
+
+import org.elasticsearch.search.aggregations.InternalAggregations;
+import org.elasticsearch.test.InternalMultiBucketAggregationTestCase;
+
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.TreeMap;
+import java.util.function.Predicate;
+
+import static org.hamcrest.Matchers.arrayContainingInAnyOrder;
+
+public class InternalTimeSeriesTests extends InternalMultiBucketAggregationTestCase<InternalTimeSeries> {
+
+    private List<InternalTimeSeries.InternalBucket> randomBuckets(boolean keyed, InternalAggregations aggregations) {
+        int numberOfBuckets = randomNumberOfBuckets();
+        List<InternalTimeSeries.InternalBucket> bucketList = new ArrayList<>(numberOfBuckets);
+        List<Map<String, Object>> keys = randomKeys(bucketKeys(randomIntBetween(1, 4)), numberOfBuckets);
+        for (int j = 0; j < numberOfBuckets; j++) {
+            long docCount = randomLongBetween(0, Long.MAX_VALUE / (20L * numberOfBuckets));
+            bucketList.add(new InternalTimeSeries.InternalBucket(keys.get(j), docCount, aggregations, keyed));
+        }
+        return bucketList;
+    }
+
+    private List<String> bucketKeys(int numberOfKeys) {
+        return randomUnique(() -> randomAlphaOfLength(10), numberOfKeys).stream().toList();
+    }
+
+    private List<Map<String, Object>> randomKeys(List<String> bucketKeys, int numberOfBuckets) {
+        List<Map<String, Object>> keys = new ArrayList<>();
+        for (int i = 0; i < numberOfBuckets; i++) {
+            keys.add(randomValueOtherThanMany(keys::contains, () -> {
+                Map<String, Object> key = new TreeMap<>();
+                for (String name : bucketKeys) {
+                    key.put(name, randomAlphaOfLength(4));
+                }
+                return key;
+            }));
+        }
+        return keys;
+    }
+
+    @Override
+    protected InternalTimeSeries createTestInstance(String name, Map<String, Object> metadata, InternalAggregations aggregations) {
+        boolean keyed = randomBoolean();
+        return new InternalTimeSeries(name, randomBuckets(keyed, aggregations), keyed, metadata);
+    }
+
+    @Override
+    protected void assertReduced(InternalTimeSeries reduced, List<InternalTimeSeries> inputs) {
+        Map<Map<String, Object>, Long> keys = new HashMap<>();
+        for (InternalTimeSeries in : inputs) {
+            for (InternalTimeSeries.InternalBucket bucket : in.getBuckets()) {
+                keys.compute(bucket.getKey(), (k, v) -> {
+                    if (v == null) {
+                        return bucket.docCount;
+                    } else {
+                        return bucket.docCount + v;
+                    }
+                });
+            }
+        }
+        assertThat(
+            reduced.getBuckets().stream().map(InternalTimeSeries.InternalBucket::getKey).toArray(Object[]::new),
+            arrayContainingInAnyOrder(keys.keySet().toArray(Object[]::new))
+        );
+    }
+
+    @Override
+    protected Class<ParsedTimeSeries> implementationClass() {
+        return ParsedTimeSeries.class;
+    }
+
+    @Override
+    protected Predicate<String> excludePathsFromXContentInsertion() {
+        return s -> s.endsWith(".key");
+    }
+}

+ 59 - 0
server/src/test/java/org/elasticsearch/search/aggregations/timeseries/TimeSeriesAggregationBuilderTests.java

@@ -0,0 +1,59 @@
+/*
+ * Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
+ * or more contributor license agreements. Licensed under the Elastic License
+ * 2.0 and the Server Side Public License, v 1; you may not use this file except
+ * in compliance with, at your election, the Elastic License 2.0 or the Server
+ * Side Public License, v 1.
+ */
+
+package org.elasticsearch.search.aggregations.timeseries;
+
+import org.elasticsearch.common.io.stream.NamedWriteableRegistry;
+import org.elasticsearch.common.io.stream.Writeable;
+import org.elasticsearch.common.settings.Settings;
+import org.elasticsearch.search.SearchModule;
+import org.elasticsearch.search.aggregations.AggregatorFactories;
+import org.elasticsearch.test.AbstractSerializingTestCase;
+import org.elasticsearch.xcontent.NamedXContentRegistry;
+import org.elasticsearch.xcontent.XContentParser;
+
+import java.io.IOException;
+import java.util.Collections;
+
+import static org.hamcrest.Matchers.hasSize;
+
+public class TimeSeriesAggregationBuilderTests extends AbstractSerializingTestCase<TimeSeriesAggregationBuilder> {
+
+    @Override
+    protected TimeSeriesAggregationBuilder doParseInstance(XContentParser parser) throws IOException {
+        assertSame(XContentParser.Token.START_OBJECT, parser.nextToken());
+        AggregatorFactories.Builder parsed = AggregatorFactories.parseAggregators(parser);
+        assertThat(parsed.getAggregatorFactories(), hasSize(1));
+        assertThat(parsed.getPipelineAggregatorFactories(), hasSize(0));
+        TimeSeriesAggregationBuilder agg = (TimeSeriesAggregationBuilder) parsed.getAggregatorFactories().iterator().next();
+        assertNull(parser.nextToken());
+        assertNotNull(agg);
+        return agg;
+    }
+
+    @Override
+    protected Writeable.Reader<TimeSeriesAggregationBuilder> instanceReader() {
+        return TimeSeriesAggregationBuilder::new;
+    }
+
+    @Override
+    protected TimeSeriesAggregationBuilder createTestInstance() {
+        return new TimeSeriesAggregationBuilder(randomAlphaOfLength(10), randomBoolean());
+    }
+
+    @Override
+    protected NamedWriteableRegistry getNamedWriteableRegistry() {
+        return new NamedWriteableRegistry(new SearchModule(Settings.EMPTY, Collections.emptyList()).getNamedWriteables());
+    }
+
+    @Override
+    protected NamedXContentRegistry xContentRegistry() {
+        return new NamedXContentRegistry(new SearchModule(Settings.EMPTY, Collections.emptyList()).getNamedXContents());
+    }
+
+}

+ 126 - 0
server/src/test/java/org/elasticsearch/search/aggregations/timeseries/TimeSeriesAggregatorTests.java

@@ -0,0 +1,126 @@
+/*
+ * Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
+ * or more contributor license agreements. Licensed under the Elastic License
+ * 2.0 and the Server Side Public License, v 1; you may not use this file except
+ * in compliance with, at your election, the Elastic License 2.0 or the Server
+ * Side Public License, v 1.
+ */
+
+package org.elasticsearch.search.aggregations.timeseries;
+
+import org.apache.lucene.document.DoubleDocValuesField;
+import org.apache.lucene.document.FloatDocValuesField;
+import org.apache.lucene.document.NumericDocValuesField;
+import org.apache.lucene.document.SortedNumericDocValuesField;
+import org.apache.lucene.document.SortedSetDocValuesField;
+import org.apache.lucene.index.IndexableField;
+import org.apache.lucene.index.RandomIndexWriter;
+import org.apache.lucene.search.MatchAllDocsQuery;
+import org.apache.lucene.search.Query;
+import org.elasticsearch.common.bytes.BytesReference;
+import org.elasticsearch.common.io.stream.BytesStreamOutput;
+import org.elasticsearch.core.CheckedConsumer;
+import org.elasticsearch.index.mapper.DataStreamTimestampFieldMapper;
+import org.elasticsearch.index.mapper.DateFieldMapper;
+import org.elasticsearch.index.mapper.KeywordFieldMapper;
+import org.elasticsearch.index.mapper.MappedFieldType;
+import org.elasticsearch.index.mapper.NumberFieldMapper;
+import org.elasticsearch.index.mapper.TimeSeriesIdFieldMapper;
+import org.elasticsearch.search.aggregations.AggregatorTestCase;
+import org.elasticsearch.search.aggregations.metrics.Sum;
+import org.elasticsearch.search.aggregations.support.ValuesSourceType;
+
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.List;
+import java.util.SortedMap;
+import java.util.TreeMap;
+import java.util.function.Consumer;
+
+import static org.elasticsearch.search.aggregations.AggregationBuilders.sum;
+import static org.hamcrest.Matchers.equalTo;
+import static org.hamcrest.Matchers.hasSize;
+
+public class TimeSeriesAggregatorTests extends AggregatorTestCase {
+
+    @Override
+    protected List<ValuesSourceType> getSupportedValuesSourceTypes() {
+        return List.of();
+    }
+
+    public void testStandAloneTimeSeriesWithSum() throws IOException {
+        TimeSeriesAggregationBuilder aggregationBuilder = new TimeSeriesAggregationBuilder("ts").subAggregation(sum("sum").field("val1"));
+        long startTime = DateFieldMapper.DEFAULT_DATE_TIME_FORMATTER.parseMillis("2021-01-01T00:00:00Z");
+        timeSeriesTestCase(aggregationBuilder, new MatchAllDocsQuery(), iw -> {
+            writeTS(iw, startTime + 1, new Object[] { "dim1", "aaa", "dim2", "xxx" }, new Object[] { "val1", 1 });
+            writeTS(iw, startTime + 2, new Object[] { "dim1", "aaa", "dim2", "yyy" }, new Object[] { "val1", 2 });
+            writeTS(iw, startTime + 3, new Object[] { "dim1", "bbb", "dim2", "zzz" }, new Object[] { "val1", 3 });
+            writeTS(iw, startTime + 4, new Object[] { "dim1", "bbb", "dim2", "zzz" }, new Object[] { "val1", 4 });
+            writeTS(iw, startTime + 5, new Object[] { "dim1", "aaa", "dim2", "xxx" }, new Object[] { "val1", 5 });
+            writeTS(iw, startTime + 6, new Object[] { "dim1", "aaa", "dim2", "yyy" }, new Object[] { "val1", 6 });
+            writeTS(iw, startTime + 7, new Object[] { "dim1", "bbb", "dim2", "zzz" }, new Object[] { "val1", 7 });
+            writeTS(iw, startTime + 8, new Object[] { "dim1", "bbb", "dim2", "zzz" }, new Object[] { "val1", 8 });
+        }, ts -> {
+            assertThat(ts.getBuckets(), hasSize(3));
+
+            assertThat(ts.getBucketByKey("{dim1=aaa, dim2=xxx}").docCount, equalTo(2L));
+            assertThat(((Sum) ts.getBucketByKey("{dim1=aaa, dim2=xxx}").getAggregations().get("sum")).getValue(), equalTo(6.0));
+            assertThat(ts.getBucketByKey("{dim1=aaa, dim2=yyy}").docCount, equalTo(2L));
+            assertThat(((Sum) ts.getBucketByKey("{dim1=aaa, dim2=yyy}").getAggregations().get("sum")).getValue(), equalTo(8.0));
+            assertThat(ts.getBucketByKey("{dim1=bbb, dim2=zzz}").docCount, equalTo(4L));
+            assertThat(((Sum) ts.getBucketByKey("{dim1=bbb, dim2=zzz}").getAggregations().get("sum")).getValue(), equalTo(22.0));
+
+        },
+            new KeywordFieldMapper.KeywordFieldType("dim1"),
+            new KeywordFieldMapper.KeywordFieldType("dim2"),
+            new NumberFieldMapper.NumberFieldType("val1", NumberFieldMapper.NumberType.INTEGER)
+        );
+    }
+
+    public static void writeTS(RandomIndexWriter iw, long timestamp, Object[] dimensions, Object[] metrics) throws IOException {
+        final List<IndexableField> fields = new ArrayList<>();
+        fields.add(new SortedNumericDocValuesField(DataStreamTimestampFieldMapper.DEFAULT_PATH, timestamp));
+        final SortedMap<String, BytesReference> dimensionFields = new TreeMap<>();
+        for (int i = 0; i < dimensions.length; i += 2) {
+            final BytesReference reference;
+            if (dimensions[i + 1] instanceof Number) {
+                reference = TimeSeriesIdFieldMapper.encodeTsidValue(((Number) dimensions[i + 1]).longValue());
+            } else {
+                reference = TimeSeriesIdFieldMapper.encodeTsidValue(dimensions[i + 1].toString());
+            }
+            dimensionFields.put(dimensions[i].toString(), reference);
+        }
+        for (int i = 0; i < metrics.length; i += 2) {
+            if (metrics[i + 1] instanceof Integer || metrics[i + 1] instanceof Long) {
+                fields.add(new NumericDocValuesField(metrics[i].toString(), ((Number) metrics[i + 1]).longValue()));
+            } else if (metrics[i + 1] instanceof Float) {
+                fields.add(new FloatDocValuesField(metrics[i].toString(), (float) metrics[i + 1]));
+            } else if (metrics[i + 1] instanceof Double) {
+                fields.add(new DoubleDocValuesField(metrics[i].toString(), (double) metrics[i + 1]));
+            }
+        }
+        try (BytesStreamOutput out = new BytesStreamOutput()) {
+            TimeSeriesIdFieldMapper.encodeTsid(out, dimensionFields);
+            BytesReference timeSeriesId = out.bytes();
+            fields.add(new SortedSetDocValuesField(TimeSeriesIdFieldMapper.NAME, timeSeriesId.toBytesRef()));
+        }
+        // TODO: Handle metrics
+        iw.addDocument(fields.stream().toList());
+    }
+
+    private void timeSeriesTestCase(
+        TimeSeriesAggregationBuilder builder,
+        Query query,
+        CheckedConsumer<RandomIndexWriter, IOException> buildIndex,
+        Consumer<InternalTimeSeries> verify,
+        MappedFieldType... fieldTypes
+    ) throws IOException {
+        MappedFieldType[] newFieldTypes = new MappedFieldType[fieldTypes.length + 2];
+        newFieldTypes[0] = TimeSeriesIdFieldMapper.FIELD_TYPE;
+        newFieldTypes[1] = new DateFieldMapper.DateFieldType("@timestamp");
+        System.arraycopy(fieldTypes, 0, newFieldTypes, 2, fieldTypes.length);
+
+        testCase(builder, query, buildIndex, verify, newFieldTypes);
+    }
+
+}

+ 5 - 0
test/framework/src/main/java/org/elasticsearch/index/mapper/MapperServiceTestCase.java

@@ -512,6 +512,11 @@ public abstract class MapperServiceTestCase extends ESTestCase {
                 throw new UnsupportedOperationException();
             }
 
+            @Override
+            public boolean isInSortOrderExecutionRequired() {
+                return false;
+            }
+
             @Override
             public void close() {
                 throw new UnsupportedOperationException();

+ 40 - 6
test/framework/src/main/java/org/elasticsearch/search/aggregations/AggregatorTestCase.java

@@ -7,6 +7,7 @@
  */
 package org.elasticsearch.search.aggregations;
 
+import org.apache.lucene.analysis.MockAnalyzer;
 import org.apache.lucene.analysis.standard.StandardAnalyzer;
 import org.apache.lucene.document.BinaryDocValuesField;
 import org.apache.lucene.document.Document;
@@ -20,6 +21,7 @@ import org.apache.lucene.index.CompositeReaderContext;
 import org.apache.lucene.index.DirectoryReader;
 import org.apache.lucene.index.IndexReader;
 import org.apache.lucene.index.IndexReaderContext;
+import org.apache.lucene.index.IndexWriterConfig;
 import org.apache.lucene.index.LeafReaderContext;
 import org.apache.lucene.index.NoMergePolicy;
 import org.apache.lucene.index.RandomIndexWriter;
@@ -32,6 +34,11 @@ import org.apache.lucene.search.Query;
 import org.apache.lucene.search.QueryCache;
 import org.apache.lucene.search.QueryCachingPolicy;
 import org.apache.lucene.search.ScoreMode;
+import org.apache.lucene.search.Sort;
+import org.apache.lucene.search.SortField;
+import org.apache.lucene.search.SortedNumericSortField;
+import org.apache.lucene.search.SortedSetSelector;
+import org.apache.lucene.search.SortedSetSortField;
 import org.apache.lucene.search.Weight;
 import org.apache.lucene.store.Directory;
 import org.apache.lucene.util.Accountable;
@@ -68,6 +75,7 @@ import org.elasticsearch.index.fielddata.IndexFieldData;
 import org.elasticsearch.index.fielddata.IndexFieldDataCache;
 import org.elasticsearch.index.mapper.BinaryFieldMapper;
 import org.elasticsearch.index.mapper.CompletionFieldMapper;
+import org.elasticsearch.index.mapper.DataStreamTimestampFieldMapper;
 import org.elasticsearch.index.mapper.DateFieldMapper;
 import org.elasticsearch.index.mapper.FieldAliasMapper;
 import org.elasticsearch.index.mapper.FieldMapper;
@@ -88,6 +96,7 @@ import org.elasticsearch.index.mapper.ObjectMapper;
 import org.elasticsearch.index.mapper.RangeFieldMapper;
 import org.elasticsearch.index.mapper.RangeType;
 import org.elasticsearch.index.mapper.TextFieldMapper;
+import org.elasticsearch.index.mapper.TimeSeriesIdFieldMapper;
 import org.elasticsearch.index.query.QueryRewriteContext;
 import org.elasticsearch.index.query.Rewriteable;
 import org.elasticsearch.index.query.SearchExecutionContext;
@@ -119,6 +128,7 @@ import org.elasticsearch.search.aggregations.support.CoreValuesSourceType;
 import org.elasticsearch.search.aggregations.support.ValuesSourceAggregationBuilder;
 import org.elasticsearch.search.aggregations.support.ValuesSourceRegistry;
 import org.elasticsearch.search.aggregations.support.ValuesSourceType;
+import org.elasticsearch.search.aggregations.timeseries.TimeSeriesIndexSearcher;
 import org.elasticsearch.search.fetch.FetchPhase;
 import org.elasticsearch.search.fetch.subphase.FetchDocValuesPhase;
 import org.elasticsearch.search.fetch.subphase.FetchSourcePhase;
@@ -251,6 +261,7 @@ public abstract class AggregatorTestCase extends ESTestCase {
             new NoneCircuitBreakerService(),
             AggregationBuilder.DEFAULT_PREALLOCATION * 5, // We don't know how many bytes to preallocate so we grab a handful
             DEFAULT_MAX_BUCKETS,
+            false,
             fieldTypes
         );
     }
@@ -268,6 +279,7 @@ public abstract class AggregatorTestCase extends ESTestCase {
         CircuitBreakerService breakerService,
         long bytesToPreallocate,
         int maxBucket,
+        boolean isInSortOrderExecutionRequired,
         MappedFieldType... fieldTypes
     ) throws IOException {
         MappingLookup mappingLookup = MappingLookup.fromMappers(
@@ -329,7 +341,8 @@ public abstract class AggregatorTestCase extends ESTestCase {
             () -> 0L,
             () -> false,
             q -> q,
-            true
+            true,
+            isInSortOrderExecutionRequired
         );
         releasables.add(context);
         return context;
@@ -546,11 +559,14 @@ public abstract class AggregatorTestCase extends ESTestCase {
             breakerService,
             randomBoolean() ? 0 : builder.bytesToPreallocate(),
             maxBucket,
+            builder.isInSortOrderExecutionRequired(),
             fieldTypes
         );
         C root = createAggregator(builder, context);
 
-        if (splitLeavesIntoSeparateAggregators && searcher.getIndexReader().leaves().size() > 0) {
+        if (splitLeavesIntoSeparateAggregators
+            && searcher.getIndexReader().leaves().size() > 0
+            && context.isInSortOrderExecutionRequired() == false) {
             assertThat(ctx, instanceOf(CompositeReaderContext.class));
             final CompositeReaderContext compCTX = (CompositeReaderContext) ctx;
             final int size = compCTX.leaves().size();
@@ -562,14 +578,22 @@ public abstract class AggregatorTestCase extends ESTestCase {
             for (ShardSearcher subSearcher : subSearchers) {
                 C a = createAggregator(builder, context);
                 a.preCollection();
-                Weight weight = subSearcher.createWeight(rewritten, ScoreMode.COMPLETE, 1f);
-                subSearcher.search(weight, a);
+                if (context.isInSortOrderExecutionRequired()) {
+                    new TimeSeriesIndexSearcher(subSearcher).search(rewritten, a);
+                } else {
+                    Weight weight = subSearcher.createWeight(rewritten, ScoreMode.COMPLETE, 1f);
+                    subSearcher.search(weight, a);
+                }
                 a.postCollection();
                 aggs.add(a.buildTopLevel());
             }
         } else {
             root.preCollection();
-            searcher.search(rewritten, MultiBucketCollector.wrap(true, List.of(root)));
+            if (context.isInSortOrderExecutionRequired()) {
+                new TimeSeriesIndexSearcher(searcher).search(rewritten, MultiBucketCollector.wrap(true, List.of(root)));
+            } else {
+                searcher.search(rewritten, MultiBucketCollector.wrap(true, List.of(root)));
+            }
             root.postCollection();
             aggs.add(root.buildTopLevel());
         }
@@ -635,8 +659,17 @@ public abstract class AggregatorTestCase extends ESTestCase {
         Consumer<V> verify,
         MappedFieldType... fieldTypes
     ) throws IOException {
+        boolean timeSeries = aggregationBuilder.isInSortOrderExecutionRequired();
         try (Directory directory = newDirectory()) {
-            RandomIndexWriter indexWriter = new RandomIndexWriter(random(), directory);
+            IndexWriterConfig config = LuceneTestCase.newIndexWriterConfig(random(), new MockAnalyzer(random()));
+            if (timeSeries) {
+                Sort sort = new Sort(
+                    new SortedSetSortField(TimeSeriesIdFieldMapper.NAME, false, SortedSetSelector.Type.MAX),
+                    new SortedNumericSortField(DataStreamTimestampFieldMapper.DEFAULT_PATH, SortField.Type.LONG)
+                );
+                config.setIndexSort(sort);
+            }
+            RandomIndexWriter indexWriter = new RandomIndexWriter(random(), directory, config);
             buildIndex.accept(indexWriter);
             indexWriter.close();
 
@@ -721,6 +754,7 @@ public abstract class AggregatorTestCase extends ESTestCase {
             breakerService,
             builder.bytesToPreallocate(),
             DEFAULT_MAX_BUCKETS,
+            builder.isInSortOrderExecutionRequired(),
             fieldTypes
         );
         Aggregator aggregator = createAggregator(builder, context);

+ 3 - 0
test/framework/src/main/java/org/elasticsearch/test/InternalAggregationTestCase.java

@@ -135,6 +135,8 @@ import org.elasticsearch.search.aggregations.pipeline.PercentilesBucketPipelineA
 import org.elasticsearch.search.aggregations.pipeline.PipelineAggregator;
 import org.elasticsearch.search.aggregations.pipeline.PipelineAggregator.PipelineTree;
 import org.elasticsearch.search.aggregations.pipeline.StatsBucketPipelineAggregationBuilder;
+import org.elasticsearch.search.aggregations.timeseries.ParsedTimeSeries;
+import org.elasticsearch.search.aggregations.timeseries.TimeSeriesAggregationBuilder;
 import org.elasticsearch.xcontent.ContextParser;
 import org.elasticsearch.xcontent.NamedXContentRegistry;
 import org.elasticsearch.xcontent.ParseField;
@@ -266,6 +268,7 @@ public abstract class InternalAggregationTestCase<T extends InternalAggregation>
         map.put(IpRangeAggregationBuilder.NAME, (p, c) -> ParsedBinaryRange.fromXContent(p, (String) c));
         map.put(TopHitsAggregationBuilder.NAME, (p, c) -> ParsedTopHits.fromXContent(p, (String) c));
         map.put(CompositeAggregationBuilder.NAME, (p, c) -> ParsedComposite.fromXContent(p, (String) c));
+        map.put(TimeSeriesAggregationBuilder.NAME, (p, c) -> ParsedTimeSeries.fromXContent(p, (String) c));
 
         namedXContents = map.entrySet()
             .stream()

+ 1 - 0
x-pack/plugin/analytics/src/test/java/org/elasticsearch/xpack/analytics/topmetrics/TopMetricsAggregatorTests.java

@@ -349,6 +349,7 @@ public class TopMetricsAggregatorTests extends AggregatorTestCase {
                     breaker,
                     builder.bytesToPreallocate(),
                     MultiBucketConsumerService.DEFAULT_MAX_BUCKETS,
+                    false,
                     doubleFields()
                 );
                 Aggregator aggregator = builder.build(context, null).create(null, CardinalityUpperBound.ONE);

+ 2 - 1
x-pack/plugin/transform/src/main/java/org/elasticsearch/xpack/transform/transforms/pivot/TransformAggregations.java

@@ -79,7 +79,8 @@ public final class TransformAggregations {
         "t_test", // https://github.com/elastic/elasticsearch/issues/54503,
         "variable_width_histogram", // https://github.com/elastic/elasticsearch/issues/58140
         "rate", // https://github.com/elastic/elasticsearch/issues/61351
-        "multi_terms" // https://github.com/elastic/elasticsearch/issues/67609
+        "multi_terms", // https://github.com/elastic/elasticsearch/issues/67609
+        "time_series" // https://github.com/elastic/elasticsearch/issues/74660
     );
 
     private TransformAggregations() {}