Răsfoiți Sursa

Add a TSDB rate aggregation (#90447)

This commit adds an extension to the existing rate aggregation that, when
applied to a field defined as a metric counter, will calculate rates that take
into account counter resets.
Alan Woodward 2 ani în urmă
părinte
comite
89e2f69c15

+ 5 - 0
docs/changelog/90447.yaml

@@ -0,0 +1,5 @@
+pr: 90447
+summary: Add a TSDB rate aggregation
+area: "TSDB"
+type: feature
+issues: []

+ 0 - 3
test/framework/src/main/java/org/elasticsearch/search/aggregations/AggregatorTestCase.java

@@ -449,9 +449,6 @@ public abstract class AggregatorTestCase extends ESTestCase {
      * Collects all documents that match the provided query {@link Query} and
      * returns the reduced {@link InternalAggregation}.
      * <p>
-     * Half the time it aggregates each leaf individually and reduces all
-     * results together. The other half the time it aggregates across the entire
-     * index at once and runs a final reduction on the single resulting agg.
      * It runs the aggregation as well using a circuit breaker that randomly throws {@link CircuitBreakingException}
      * in order to mak sure the implementation does not leak.
      */

+ 10 - 1
test/framework/src/main/java/org/elasticsearch/test/InternalAggregationTestCase.java

@@ -401,6 +401,13 @@ public abstract class InternalAggregationTestCase<T extends InternalAggregation>
 
     public record BuilderAndToReduce<T> (AggregationBuilder builder, List<T> toReduce) {}
 
+    /**
+     * Does this aggregation support reductions when the internal buckets are not in-order
+     */
+    protected boolean supportsOutOfOrderReduce() {
+        return true;
+    }
+
     public void testReduceRandom() throws IOException {
         String name = randomAlphaOfLength(5);
         int size = between(1, 200);
@@ -412,7 +419,9 @@ public abstract class InternalAggregationTestCase<T extends InternalAggregation>
         MockBigArrays bigArrays = new MockBigArrays(new MockPageCacheRecycler(Settings.EMPTY), new NoneCircuitBreakerService());
         if (randomBoolean() && toReduce.size() > 1) {
             // sometimes do a partial reduce
-            Collections.shuffle(toReduce, random());
+            if (supportsOutOfOrderReduce()) {
+                Collections.shuffle(toReduce, random());
+            }
             int r = randomIntBetween(1, toReduce.size());
             List<InternalAggregation> toPartialReduce = toReduce.subList(0, r);
             // Sort aggs so that unmapped come last. This mimicks the behavior of InternalAggregations.reduce()

+ 16 - 6
x-pack/plugin/analytics/src/main/java/org/elasticsearch/xpack/analytics/AnalyticsPlugin.java

@@ -16,6 +16,7 @@ import org.elasticsearch.common.io.stream.NamedWriteableRegistry;
 import org.elasticsearch.common.settings.Setting;
 import org.elasticsearch.env.Environment;
 import org.elasticsearch.env.NodeEnvironment;
+import org.elasticsearch.index.IndexSettings;
 import org.elasticsearch.index.mapper.Mapper;
 import org.elasticsearch.plugins.ActionPlugin;
 import org.elasticsearch.plugins.MapperPlugin;
@@ -42,6 +43,7 @@ import org.elasticsearch.xpack.analytics.multiterms.InternalMultiTerms;
 import org.elasticsearch.xpack.analytics.multiterms.MultiTermsAggregationBuilder;
 import org.elasticsearch.xpack.analytics.normalize.NormalizePipelineAggregationBuilder;
 import org.elasticsearch.xpack.analytics.rate.InternalRate;
+import org.elasticsearch.xpack.analytics.rate.InternalResetTrackingRate;
 import org.elasticsearch.xpack.analytics.rate.RateAggregationBuilder;
 import org.elasticsearch.xpack.analytics.stringstats.InternalStringStats;
 import org.elasticsearch.xpack.analytics.stringstats.StringStatsAggregationBuilder;
@@ -119,19 +121,27 @@ public class AnalyticsPlugin extends Plugin implements SearchPlugin, ActionPlugi
                 TTestAggregationBuilder::new,
                 usage.track(AnalyticsStatsAction.Item.T_TEST, TTestAggregationBuilder.PARSER)
             ).addResultReader(InternalTTest::new).setAggregatorRegistrar(TTestAggregationBuilder::registerUsage),
-            new AggregationSpec(
-                RateAggregationBuilder.NAME,
-                RateAggregationBuilder::new,
-                usage.track(AnalyticsStatsAction.Item.RATE, RateAggregationBuilder.PARSER)
-            ).addResultReader(InternalRate::new).setAggregatorRegistrar(RateAggregationBuilder::registerAggregators),
             new AggregationSpec(
                 MultiTermsAggregationBuilder.NAME,
                 MultiTermsAggregationBuilder::new,
                 usage.track(AnalyticsStatsAction.Item.MULTI_TERMS, MultiTermsAggregationBuilder.PARSER)
-            ).addResultReader(InternalMultiTerms::new).setAggregatorRegistrar(MultiTermsAggregationBuilder::registerAggregators)
+            ).addResultReader(InternalMultiTerms::new).setAggregatorRegistrar(MultiTermsAggregationBuilder::registerAggregators),
+            rateAggregation()
         );
     }
 
+    private AggregationSpec rateAggregation() {
+        AggregationSpec rate = new AggregationSpec(
+            RateAggregationBuilder.NAME,
+            RateAggregationBuilder::new,
+            usage.track(AnalyticsStatsAction.Item.RATE, RateAggregationBuilder.PARSER)
+        ).addResultReader(InternalRate::new).setAggregatorRegistrar(RateAggregationBuilder::registerAggregators);
+        if (IndexSettings.isTimeSeriesModeEnabled()) {
+            rate.addResultReader(InternalResetTrackingRate.NAME, InternalResetTrackingRate::new);
+        }
+        return rate;
+    }
+
     @Override
     public List<ActionPlugin.ActionHandler<? extends ActionRequest, ? extends ActionResponse>> getActions() {
         return List.of(

+ 123 - 0
x-pack/plugin/analytics/src/main/java/org/elasticsearch/xpack/analytics/rate/InternalResetTrackingRate.java

@@ -0,0 +1,123 @@
+/*
+ * Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
+ * or more contributor license agreements. Licensed under the Elastic License
+ * 2.0; you may not use this file except in compliance with the Elastic License
+ * 2.0.
+ */
+
+package org.elasticsearch.xpack.analytics.rate;
+
+import org.elasticsearch.common.io.stream.StreamInput;
+import org.elasticsearch.common.io.stream.StreamOutput;
+import org.elasticsearch.search.DocValueFormat;
+import org.elasticsearch.search.aggregations.AggregationReduceContext;
+import org.elasticsearch.search.aggregations.InternalAggregation;
+import org.elasticsearch.search.aggregations.metrics.InternalNumericMetricsAggregation;
+import org.elasticsearch.xcontent.XContentBuilder;
+
+import java.io.IOException;
+import java.util.Comparator;
+import java.util.List;
+import java.util.Map;
+
+public class InternalResetTrackingRate extends InternalNumericMetricsAggregation.SingleValue implements Rate {
+
+    public static final String NAME = "rate_with_resets";
+
+    private final double startValue;
+    private final double endValue;
+    private final long startTime;
+    private final long endTime;
+    private final double resetCompensation;
+
+    protected InternalResetTrackingRate(
+        String name,
+        DocValueFormat format,
+        Map<String, Object> metadata,
+        double startValue,
+        double endValue,
+        long startTime,
+        long endTime,
+        double resetCompensation
+    ) {
+        super(name, format, metadata);
+        this.startValue = startValue;
+        this.endValue = endValue;
+        this.startTime = startTime;
+        this.endTime = endTime;
+        this.resetCompensation = resetCompensation;
+    }
+
+    public InternalResetTrackingRate(StreamInput in) throws IOException {
+        super(in, false);
+        this.startValue = in.readDouble();
+        this.endValue = in.readDouble();
+        this.startTime = in.readLong();
+        this.endTime = in.readLong();
+        this.resetCompensation = in.readDouble();
+    }
+
+    @Override
+    public String getWriteableName() {
+        return NAME;
+    }
+
+    @Override
+    protected void doWriteTo(StreamOutput out) throws IOException {
+        out.writeDouble(startValue);
+        out.writeDouble(endValue);
+        out.writeLong(startTime);
+        out.writeLong(endTime);
+        out.writeDouble(resetCompensation);
+    }
+
+    @Override
+    public InternalAggregation reduce(List<InternalAggregation> aggregations, AggregationReduceContext reduceContext) {
+        List<InternalResetTrackingRate> toReduce = aggregations.stream()
+            .map(r -> (InternalResetTrackingRate) r)
+            .sorted(Comparator.comparingLong(o -> o.startTime))
+            .toList();
+        double resetComp = toReduce.get(0).resetCompensation;
+        double startValue = toReduce.get(0).startValue;
+        double endValue = toReduce.get(0).endValue;
+        final int endIndex = toReduce.size() - 1;
+        for (int i = 1; i < endIndex + 1; i++) {
+            InternalResetTrackingRate rate = toReduce.get(i);
+            assert rate.startTime >= toReduce.get(i - 1).endTime;
+            resetComp += rate.resetCompensation;
+            if (endValue > rate.startValue) {
+                resetComp += endValue;
+            }
+            endValue = rate.endValue;
+        }
+        return new InternalResetTrackingRate(
+            name,
+            format,
+            metadata,
+            startValue,
+            endValue,
+            toReduce.get(0).startTime,
+            toReduce.get(endIndex).endTime,
+            resetComp
+        );
+    }
+
+    @Override
+    public XContentBuilder doXContentBody(XContentBuilder builder, Params params) throws IOException {
+        return builder.field(CommonFields.VALUE.getPreferredName(), value());
+    }
+
+    @Override
+    public double value() {
+        return (endValue - startValue + resetCompensation) / (endTime - startTime);
+    }
+
+    @Override
+    public double getValue() {
+        return value();
+    }
+
+    boolean includes(InternalResetTrackingRate other) {
+        return this.startTime < other.startTime && this.endTime > other.endTime;
+    }
+}

+ 7 - 0
x-pack/plugin/analytics/src/main/java/org/elasticsearch/xpack/analytics/rate/RateAggregatorFactory.java

@@ -16,6 +16,7 @@ import org.elasticsearch.search.aggregations.CardinalityUpperBound;
 import org.elasticsearch.search.aggregations.LeafBucketCollector;
 import org.elasticsearch.search.aggregations.support.AggregationContext;
 import org.elasticsearch.search.aggregations.support.CoreValuesSourceType;
+import org.elasticsearch.search.aggregations.support.TimeSeriesValuesSourceType;
 import org.elasticsearch.search.aggregations.support.ValuesSourceAggregatorFactory;
 import org.elasticsearch.search.aggregations.support.ValuesSourceConfig;
 import org.elasticsearch.search.aggregations.support.ValuesSourceRegistry;
@@ -64,6 +65,12 @@ class RateAggregatorFactory extends ValuesSourceAggregatorFactory {
             HistogramRateAggregator::new,
             true
         );
+        builder.register(
+            RateAggregationBuilder.REGISTRY_KEY,
+            Collections.singletonList(TimeSeriesValuesSourceType.COUNTER),
+            TimeSeriesRateAggregator::new,
+            true
+        );
     }
 
     @Override

+ 154 - 0
x-pack/plugin/analytics/src/main/java/org/elasticsearch/xpack/analytics/rate/TimeSeriesRateAggregator.java

@@ -0,0 +1,154 @@
+/*
+ * Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
+ * or more contributor license agreements. Licensed under the Elastic License
+ * 2.0; you may not use this file except in compliance with the Elastic License
+ * 2.0.
+ */
+
+package org.elasticsearch.xpack.analytics.rate;
+
+import org.elasticsearch.common.Rounding;
+import org.elasticsearch.common.util.DoubleArray;
+import org.elasticsearch.common.util.LongArray;
+import org.elasticsearch.core.Releasables;
+import org.elasticsearch.index.fielddata.NumericDoubleValues;
+import org.elasticsearch.search.DocValueFormat;
+import org.elasticsearch.search.MultiValueMode;
+import org.elasticsearch.search.aggregations.AggregationExecutionContext;
+import org.elasticsearch.search.aggregations.Aggregator;
+import org.elasticsearch.search.aggregations.InternalAggregation;
+import org.elasticsearch.search.aggregations.LeafBucketCollector;
+import org.elasticsearch.search.aggregations.LeafBucketCollectorBase;
+import org.elasticsearch.search.aggregations.metrics.NumericMetricsAggregator;
+import org.elasticsearch.search.aggregations.support.AggregationContext;
+import org.elasticsearch.search.aggregations.support.ValuesSource;
+import org.elasticsearch.search.aggregations.support.ValuesSourceConfig;
+
+import java.io.IOException;
+import java.util.Map;
+
+public class TimeSeriesRateAggregator extends NumericMetricsAggregator.SingleValue {
+
+    protected final ValuesSource.Numeric valuesSource;
+
+    protected DoubleArray startValues;
+    protected DoubleArray endValues;
+    protected LongArray startTimes;
+    protected LongArray endTimes;
+    protected DoubleArray resetCompensations;
+
+    private long currentBucket = -1;
+    private long currentEndTime = -1;
+    private long currentStartTime = -1;
+    private double resetCompensation = 0;
+    private double currentEndValue = -1;
+    private double currentStartValue = -1;
+    private int currentTsid = -1;
+
+    // Unused parameters are so that the constructor implements `RateAggregatorSupplier`
+    protected TimeSeriesRateAggregator(
+        String name,
+        ValuesSourceConfig valuesSourceConfig,
+        Rounding.DateTimeUnit rateUnit,
+        RateMode rateMode,
+        AggregationContext context,
+        Aggregator parent,
+        Map<String, Object> metadata
+    ) throws IOException {
+        super(name, context, parent, metadata);
+        this.valuesSource = (ValuesSource.Numeric) valuesSourceConfig.getValuesSource();
+        this.startValues = bigArrays().newDoubleArray(1, true);
+        this.endValues = bigArrays().newDoubleArray(1, true);
+        this.startTimes = bigArrays().newLongArray(1, true);
+        this.endTimes = bigArrays().newLongArray(1, true);
+        this.resetCompensations = bigArrays().newDoubleArray(1, true);
+    }
+
+    @Override
+    public InternalAggregation buildEmptyAggregation() {
+        return new InternalResetTrackingRate(name, DocValueFormat.RAW, metadata(), 0, 0, 0, 0, 0);
+    }
+
+    private void calculateLastBucket() {
+        if (currentBucket != -1) {
+            startValues.set(currentBucket, currentStartValue);
+            endValues.set(currentBucket, currentEndValue);
+            startTimes.set(currentBucket, currentStartTime);
+            endTimes.set(currentBucket, currentEndTime);
+            resetCompensations.set(currentBucket, resetCompensation);
+            currentBucket = -1;
+        }
+    }
+
+    private double checkForResets(double latestValue) {
+        if (latestValue > currentStartValue) {
+            // reset detected
+            resetCompensation += currentEndValue;
+            currentEndValue = latestValue;
+        }
+        return latestValue;
+    }
+
+    @Override
+    protected LeafBucketCollector getLeafCollector(AggregationExecutionContext aggCtx, LeafBucketCollector sub) throws IOException {
+        NumericDoubleValues leafValues = MultiValueMode.MAX.select(valuesSource.doubleValues(aggCtx.getLeafReaderContext()));
+        return new LeafBucketCollectorBase(sub, null) {
+            @Override
+            public void collect(int doc, long bucket) throws IOException {
+                leafValues.advanceExact(doc);   // TODO handle missing values
+                double latestValue = leafValues.doubleValue();
+
+                if (bucket != currentBucket) {
+                    startValues = bigArrays().grow(startValues, bucket + 1);
+                    endValues = bigArrays().grow(endValues, bucket + 1);
+                    startTimes = bigArrays().grow(startTimes, bucket + 1);
+                    endTimes = bigArrays().grow(endTimes, bucket + 1);
+                    resetCompensations = bigArrays().grow(resetCompensations, bucket + 1);
+                    if (currentTsid != aggCtx.getTsidOrd()) {
+                        // if we're on a new tsid then we need to calculate the last bucket
+                        calculateLastBucket();
+                        currentTsid = aggCtx.getTsidOrd();
+                    } else {
+                        // if we're in a new bucket but in the same tsid then we update the
+                        // timestamp and last value before we calculate the last bucket
+                        currentStartTime = aggCtx.getTimestamp();
+                        currentStartValue = checkForResets(latestValue);
+                        calculateLastBucket();
+                    }
+                    currentBucket = bucket;
+                    currentStartTime = currentEndTime = aggCtx.getTimestamp();
+                    currentStartValue = currentEndValue = latestValue;
+                    resetCompensation = 0;
+                } else {
+                    currentStartTime = aggCtx.getTimestamp();
+                    currentStartValue = checkForResets(latestValue);
+                }
+            }
+        };
+    }
+
+    @Override
+    public InternalResetTrackingRate buildAggregation(long owningBucketOrd) {
+        calculateLastBucket();
+        return new InternalResetTrackingRate(
+            name,
+            DocValueFormat.RAW,
+            metadata(),
+            startValues.get(owningBucketOrd),
+            endValues.get(owningBucketOrd),
+            startTimes.get(owningBucketOrd),
+            endTimes.get(owningBucketOrd),
+            resetCompensations.get(owningBucketOrd)
+        );
+    }
+
+    @Override
+    protected void doClose() {
+        Releasables.close(startValues, endValues, startTimes, endTimes, resetCompensations);
+    }
+
+    @Override
+    public double metric(long owningBucketOrd) {
+        return buildAggregation(owningBucketOrd).getValue();
+    }
+}

+ 133 - 0
x-pack/plugin/analytics/src/test/java/org/elasticsearch/xpack/analytics/rate/InternalResetTrackingRateTests.java

@@ -0,0 +1,133 @@
+/*
+ * Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
+ * or more contributor license agreements. Licensed under the Elastic License
+ * 2.0; you may not use this file except in compliance with the Elastic License
+ * 2.0.
+ */
+
+package org.elasticsearch.xpack.analytics.rate;
+
+import org.elasticsearch.common.util.CollectionUtils;
+import org.elasticsearch.plugins.SearchPlugin;
+import org.elasticsearch.search.aggregations.Aggregation;
+import org.elasticsearch.search.aggregations.InternalAggregation;
+import org.elasticsearch.search.aggregations.ParsedAggregation;
+import org.elasticsearch.test.InternalAggregationTestCase;
+import org.elasticsearch.xcontent.NamedXContentRegistry;
+import org.elasticsearch.xcontent.ParseField;
+import org.elasticsearch.xpack.analytics.AnalyticsPlugin;
+
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.List;
+import java.util.Map;
+
+import static org.hamcrest.CoreMatchers.equalTo;
+import static org.hamcrest.CoreMatchers.instanceOf;
+import static org.mockito.Mockito.mock;
+
+public class InternalResetTrackingRateTests extends InternalAggregationTestCase<InternalResetTrackingRate> {
+
+    @Override
+    protected SearchPlugin registerPlugin() {
+        return new AnalyticsPlugin();
+    }
+
+    @Override
+    protected InternalResetTrackingRate createTestInstance(String name, Map<String, Object> metadata) {
+        return new InternalResetTrackingRate(name, null, metadata, 0, 0, 0, 0, 0);
+    }
+
+    private static InternalResetTrackingRate rate(double startValue, double endValue, long startTime, long endTime, double resetComp) {
+        return new InternalResetTrackingRate("n", null, null, startValue, endValue, startTime, endTime, resetComp);
+    }
+
+    public void testReduction() {
+        List<InternalAggregation> rates = List.of(
+            rate(0, 10, 1000, 2000, 0),
+            rate(10, 20, 2000, 3000, 0),
+            rate(20, 5, 3000, 4000, 25), // internal reset
+            rate(5, 15, 4000, 5000, 0),
+            rate(0, 10, 5000, 6000, 0)  // cross-boundary reset
+        );
+        InternalAggregation reduced = rates.get(0).reduce(rates, null);
+        assertThat(reduced, instanceOf(Rate.class));
+        assertThat(((Rate) reduced).getValue(), equalTo(0.01));
+    }
+
+    @Override
+    protected void assertReduced(InternalResetTrackingRate reduced, List<InternalResetTrackingRate> inputs) {
+        for (InternalResetTrackingRate input : inputs) {
+            assertEquals(0.01f, input.getValue(), 0.001);
+        }
+        assertEquals(0.01f, reduced.getValue(), 0.001);
+    }
+
+    // Buckets must always be in-order so that we can detect resets between consecutive buckets
+    @Override
+    protected boolean supportsOutOfOrderReduce() {
+        return false;
+    }
+
+    @Override
+    protected BuilderAndToReduce<InternalResetTrackingRate> randomResultsToReduce(String name, int size) {
+        // generate a monotonically increasing counter, starting at 0 finishing at 1000 and increasing
+        // by 10 each time
+        // randomly reset to 0
+        // randomly break to a new rate
+        List<InternalResetTrackingRate> internalRates = new ArrayList<>();
+        double startValue = 0, currentValue = 0;
+        double resetComp = 0;
+        long startTime = 0;
+        long endTime = 0;
+        while (internalRates.size() < size - 1) {
+            endTime += 1000;
+            currentValue += 10;
+            if (randomInt(30) == 0) {
+                resetComp += currentValue;
+                currentValue = 0;
+            }
+            if (randomInt(45) == 0) {
+                internalRates.add(rate(startValue, currentValue, startTime, endTime, resetComp));
+                startValue = currentValue;
+                resetComp = 0;
+                startTime = endTime;
+            }
+        }
+        if (startTime == endTime) {
+            endTime += 1000;
+            currentValue += 10;
+        }
+        internalRates.add(rate(startValue, currentValue, startTime, endTime, resetComp));
+        return new BuilderAndToReduce<>(mock(RateAggregationBuilder.class), internalRates);
+    }
+
+    @Override
+    protected void assertFromXContent(InternalResetTrackingRate aggregation, ParsedAggregation parsedAggregation) throws IOException {
+
+    }
+
+    @Override
+    protected List<NamedXContentRegistry.Entry> getNamedXContents() {
+        return CollectionUtils.appendToCopy(
+            super.getNamedXContents(),
+            new NamedXContentRegistry.Entry(Aggregation.class, new ParseField(InternalResetTrackingRate.NAME), (p, c) -> {
+                assumeTrue("There is no ParsedRate yet", false);
+                return null;
+            })
+        );
+    }
+
+    public void testIncludes() {
+        InternalResetTrackingRate big = new InternalResetTrackingRate("n", null, null, 0, 0, 1000, 3000, 0);
+        InternalResetTrackingRate small = new InternalResetTrackingRate("n", null, null, 0, 0, 1500, 2500, 0);
+        assertTrue(big.includes(small));
+        assertFalse(small.includes(big));
+
+        InternalResetTrackingRate unrelated = new InternalResetTrackingRate("n", null, null, 0, 0, 100000, 1000010, 0);
+        assertFalse(big.includes(unrelated));
+        assertFalse(unrelated.includes(big));
+        assertFalse(small.includes(unrelated));
+        assertFalse(unrelated.includes(small));
+    }
+}

+ 157 - 0
x-pack/plugin/analytics/src/test/java/org/elasticsearch/xpack/analytics/rate/TimeSeriesRateAggregatorTests.java

@@ -0,0 +1,157 @@
+/*
+ * Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
+ * or more contributor license agreements. Licensed under the Elastic License
+ * 2.0; you may not use this file except in compliance with the Elastic License
+ * 2.0.
+ */
+
+package org.elasticsearch.xpack.analytics.rate;
+
+import org.apache.lucene.document.Document;
+import org.apache.lucene.document.NumericDocValuesField;
+import org.apache.lucene.document.SortedDocValuesField;
+import org.apache.lucene.document.SortedNumericDocValuesField;
+import org.elasticsearch.aggregations.AggregationsPlugin;
+import org.elasticsearch.aggregations.bucket.timeseries.InternalTimeSeries;
+import org.elasticsearch.aggregations.bucket.timeseries.TimeSeriesAggregationBuilder;
+import org.elasticsearch.common.bytes.BytesReference;
+import org.elasticsearch.index.mapper.DateFieldMapper;
+import org.elasticsearch.index.mapper.MappedFieldType;
+import org.elasticsearch.index.mapper.NumberFieldMapper;
+import org.elasticsearch.index.mapper.TimeSeriesIdFieldMapper;
+import org.elasticsearch.index.mapper.TimeSeriesParams;
+import org.elasticsearch.plugins.SearchPlugin;
+import org.elasticsearch.search.aggregations.AggregatorTestCase;
+import org.elasticsearch.search.aggregations.bucket.histogram.DateHistogramAggregationBuilder;
+import org.elasticsearch.search.aggregations.bucket.histogram.DateHistogramInterval;
+import org.elasticsearch.search.aggregations.bucket.histogram.InternalDateHistogram;
+import org.elasticsearch.xpack.analytics.AnalyticsPlugin;
+
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.List;
+import java.util.function.Consumer;
+
+import static org.hamcrest.Matchers.closeTo;
+import static org.hamcrest.Matchers.hasSize;
+import static org.hamcrest.Matchers.instanceOf;
+
+public class TimeSeriesRateAggregatorTests extends AggregatorTestCase {
+
+    @Override
+    protected List<SearchPlugin> getSearchPlugins() {
+        return List.of(new AggregationsPlugin(), new AnalyticsPlugin());
+    }
+
+    public void testSimple() throws IOException {
+        RateAggregationBuilder builder = new RateAggregationBuilder("counter_field").field("counter_field");
+        TimeSeriesAggregationBuilder tsBuilder = new TimeSeriesAggregationBuilder("tsid");
+        tsBuilder.subAggregation(builder);
+        Consumer<InternalTimeSeries> verifier = r -> {
+            assertThat(r.getBuckets(), hasSize(2));
+            assertThat(((Rate) r.getBucketByKey("{dim=1}").getAggregations().asList().get(0)).getValue(), closeTo(59.0 / 3000.0, 0.00001));
+            assertThat(((Rate) r.getBucketByKey("{dim=2}").getAggregations().asList().get(0)).getValue(), closeTo(206.0 / 4000.0, 0.00001));
+        };
+        AggTestConfig aggTestConfig = new AggTestConfig(tsBuilder, timeStampField(), counterField("counter_field"))
+            .withSplitLeavesIntoSeperateAggregators(false);
+        testCase(iw -> {
+            iw.addDocuments(docs(1000, "1", 15, 37, 60, /*reset*/ 14));
+            iw.addDocuments(docs(1000, "2", 74, 150, /*reset*/ 50, 90, /*reset*/ 40));
+        }, verifier, aggTestConfig);
+    }
+
+    public void testNestedWithinDateHistogram() throws IOException {
+        RateAggregationBuilder builder = new RateAggregationBuilder("counter_field").field("counter_field");
+        DateHistogramAggregationBuilder dateBuilder = new DateHistogramAggregationBuilder("date");
+        dateBuilder.field("@timestamp");
+        dateBuilder.fixedInterval(DateHistogramInterval.seconds(2));
+        dateBuilder.subAggregation(builder);
+        TimeSeriesAggregationBuilder tsBuilder = new TimeSeriesAggregationBuilder("tsid");
+        tsBuilder.subAggregation(dateBuilder);
+
+        Consumer<InternalTimeSeries> verifier = r -> {
+            assertThat(r.getBuckets(), hasSize(2));
+            assertThat(r.getBucketByKey("{dim=1}"), instanceOf(InternalTimeSeries.InternalBucket.class));
+            InternalDateHistogram hb = r.getBucketByKey("{dim=1}").getAggregations().get("date");
+            {
+                Rate rate = hb.getBuckets().get(1).getAggregations().get("counter_field");
+                assertThat(rate.getValue(), closeTo((60 - 37 + 14) / 2000.0, 0.00001));
+            }
+            {
+                Rate rate = hb.getBuckets().get(0).getAggregations().get("counter_field");
+                assertThat(rate.getValue(), closeTo((37 - 15) / 1000.0, 0.00001));
+            }
+            hb = r.getBucketByKey("{dim=2}").getAggregations().get("date");
+            {
+                Rate rate = hb.getBuckets().get(0).getAggregations().get("counter_field");
+                assertThat(rate.getValue(), closeTo((150 - 74) / 1000.0, 0.00001));
+            }
+            {
+                Rate rate = hb.getBuckets().get(1).getAggregations().get("counter_field");
+                assertThat(rate.getValue(), closeTo(90 / 2000.0, 0.00001));
+            }
+        };
+
+        AggTestConfig aggTestConfig = new AggTestConfig(tsBuilder, timeStampField(), counterField("counter_field"))
+            .withSplitLeavesIntoSeperateAggregators(false);
+        testCase(iw -> {
+            iw.addDocuments(docs(2000, "1", 15, 37, 60, /*reset*/ 14));
+            iw.addDocuments(docs(2000, "2", 74, 150, /*reset*/ 50, 90, /*reset*/ 40));
+        }, verifier, aggTestConfig);
+    }
+
+    private List<Document> docs(long startTimestamp, String dim, long... values) throws IOException {
+
+        List<Document> documents = new ArrayList<>();
+        for (int i = 0; i < values.length; i++) {
+            documents.add(doc(startTimestamp + (i * 1000L), tsid(dim), values[i]));
+        }
+        return documents;
+    }
+
+    private static BytesReference tsid(String dim) throws IOException {
+        TimeSeriesIdFieldMapper.TimeSeriesIdBuilder idBuilder = new TimeSeriesIdFieldMapper.TimeSeriesIdBuilder(null);
+        idBuilder.addString("dim", dim);
+        return idBuilder.build();
+    }
+
+    private Document doc(long timestamp, BytesReference tsid, long counterValue) {
+        Document doc = new Document();
+        doc.add(new SortedNumericDocValuesField("@timestamp", timestamp));
+        doc.add(new SortedDocValuesField("_tsid", tsid.toBytesRef()));
+        doc.add(new NumericDocValuesField("counter_field", counterValue));
+        return doc;
+    }
+
+    private MappedFieldType counterField(String name) {
+        return new NumberFieldMapper.NumberFieldType(
+            name,
+            NumberFieldMapper.NumberType.LONG,
+            true,
+            false,
+            true,
+            false,
+            null,
+            Collections.emptyMap(),
+            null,
+            false,
+            TimeSeriesParams.MetricType.counter
+        );
+    }
+
+    private DateFieldMapper.DateFieldType timeStampField() {
+        return new DateFieldMapper.DateFieldType(
+            "@timestamp",
+            true,
+            false,
+            true,
+            DateFieldMapper.DEFAULT_DATE_TIME_FORMATTER,
+            DateFieldMapper.Resolution.MILLISECONDS,
+            null,
+            null,
+            Collections.emptyMap()
+        );
+    }
+
+}

+ 135 - 0
x-pack/plugin/src/yamlRestTest/resources/rest-api-spec/test/analytics/reset_tracking_rate.yml

@@ -0,0 +1,135 @@
+setup:
+  - skip:
+      version: " - 8.5.0"
+      reason: Reset tracking rate agg added in 8.6
+
+  - do:
+      indices.create:
+        index: tsdb-01
+        body:
+          settings:
+            number_of_replicas: 0
+            mode: time_series
+            routing_path: [key]
+            time_series:
+              start_time: "2021-01-01T00:00:00Z"
+              end_time: "2021-01-01T01:00:00Z"
+          mappings:
+            properties:
+              val:
+                type: long
+                time_series_metric: counter
+              key:
+                type: keyword
+                time_series_dimension: true
+              "@timestamp":
+                type: date
+  - do:
+      indices.create:
+        index: tsdb-02
+        body:
+          settings:
+            number_of_replicas: 0
+            mode: time_series
+            routing_path: [key]
+            time_series:
+              start_time: "2021-01-01T00:01:00Z"
+              end_time: "2021-01-01T02:00:00Z"
+          mappings:
+            properties:
+              val:
+                type: long
+                time_series_metric: counter
+              key:
+                type: keyword
+                time_series_dimension: true
+              "@timestamp":
+                type: date
+  - do:
+      indices.create:
+        index: tsdb-03
+        body:
+          settings:
+            number_of_replicas: 0
+            mode: time_series
+            routing_path: [key]
+            time_series:
+              start_time: "2021-01-01T00:02:00Z"
+              end_time: "2021-01-01T03:00:00Z"
+          mappings:
+            properties:
+              val:
+                type: long
+                time_series_metric: counter
+              key:
+                type: keyword
+                time_series_dimension: true
+              "@timestamp":
+                type: date
+  - do:
+      cluster.health:
+        wait_for_status: green
+
+  - do:
+      bulk:
+        index: tsdb-01
+        refresh: true
+        body:
+          - '{ "index": {} }'
+          - '{ "key": "bar", "val": 40, "@timestamp": "2021-01-01T00:00:00Z" }'
+          - '{ "index": {}}'
+          - '{ "key": "bar", "val": 50, "@timestamp": "2021-01-01T00:20:00Z" }'
+          - '{ "index": {}}'
+          - '{ "key": "bar", "val": 60, "@timestamp": "2021-01-01T00:40:00Z" }'
+  - do:
+      bulk:
+        index: tsdb-02
+        refresh: true
+        body:
+          - '{ "index": {} }'
+          - '{ "key": "bar", "val": 70, "@timestamp": "2021-01-01T01:00:00Z" }'
+          - '{ "index": {}}'
+          - '{ "key": "bar", "val": 0, "@timestamp": "2021-01-01T01:20:00Z" }'
+          - '{ "index": {}}'
+          - '{ "key": "bar", "val": 10, "@timestamp": "2021-01-01T01:40:00Z" }'
+
+  - do:
+      bulk:
+        index: tsdb-03
+        refresh: true
+        body:
+          - '{ "index": {} }'
+          - '{ "key": "bar", "val": 0, "@timestamp": "2021-01-01T02:00:00Z" }'
+          - '{ "index": {}}'
+          - '{ "key": "bar", "val": 10, "@timestamp": "2021-01-01T02:20:00Z" }'
+          - '{ "index": {}}'
+          - '{ "key": "bar", "val": 20, "@timestamp": "2021-01-01T02:40:00Z" }'
+---
+"test resets do not lead to negative rate":
+  - do:
+      search:
+        index: tsdb-*
+        body:
+          query:
+            range:
+              "@timestamp":
+                gte: "2020-01-01T00:10:00Z"
+          size: 0
+          aggs:
+            ts:
+              time_series:
+                keyed: false
+              aggs:
+                rate:
+                  rate:
+                    field: val
+
+
+
+  - match: { hits.total.value: 9 }
+  - length: { aggregations.ts.buckets: 1 }
+
+  - match: { aggregations.ts.buckets.0.key: { "key": "bar" } }
+  - match: { aggregations.ts.buckets.0.doc_count: 9 }
+  - gte: { aggregations.ts.buckets.0.rate.value: 0.0 }
+