
Create a coordinating node level reader for tsdb (#79197)

* Create a coordinating node level reader for tsdb

This creates an interface that reads data in a time series compatible
way on the coordinating node. We believe that it can one day smooth out
querying time series data at a high level.

Right now there is a single implementation of this interface that
targets standard indices very inefficiently. It delegates down to our
standard `_search` APIs, specifically `composite`, `top_hits`, and
`search_after`. It is our hope that when we have fancier TSDB support we
can use it to speed up the API.

The API itself looks like:
```
// The latest value for all time series in the range
void latestInRange(metric, from, to, callback);
// The latest value for all time series in ranges starting from
// `from`, stepping forward by `step` until `to`
void latestInRanges(metric, from, to, step, callback);
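// All values for all time series in the range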
void valuesInRange(metric, from, to, callback);
```
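
As a rough illustration of how a caller might drive the callback-based API (a minimal sketch, not part of this change: imports are elided, the batch sizes, timestamps, and printing callback are made up, `client` stands in for any available `Client`, and the caller is assumed to sit in the same package because `MetricsCallback` is package-private):
```
DateFormatter formatter = DateFieldMapper.DEFAULT_DATE_TIME_FORMATTER;
TemporalAccessor from = formatter.parse("2021-01-01T00:00:00Z");
TemporalAccessor to = formatter.parse("2021-01-02T00:00:00Z");

// Resolve the dimension fields for the target indices, then stream the latest
// value of the "v" metric for every time series in the (from, to] range.
new TimeSeriesMetricsService(client, 1000, 10000).newMetrics(
    new String[] { "tsdb" },
    ActionListener.wrap(
        metrics -> metrics.latestInRange("v", from, to, new TimeSeriesMetrics.MetricsCallback() {
            @Override
            public void onTimeSeriesStart(Map<String, Object> dimensions) {
                System.out.println("time series " + dimensions);
            }

            @Override
            public void onMetric(long time, double value) {
                System.out.println("  " + formatter.formatMillis(time) + " -> " + value);
            }

            @Override
            public void onSuccess() {
                System.out.println("done");
            }

            @Override
            public void onError(Exception e) {
                e.printStackTrace();
            }
        }),
        e -> e.printStackTrace()
    )
);
```
The same pattern applies to `latestInRanges` and `valuesInRange`; `newMetrics` runs a field caps request to discover the dimension fields before handing back a `TimeSeriesMetrics` instance.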

* Move package
Nik Everett committed 4 years ago
commit bd01654aa5

+ 369 - 0
server/src/internalClusterTest/java/org/elasticsearch/timeseries/support/TimeSeriesMetricsIT.java

@@ -0,0 +1,369 @@
+/*
+ * Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
+ * or more contributor license agreements. Licensed under the Elastic License
+ * 2.0 and the Server Side Public License, v 1; you may not use this file except
+ * in compliance with, at your election, the Elastic License 2.0 or the Server
+ * Side Public License, v 1.
+ */
+
+package org.elasticsearch.timeseries.support;
+
+import io.github.nik9000.mapmatcher.MapMatcher;
+
+import org.elasticsearch.action.ActionListener;
+import org.elasticsearch.action.index.IndexRequestBuilder;
+import org.elasticsearch.action.support.ListenableActionFuture;
+import org.elasticsearch.common.settings.Settings;
+import org.elasticsearch.common.time.DateFormatter;
+import org.elasticsearch.core.CheckedConsumer;
+import org.elasticsearch.index.IndexSettings;
+import org.elasticsearch.index.mapper.DateFieldMapper;
+import org.elasticsearch.search.aggregations.MultiBucketConsumerService;
+import org.elasticsearch.search.aggregations.bucket.histogram.DateHistogramInterval;
+import org.elasticsearch.test.ESIntegTestCase;
+import org.elasticsearch.test.junit.annotations.TestLogging;
+import org.elasticsearch.timeseries.support.TimeSeriesMetrics;
+import org.elasticsearch.timeseries.support.TimeSeriesMetricsService;
+import org.elasticsearch.xcontent.XContentBuilder;
+import org.elasticsearch.xcontent.json.JsonXContent;
+
+import java.io.IOException;
+import java.time.Instant;
+import java.time.temporal.ChronoUnit;
+import java.time.temporal.TemporalAccessor;
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+import java.util.TreeSet;
+import java.util.concurrent.ExecutionException;
+import java.util.concurrent.TimeUnit;
+import java.util.function.BiConsumer;
+import java.util.function.IntFunction;
+
+import static io.github.nik9000.mapmatcher.MapMatcher.assertMap;
+import static io.github.nik9000.mapmatcher.MapMatcher.matchesMap;
+
+@TestLogging(value = "org.elasticsearch.search.tsdb:debug", reason = "test")
+public class TimeSeriesMetricsIT extends ESIntegTestCase {
+    private static final int MAX_RESULT_WINDOW = IndexSettings.MAX_RESULT_WINDOW_SETTING.getDefault(Settings.EMPTY);
+    private static final DateFormatter FORMATTER = DateFieldMapper.DEFAULT_DATE_TIME_FORMATTER;
+
+    public void testKeywordDimension() throws Exception {
+        assertSmallSimple("a", "b", mapping -> mapping.field("type", "keyword"));
+    }
+
+    public void testByteDimension() throws Exception {
+        assertSmallSimple(0L, 1L, mapping -> mapping.field("type", "byte"));
+    }
+
+    public void testShortDimension() throws Exception {
+        assertSmallSimple(0L, 1L, mapping -> mapping.field("type", "short"));
+    }
+
+    public void testIntDimension() throws Exception {
+        assertSmallSimple(0L, 1L, mapping -> mapping.field("type", "integer"));
+    }
+
+    public void testLongDimension() throws Exception {
+        assertSmallSimple(0L, 1L, mapping -> mapping.field("type", "long"));
+    }
+
+    public void testIpDimension() throws Exception {
+        assertSmallSimple("192.168.0.1", "2001:db8::1:0:0:1", mapping -> mapping.field("type", "ip"));
+    }
+
+    // TODO unsigned long dimension
+
+    public void assertSmallSimple(Object d1, Object d2, CheckedConsumer<XContentBuilder, IOException> dimensionMapping) throws Exception {
+        createTsdbIndex(mapping -> {
+            mapping.startObject("dim");
+            dimensionMapping.accept(mapping);
+            mapping.field("time_series_dimension", true);
+            mapping.endObject();
+        });
+        String beforeAll = "2021-01-01T00:05:00Z";
+        String[] dates = new String[] {
+            "2021-01-01T00:10:00.000Z",
+            "2021-01-01T00:11:00.000Z",
+            "2021-01-01T00:15:00.000Z",
+            "2021-01-01T00:20:00.000Z", };
+        indexRandom(
+            true,
+            client().prepareIndex("tsdb").setSource(Map.of("@timestamp", dates[0], "dim", d1, "v", 1)),
+            client().prepareIndex("tsdb").setSource(Map.of("@timestamp", dates[1], "dim", d1, "v", 2)),
+            client().prepareIndex("tsdb").setSource(Map.of("@timestamp", dates[2], "dim", d1, "v", 3)),
+            client().prepareIndex("tsdb").setSource(Map.of("@timestamp", dates[3], "dim", d1, "v", 4)),
+            client().prepareIndex("tsdb").setSource(Map.of("@timestamp", dates[1], "dim", d2, "v", 5))
+        );
+        assertMap(
+            latestInRange(between(1, MultiBucketConsumerService.DEFAULT_MAX_BUCKETS), beforeAll, dates[0]),
+            matchesMap().entry(Map.of("dim", d1), List.of(Map.entry(dates[0], 1.0)))
+        );
+        assertMap(
+            valuesInRange(between(1, MAX_RESULT_WINDOW), beforeAll, dates[0]),
+            matchesMap().entry(Map.of("dim", d1), List.of(Map.entry(dates[0], 1.0)))
+        );
+        assertMap(
+            latestInRange(between(1, MultiBucketConsumerService.DEFAULT_MAX_BUCKETS), dates[0], dates[2]),
+            matchesMap().entry(Map.of("dim", d1), List.of(Map.entry(dates[2], 3.0)))
+                .entry(Map.of("dim", d2), List.of(Map.entry(dates[1], 5.0)))
+        );
+        assertMap(
+            valuesInRange(between(1, MAX_RESULT_WINDOW), dates[0], dates[2]),
+            matchesMap().entry(Map.of("dim", d1), List.of(Map.entry(dates[1], 2.0), Map.entry(dates[2], 3.0)))
+                .entry(Map.of("dim", d2), List.of(Map.entry(dates[1], 5.0)))
+        );
+        assertMap(
+            latestInRange(between(1, MultiBucketConsumerService.DEFAULT_MAX_BUCKETS), beforeAll, dates[3]),
+            matchesMap().entry(Map.of("dim", d1), List.of(Map.entry(dates[3], 4.0)))
+                .entry(Map.of("dim", d2), List.of(Map.entry(dates[1], 5.0)))
+        );
+        assertMap(
+            valuesInRange(between(1, MAX_RESULT_WINDOW), beforeAll, dates[3]),
+            matchesMap().entry(
+                Map.of("dim", d1),
+                List.of(Map.entry(dates[0], 1.0), Map.entry(dates[1], 2.0), Map.entry(dates[2], 3.0), Map.entry(dates[3], 4.0))
+            ).entry(Map.of("dim", d2), List.of(Map.entry(dates[1], 5.0)))
+        );
+        assertMap(
+            latestInRanges(
+                between(1, MultiBucketConsumerService.DEFAULT_MAX_BUCKETS),
+                beforeAll,
+                dates[3],
+                new DateHistogramInterval("5m")
+            ),
+            matchesMap().entry(Map.of("dim", d1), List.of(Map.entry(dates[0], 1.0), Map.entry(dates[2], 3.0), Map.entry(dates[3], 4.0)))
+                .entry(Map.of("dim", d2), List.of(Map.entry(dates[1], 5.0)))
+        );
+    }
+
+    public void testManyTimeSeries() throws InterruptedException, ExecutionException, IOException {
+        createTsdbIndex("dim");
+        assertManyTimeSeries(i -> Map.of("dim", Integer.toString(i, Character.MAX_RADIX)));
+    }
+
+    public void testManyTimeSeriesWithManyDimensions() throws InterruptedException, ExecutionException, IOException {
+        createTsdbIndex("dim0", "dim1", "dim2", "dim3", "dim4", "dim5", "dim6", "dim7");
+        assertManyTimeSeries(i -> {
+            int dimCount = (i & 0x07) + 1;
+            Map<String, Object> dims = new HashMap<>(dimCount);
+            int offset = (i >> 3) & 0x03;
+            String value = Integer.toString(i, Character.MAX_RADIX);
+            for (int d = 0; d < dimCount; d++) {
+                dims.put("dim" + ((d + offset) & 0x07), value);
+            }
+            return dims;
+        });
+    }
+
+    private void assertManyTimeSeries(IntFunction<Map<String, Object>> gen) throws InterruptedException {
+        MapMatcher expectedLatest = matchesMap();
+        MapMatcher expectedValues = matchesMap();
+        String min = "2021-01-01T00:10:00Z";
+        String max = "2021-01-01T00:15:00Z";
+        long minMillis = FORMATTER.parseMillis(min);
+        long maxMillis = FORMATTER.parseMillis(max);
+        int iterationSize = scaledRandomIntBetween(50, 100);
+        int docCount = scaledRandomIntBetween(iterationSize * 2, iterationSize * 100);
+        List<IndexRequestBuilder> docs = new ArrayList<>(docCount);
+        for (int i = 0; i < docCount; i++) {
+            int count = randomBoolean() ? 1 : 2;
+            Set<Long> times = new TreeSet<>(); // We rely on iterating these in ascending order below
+            while (times.size() < count) {
+                times.add(randomLongBetween(minMillis + 1, maxMillis));
+            }
+            List<Map.Entry<String, Double>> expectedValuesForTimeSeries = new ArrayList<>(count);
+            Map<String, Object> dimensions = gen.apply(i);
+            String timestamp = null;
+            double value = Double.NaN;
+            for (long time : times) {
+                timestamp = FORMATTER.formatMillis(time);
+                value = randomDouble();
+                Map<String, Object> source = new HashMap<>(dimensions);
+                source.put("@timestamp", timestamp);
+                source.put("v", value);
+                if (randomBoolean()) {
+                    int garbage = between(1, 10);
+                    for (int g = 0; g < garbage; g++) {
+                        source.put("garbage" + g, randomAlphaOfLength(5));
+                    }
+                }
+                docs.add(client().prepareIndex("tsdb").setSource(source));
+                expectedValuesForTimeSeries.add(Map.entry(timestamp, value));
+            }
+            expectedLatest = expectedLatest.entry(dimensions, List.of(Map.entry(timestamp, value)));
+            expectedValues = expectedValues.entry(dimensions, expectedValuesForTimeSeries);
+        }
+        indexRandom(true, docs);
+        assertMap(latestInRange(iterationSize, min, max), expectedLatest);
+        assertMap(valuesInRange(iterationSize, min, max), expectedValues);
+    }
+
+    public void testManySteps() throws InterruptedException, ExecutionException, IOException {
+        createTsdbIndex("dim");
+        List<Map.Entry<String, Double>> expectedLatest = new ArrayList<>();
+        List<Map.Entry<String, Double>> expectedValues = new ArrayList<>();
+        String min = "2021-01-01T00:00:00Z";
+        long minMillis = FORMATTER.parseMillis(min);
+        int iterationBuckets = scaledRandomIntBetween(50, 100);
+        int bucketCount = scaledRandomIntBetween(iterationBuckets * 2, iterationBuckets * 100);
+        long maxMillis = minMillis + bucketCount * TimeUnit.SECONDS.toMillis(5);
+        String max = FORMATTER.formatMillis(maxMillis);
+        List<IndexRequestBuilder> docs = new ArrayList<>(bucketCount);
+        for (long millis = minMillis; millis < maxMillis; millis += TimeUnit.SECONDS.toMillis(5)) {
+            String timestamp = FORMATTER.formatMillis(millis);
+            double v = randomDouble();
+            if (randomBoolean()) {
+                String beforeTimestamp = FORMATTER.formatMillis(millis - 1);
+                double beforeValue = randomDouble();
+                docs.add(client().prepareIndex("tsdb").setSource(Map.of("@timestamp", beforeTimestamp, "dim", "dim", "v", beforeValue)));
+                expectedValues.add(Map.entry(beforeTimestamp, beforeValue));
+            }
+            expectedLatest.add(Map.entry(timestamp, v));
+            expectedValues.add(Map.entry(timestamp, v));
+            docs.add(client().prepareIndex("tsdb").setSource(Map.of("@timestamp", timestamp, "dim", "dim", "v", v)));
+        }
+        indexRandom(true, docs);
+        assertMap(
+            latestInRanges(iterationBuckets, "2020-01-01T00:00:00Z", max, new DateHistogramInterval("5s")),
+            matchesMap(Map.of(Map.of("dim", "dim"), expectedLatest))
+        );
+        assertMap(valuesInRange(iterationBuckets, "2020-01-01T00:00:00Z", max), matchesMap(Map.of(Map.of("dim", "dim"), expectedValues)));
+    }
+
+    private void createTsdbIndex(String... keywordDimensions) throws IOException {
+        createTsdbIndex(mapping -> {
+            for (String d : keywordDimensions) {
+                mapping.startObject(d).field("type", "keyword").field("time_series_dimension", true).endObject();
+            }
+        });
+    }
+
+    private void createTsdbIndex(CheckedConsumer<XContentBuilder, IOException> dimensionMapping) throws IOException {
+        XContentBuilder mapping = JsonXContent.contentBuilder();
+        mapping.startObject().startObject("properties");
+        mapping.startObject("@timestamp").field("type", "date").endObject();
+        mapping.startObject("v").field("type", "double").endObject();
+        dimensionMapping.accept(mapping);
+        mapping.endObject().endObject();
+        client().admin().indices().prepareCreate("tsdb").setMapping(mapping).get();
+    }
+
+    private Map<Map<String, Object>, List<Map.Entry<String, Double>>> latestInRange(int bucketBatchSize, String min, String max) {
+        TemporalAccessor minT = FORMATTER.parse(min);
+        TemporalAccessor maxT = FORMATTER.parse(max);
+        if (randomBoolean()) {
+            long days = Instant.from(minT).until(Instant.from(maxT), ChronoUnit.DAYS) + 1;
+            DateHistogramInterval step = new DateHistogramInterval(days + "d");
+            return latestInRanges(bucketBatchSize, minT, maxT, step);
+        }
+        return latestInRange(bucketBatchSize, minT, maxT);
+    }
+
+    private Map<Map<String, Object>, List<Map.Entry<String, Double>>> latestInRange(
+        int bucketBatchSize,
+        TemporalAccessor min,
+        TemporalAccessor max
+    ) {
+        return withMetrics(
+            bucketBatchSize,
+            between(0, 10000),  // Not used by this method
+            (future, metrics) -> metrics.latestInRange("v", min, max, new CollectingListener(future))
+        );
+    }
+
+    private Map<Map<String, Object>, List<Map.Entry<String, Double>>> latestInRanges(
+        int bucketBatchSize,
+        String min,
+        String max,
+        DateHistogramInterval step
+    ) {
+        return latestInRanges(bucketBatchSize, FORMATTER.parse(min), FORMATTER.parse(max), step);
+    }
+
+    private Map<Map<String, Object>, List<Map.Entry<String, Double>>> latestInRanges(
+        int bucketBatchSize,
+        TemporalAccessor min,
+        TemporalAccessor max,
+        DateHistogramInterval step
+    ) {
+        return withMetrics(
+            bucketBatchSize,
+            between(0, 10000),   // Not used by this method
+            (future, metrics) -> metrics.latestInRanges("v", min, max, step, new CollectingListener(future))
+        );
+    }
+
+    private Map<Map<String, Object>, List<Map.Entry<String, Double>>> valuesInRange(int docBatchSize, String min, String max) {
+        return valuesInRange(docBatchSize, FORMATTER.parse(min), FORMATTER.parse(max));
+    }
+
+    private Map<Map<String, Object>, List<Map.Entry<String, Double>>> valuesInRange(
+        int docBatchSize,
+        TemporalAccessor min,
+        TemporalAccessor max
+    ) {
+        return withMetrics(
+            between(0, 10000),   // Not used by this method
+            docBatchSize,
+            (future, metrics) -> metrics.valuesInRange("v", min, max, new CollectingListener(future))
+        );
+    }
+
+    private <R> R withMetrics(int bucketBatchSize, int docBatchSize, BiConsumer<ListenableActionFuture<R>, TimeSeriesMetrics> handle) {
+        ListenableActionFuture<R> result = new ListenableActionFuture<>();
+        new TimeSeriesMetricsService(client(), bucketBatchSize, docBatchSize).newMetrics(
+            new String[] { "tsdb" },
+            new ActionListener<TimeSeriesMetrics>() {
+                @Override
+                public void onResponse(TimeSeriesMetrics metrics) {
+                    handle.accept(result, metrics);
+                }
+
+                @Override
+                public void onFailure(Exception e) {
+                    result.onFailure(e);
+                }
+            }
+        );
+        return result.actionGet();
+    }
+
+    private class CollectingListener implements TimeSeriesMetrics.MetricsCallback {
+        private final Map<Map<String, Object>, List<Map.Entry<String, Double>>> results = new HashMap<>();
+        private final ActionListener<Map<Map<String, Object>, List<Map.Entry<String, Double>>>> delegate;
+        private Map<String, Object> currentDimensions = null;
+        private List<Map.Entry<String, Double>> currentValues = null;
+
+        CollectingListener(ActionListener<Map<Map<String, Object>, List<Map.Entry<String, Double>>>> delegate) {
+            this.delegate = delegate;
+        }
+
+        @Override
+        public void onTimeSeriesStart(Map<String, Object> dimensions) {
+            if (currentDimensions != null) {
+                results.put(currentDimensions, currentValues);
+            }
+            currentDimensions = dimensions;
+            currentValues = new ArrayList<>();
+        }
+
+        @Override
+        public void onMetric(long time, double value) {
+            currentValues.add(Map.entry(FORMATTER.formatMillis(time), value));
+        }
+
+        @Override
+        public void onSuccess() {
+            results.put(currentDimensions, currentValues);
+            delegate.onResponse(results);
+        }
+
+        @Override
+        public void onError(Exception e) {
+            delegate.onFailure(e);
+        }
+    }
+}

+ 352 - 0
server/src/main/java/org/elasticsearch/timeseries/support/TimeSeriesMetrics.java

@@ -0,0 +1,352 @@
+/*
+ * Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
+ * or more contributor license agreements. Licensed under the Elastic License
+ * 2.0 and the Server Side Public License, v 1; you may not use this file except
+ * in compliance with, at your election, the Elastic License 2.0 or the Server
+ * Side Public License, v 1.
+ */
+
+package org.elasticsearch.timeseries.support;
+
+import org.apache.logging.log4j.LogManager;
+import org.apache.logging.log4j.Logger;
+import org.elasticsearch.action.ActionListener;
+import org.elasticsearch.action.search.SearchRequest;
+import org.elasticsearch.action.search.SearchResponse;
+import org.elasticsearch.client.Client;
+import org.elasticsearch.common.document.DocumentField;
+import org.elasticsearch.core.CheckedConsumer;
+import org.elasticsearch.core.Nullable;
+import org.elasticsearch.index.mapper.DateFieldMapper;
+import org.elasticsearch.index.query.RangeQueryBuilder;
+import org.elasticsearch.search.SearchHit;
+import org.elasticsearch.search.aggregations.bucket.composite.CompositeAggregationBuilder;
+import org.elasticsearch.search.aggregations.bucket.composite.CompositeValuesSourceBuilder;
+import org.elasticsearch.search.aggregations.bucket.composite.DateHistogramValuesSourceBuilder;
+import org.elasticsearch.search.aggregations.bucket.composite.InternalComposite;
+import org.elasticsearch.search.aggregations.bucket.composite.TermsValuesSourceBuilder;
+import org.elasticsearch.search.aggregations.bucket.histogram.DateHistogramInterval;
+import org.elasticsearch.search.aggregations.metrics.InternalTopHits;
+import org.elasticsearch.search.aggregations.metrics.TopHitsAggregationBuilder;
+import org.elasticsearch.search.fetch.subphase.FieldAndFormat;
+import org.elasticsearch.search.sort.FieldSortBuilder;
+import org.elasticsearch.search.sort.SortBuilder;
+import org.elasticsearch.search.sort.SortOrder;
+
+import java.time.temporal.TemporalAccessor;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.Objects;
+import java.util.stream.Stream;
+
+import static java.util.stream.Collectors.toList;
+import static java.util.stream.Collectors.toMap;
+
+/**
+ * Reads data in a time series style way.
+ */
+public class TimeSeriesMetrics {
+    private static final Logger logger = LogManager.getLogger();
+
+    private final int bucketBatchSize;
+    private final int docBatchSize;
+    private final Client client;
+    private final String[] indices;
+    private final List<String> dimensionFieldNames;
+
+    TimeSeriesMetrics(int bucketBatchSize, int docBatchSize, Client client, String[] indices, List<String> dimensionFieldNames) {
+        this.bucketBatchSize = bucketBatchSize;
+        this.docBatchSize = docBatchSize;
+        this.client = client;
+        this.indices = indices;
+        this.dimensionFieldNames = dimensionFieldNames;
+    }
+
+    /**
+     * Called when metric data is available.
+     */
+    interface MetricsCallback {
+        /**
+         * Called when starting a new time series.
+         */
+        void onTimeSeriesStart(Map<String, Object> dimensions);
+
+        /**
+         * Called for each metric returned.
+         * @param time the {@code @timestamp} recorded in the metric
+         * @param value the metric value
+         */
+        void onMetric(long time, double value);
+
+        /**
+         * Called when all requested metrics have been returned.
+         */
+        void onSuccess();
+
+        /**
+         * Called when there is any error fetching metrics. No more results
+         * will be returned.
+         */
+        void onError(Exception e);
+    }
+
+    // TODO selector
+    /**
+     * Get the latest value for all time series in the range.
+     */
+    public void latestInRange(String metric, TemporalAccessor from, TemporalAccessor to, MetricsCallback callback) {
+        latestInRange(metric, from, to, null, null, null, callback);
+    }
+
+    // TODO selector
+    /**
+     * Get the latest value for all time series in many ranges. 
+     */
+    public void latestInRanges(
+        String metric,
+        TemporalAccessor from,
+        TemporalAccessor to,
+        DateHistogramInterval step,
+        MetricsCallback callback
+    ) {
+        latestInRange(metric, from, to, step, null, null, callback);
+    }
+
+    /**
+     * Get the latest value for all time series in one or many ranges.
+     * @param step null if reading from a single range, the length of the range otherwise.
+     */
+    private void latestInRange(
+        String metric,
+        TemporalAccessor from,
+        TemporalAccessor to,
+        @Nullable DateHistogramInterval step,
+        @Nullable Map<String, Object> afterKey,
+        @Nullable Map<String, Object> previousTimeSeries,
+        MetricsCallback callback
+    ) {
+        // TODO test asserting forking
+        SearchRequest search = searchInRange(from, to);
+        search.source().size(0);
+        search.source().trackTotalHits(false);
+        search.source().aggregation(timeSeriesComposite(step, afterKey).subAggregation(latestMetric(metric)));
+        logger.debug("Requesting batch of latest {}", search);
+        client.search(
+            search,
+            ActionListener.wrap(
+                new LatestInRangeResponseHandler(metric, callback, from, to, step, search, previousTimeSeries),
+                callback::onError
+            )
+        );
+    }
+
+    private SearchRequest searchInRange(TemporalAccessor from, TemporalAccessor to) {
+        SearchRequest search = new SearchRequest(indices);
+        search.source()
+            .query(
+                new RangeQueryBuilder("@timestamp").format(DateFieldMapper.DEFAULT_DATE_TIME_NANOS_FORMATTER.pattern())
+                    .gt(DateFieldMapper.DEFAULT_DATE_TIME_NANOS_FORMATTER.format(from))
+                    .lte(DateFieldMapper.DEFAULT_DATE_TIME_NANOS_FORMATTER.format(to))
+            );
+        return search;
+    }
+
+    private CompositeAggregationBuilder timeSeriesComposite(@Nullable DateHistogramInterval step, @Nullable Map<String, Object> afterKey) {
+        Stream<CompositeValuesSourceBuilder<?>> sources = dimensionFieldNames.stream()
+            .map(d -> new TermsValuesSourceBuilder(d).field(d).missingBucket(true));
+        if (step != null) {
+            sources = Stream.concat(
+                sources,
+                /*
+                 * offset(1) *includes* the last milli of the range and excludes
+                 * the first milli of the range - effectively shifting us from a
+                 * closed/open range to an open/closed range.
+                 */
+                Stream.of(new DateHistogramValuesSourceBuilder("@timestamp").field("@timestamp").fixedInterval(step).offset(1))
+            );
+        }
+        return new CompositeAggregationBuilder("time_series", sources.collect(toList())).aggregateAfter(afterKey).size(bucketBatchSize);
+    }
+
+    private TopHitsAggregationBuilder latestMetric(String metric) {
+        // TODO top metrics would almost certainly be better here but it's in analytics.
+        return new TopHitsAggregationBuilder("latest").sort(new FieldSortBuilder("@timestamp").order(SortOrder.DESC))
+            .fetchField(metric)
+            .fetchField(new FieldAndFormat("@timestamp", "epoch_millis"))
+            .size(1);
+    }
+
+    /**
+     * Handler for each page of results from {@link TimeSeriesMetrics#latestInRange}.
+     */
+    private class LatestInRangeResponseHandler implements CheckedConsumer<SearchResponse, RuntimeException> {
+        private final String metric;
+        private final MetricsCallback callback;
+        private final TemporalAccessor from;
+        private final TemporalAccessor to;
+        @Nullable
+        private final DateHistogramInterval step;
+        private final SearchRequest search;
+        private Map<String, Object> previousDimensions;
+
+        LatestInRangeResponseHandler(
+            String metric,
+            MetricsCallback callback,
+            TemporalAccessor from,
+            TemporalAccessor to,
+            @Nullable DateHistogramInterval step,
+            SearchRequest search,
+            @Nullable Map<String, Object> previousDimensions
+        ) {
+            this.metric = metric;
+            this.callback = callback;
+            this.from = from;
+            this.to = to;
+            this.step = step;
+            this.search = search;
+            this.previousDimensions = previousDimensions;
+        }
+
+        @Override
+        public void accept(SearchResponse response) {
+            // TODO shard error handling
+            InternalComposite composite = response.getAggregations().get("time_series");
+            logger.debug("Received batch of latest {} with {} buckets", search, composite.getBuckets().size());
+            for (InternalComposite.InternalBucket bucket : composite.getBuckets()) {
+                Map<String, Object> dimensions = bucket.getKey()
+                    .entrySet()
+                    .stream()
+                    .filter(e -> false == e.getKey().equals("@timestamp") && e.getValue() != null)
+                    .collect(toMap(Map.Entry::getKey, Map.Entry::getValue));
+                if (false == Objects.equals(previousDimensions, dimensions)) {
+                    previousDimensions = dimensions;
+                    callback.onTimeSeriesStart(dimensions);
+                }
+                InternalTopHits latest = bucket.getAggregations().get("latest");
+                SearchHit[] hits = latest.getHits().getHits();
+                if (hits.length == 0) {
+                    continue;
+                }
+                DocumentField metricField = hits[0].field(metric);
+                if (metricField == null) {
+                    // TODO skip in query?
+                    continue;
+                }
+                long time = Long.parseLong((String) hits[0].field("@timestamp").getValue());
+                double value = ((Number) metricField.getValue()).doubleValue();
+                callback.onMetric(time, value);
+            }
+            if (composite.afterKey() == null) {
+                callback.onSuccess();
+            } else {
+                latestInRange(metric, from, to, step, composite.afterKey(), previousDimensions, callback);
+            }
+        }
+    }
+
+    // TODO selector
+    /**
+     * Return all values for all time series in a range.
+     */
+    public void valuesInRange(String metric, TemporalAccessor from, TemporalAccessor to, MetricsCallback listener) {
+        valuesInRange(metric, from, to, null, null, listener);
+    }
+
+    /**
+     * Search for a page of values for all time series in a range.
+     */
+    private void valuesInRange(
+        String metric,
+        TemporalAccessor from,
+        TemporalAccessor to,
+        Object[] searchAfter,
+        Map<String, Object> previousTimeSeries,
+        MetricsCallback callback
+    ) {
+        SearchRequest search = searchInRange(from, to);
+        search.source().size(docBatchSize);
+        search.source().trackTotalHits(false);
+        List<SortBuilder<?>> sorts = Stream.concat(
+            dimensionFieldNames.stream().map(d -> new FieldSortBuilder(d).order(SortOrder.ASC)),
+            Stream.of(new FieldSortBuilder("@timestamp").order(SortOrder.ASC).setFormat("epoch_millis"))
+        ).collect(toList());
+        search.source().sort(sorts);
+        if (searchAfter != null) {
+            search.source().searchAfter(searchAfter);
+        }
+        search.source().fetchField(metric);
+        client.search(
+            search,
+            ActionListener.wrap(new ValuesInRangeResponseHandler(metric, callback, from, to, search, previousTimeSeries), callback::onError)
+        );
+    }
+
+    /**
+     * Handler for {@link TimeSeriesMetrics#valuesInRange}.
+     */
+    private class ValuesInRangeResponseHandler implements CheckedConsumer<SearchResponse, RuntimeException> {
+        private final String metric;
+        private final MetricsCallback callback;
+        private final TemporalAccessor from;
+        private final TemporalAccessor to;
+        private final SearchRequest search;
+        private Map<String, Object> previousDimensions;
+
+        ValuesInRangeResponseHandler(
+            String metric,
+            MetricsCallback callback,
+            TemporalAccessor from,
+            TemporalAccessor to,
+            SearchRequest search,
+            @Nullable Map<String, Object> previousDimensions
+        ) {
+            this.metric = metric;
+            this.callback = callback;
+            this.from = from;
+            this.to = to;
+            this.search = search;
+            this.previousDimensions = previousDimensions;
+        }
+
+        @Override
+        public void accept(SearchResponse response) {
+            // TODO shard error handling
+            logger.debug("Received batch of values {} with {} docs", search, response.getHits().getHits().length);
+            SearchHit[] hits = response.getHits().getHits();
+            for (SearchHit hit : hits) {
+                /*
+                 * Read the dimensions out of the sort. This is useful because
+                 * we already need the sort so we can do proper pagination but
+                 * it also converts numeric dimensions into a Long, which is nice
+                 * and consistent.
+                 */
+                Map<String, Object> dimensions = new HashMap<>();
+                for (int d = 0; d < dimensionFieldNames.size(); d++) {
+                    Object dimensionValue = hit.getSortValues()[d];
+                    if (dimensionValue != null) {
+                        dimensions.put(dimensionFieldNames.get(d), dimensionValue);
+                    }
+                }
+                if (false == Objects.equals(previousDimensions, dimensions)) {
+                    previousDimensions = dimensions;
+                    callback.onTimeSeriesStart(dimensions);
+                }
+                DocumentField metricField = hit.field(metric);
+                if (metricField == null) {
+                    // TODO skip in query?
+                    continue;
+                }
+                long time = Long.parseLong((String) hit.getSortValues()[dimensionFieldNames.size()]);
+                double value = ((Number) metricField.getValue()).doubleValue();
+                callback.onMetric(time, value);
+            }
+
+            if (hits.length < docBatchSize) {
+                callback.onSuccess();
+            } else {
+                valuesInRange(metric, from, to, hits[hits.length - 1].getSortValues(), previousDimensions, callback);
+            }
+        }
+    }
+}

+ 69 - 0
server/src/main/java/org/elasticsearch/timeseries/support/TimeSeriesMetricsService.java

@@ -0,0 +1,69 @@
+/*
+ * Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
+ * or more contributor license agreements. Licensed under the Elastic License
+ * 2.0 and the Server Side Public License, v 1; you may not use this file except
+ * in compliance with, at your election, the Elastic License 2.0 or the Server
+ * Side Public License, v 1.
+ */
+
+package org.elasticsearch.timeseries.support;
+
+import org.elasticsearch.ElasticsearchException;
+import org.elasticsearch.action.ActionListener;
+import org.elasticsearch.action.fieldcaps.FieldCapabilities;
+import org.elasticsearch.action.fieldcaps.FieldCapabilitiesFailure;
+import org.elasticsearch.action.fieldcaps.FieldCapabilitiesRequest;
+import org.elasticsearch.action.fieldcaps.FieldCapabilitiesResponse;
+import org.elasticsearch.client.Client;
+
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.List;
+import java.util.Map;
+
+public class TimeSeriesMetricsService {
+    private final Client client;
+    private final int bucketBatchSize;
+    private final int docBatchSize;
+
+    public TimeSeriesMetricsService(Client client, int bucketBatchSize, int docBatchSize) { // TODO read maxBuckets at runtime
+        this.client = client;
+        this.bucketBatchSize = bucketBatchSize;
+        this.docBatchSize = docBatchSize;
+    }
+
+    public void newMetrics(String[] indices, ActionListener<TimeSeriesMetrics> listener) {
+        FieldCapabilitiesRequest request = new FieldCapabilitiesRequest();
+        request.indices(indices);
+        request.fields("*"); // TODO * can be a lot!
+        client.fieldCaps(request, listener.map(this::newMetrics));
+    }
+
+    private TimeSeriesMetrics newMetrics(FieldCapabilitiesResponse response) {
+        if (response.getFailures().isEmpty() == false) {
+            ElasticsearchException e = new ElasticsearchException(
+                "Failed to fetch field caps for " + Arrays.toString(response.getFailedIndices())
+            );
+            for (FieldCapabilitiesFailure f : response.getFailures()) {
+                e.addSuppressed(
+                    new ElasticsearchException("Failed to fetch field caps for " + Arrays.toString(f.getIndices()), f.getException())
+                );
+            }
+            throw e;
+        }
+        List<String> dimensionFieldNames = new ArrayList<>();
+        for (Map.Entry<String, Map<String, FieldCapabilities>> e : response.get().entrySet()) {
+            for (Map.Entry<String, FieldCapabilities> e2 : e.getValue().entrySet()) {
+                collectField(dimensionFieldNames, e.getKey(), e2.getKey(), e2.getValue());
+            }
+        }
+        return new TimeSeriesMetrics(bucketBatchSize, docBatchSize, client, response.getIndices(), List.copyOf(dimensionFieldNames));
+    }
+
+    private void collectField(List<String> dimensions, String fieldName, String fieldType, FieldCapabilities capabilities) {
+        // TODO collect metrics for selector
+        if (capabilities.isDimension()) {
+            dimensions.add(fieldName);
+        }
+    }
+}