Browse Source

Improve rolling up metrics (#124739)

Remove unneeded Metric abstraction in MetricFieldProducer, which speeds-up rolling up gauges/counters.
Martijn van Groningen 7 months ago
parent
commit
d81c7995a7

+ 5 - 0
docs/changelog/124739.yaml

@@ -0,0 +1,5 @@
+pr: 124739
+summary: Improve rolling up metrics
+area: Downsampling
+type: enhancement
+issues: []

+ 18 - 4
x-pack/plugin/downsample/src/main/java/org/elasticsearch/xpack/downsample/AggregateMetricFieldSerializer.java

@@ -12,7 +12,7 @@ import org.elasticsearch.xcontent.XContentBuilder;
 import java.io.IOException;
 import java.util.Collection;
 
-public class AggregateMetricFieldSerializer implements DownsampleFieldSerializer {
+final class AggregateMetricFieldSerializer implements DownsampleFieldSerializer {
     private final Collection<AbstractDownsampleFieldProducer> producers;
     private final String name;
 
@@ -22,7 +22,7 @@ public class AggregateMetricFieldSerializer implements DownsampleFieldSerializer
      * @param producers a collection of {@link AbstractDownsampleFieldProducer} instances with the subfields
      *                  of the aggregate_metric_double field.
      */
-    public AggregateMetricFieldSerializer(String name, Collection<AbstractDownsampleFieldProducer> producers) {
+    AggregateMetricFieldSerializer(String name, Collection<AbstractDownsampleFieldProducer> producers) {
         this.name = name;
         this.producers = producers;
     }
@@ -38,8 +38,22 @@ public class AggregateMetricFieldSerializer implements DownsampleFieldSerializer
             assert name.equals(fieldProducer.name()) : "producer has a different name";
             if (fieldProducer.isEmpty() == false) {
                 if (fieldProducer instanceof MetricFieldProducer metricFieldProducer) {
-                    for (MetricFieldProducer.Metric metric : metricFieldProducer.metrics()) {
-                        builder.field(metric.name(), metric.get());
+                    if (metricFieldProducer instanceof MetricFieldProducer.GaugeMetricFieldProducer gaugeProducer) {
+                        builder.field("max", gaugeProducer.max);
+                        builder.field("min", gaugeProducer.min);
+                        builder.field("sum", gaugeProducer.sum.value());
+                        builder.field("value_count", gaugeProducer.count);
+                    } else if (metricFieldProducer instanceof MetricFieldProducer.CounterMetricFieldProducer counterProducer) {
+                        builder.field("last_value", counterProducer.lastValue);
+                    } else if (metricFieldProducer instanceof MetricFieldProducer.AggregatedGaugeMetricFieldProducer producer) {
+                        switch (producer.metric) {
+                            case max -> builder.field("max", producer.max);
+                            case min -> builder.field("min", producer.min);
+                            case sum -> builder.field("sum", producer.sum.value());
+                            case value_count -> builder.field("value_count", producer.count);
+                        }
+                    } else {
+                        throw new IllegalStateException();
                     }
                 } else if (fieldProducer instanceof LabelFieldProducer labelFieldProducer) {
                     LabelFieldProducer.Label label = labelFieldProducer.label();

+ 1 - 8
x-pack/plugin/downsample/src/main/java/org/elasticsearch/xpack/downsample/AggregateMetricFieldValueFetcher.java

@@ -47,14 +47,7 @@ public final class AggregateMetricFieldValueFetcher extends FieldValueFetcher {
         if (aggMetricFieldType.getMetricType() != null) {
             // If the field is an aggregate_metric_double field, we should use the correct subfields
             // for each aggregation. This is a downsample-of-downsample case
-            MetricFieldProducer.Metric metricOperation = switch (metric) {
-                case max -> new MetricFieldProducer.Max();
-                case min -> new MetricFieldProducer.Min();
-                case sum -> new MetricFieldProducer.Sum();
-                // To compute value_count summary, we must sum all field values
-                case value_count -> new MetricFieldProducer.Sum(AggregateMetricDoubleFieldMapper.Metric.value_count.name());
-            };
-            return new MetricFieldProducer.GaugeMetricFieldProducer(aggMetricFieldType.name(), metricOperation);
+            return new MetricFieldProducer.AggregatedGaugeMetricFieldProducer(aggMetricFieldType.name(), metric);
         } else {
             // If field is not a metric, we downsample it as a label
             return new LabelFieldProducer.AggregateMetricFieldProducer.AggregateMetricFieldProducer(aggMetricFieldType.name(), metric);

+ 109 - 210
x-pack/plugin/downsample/src/main/java/org/elasticsearch/xpack/downsample/MetricFieldProducer.java

@@ -12,6 +12,7 @@ import org.elasticsearch.index.fielddata.FormattedDocValues;
 import org.elasticsearch.index.fielddata.SortedNumericDoubleValues;
 import org.elasticsearch.search.aggregations.metrics.CompensatedSum;
 import org.elasticsearch.xcontent.XContentBuilder;
+import org.elasticsearch.xpack.aggregatemetric.mapper.AggregateMetricDoubleFieldMapper;
 
 import java.io.IOException;
 
@@ -21,279 +22,177 @@ import java.io.IOException;
  * gauge and metric types.
  */
 abstract sealed class MetricFieldProducer extends AbstractDownsampleFieldProducer {
-    /**
-     * a list of metrics that will be computed for the field
-     */
-    private final Metric[] metrics;
 
-    MetricFieldProducer(String name, Metric... metrics) {
+    MetricFieldProducer(String name) {
         super(name);
-        this.metrics = metrics;
-    }
-
-    /**
-     * Reset all values collected for the field
-     */
-    public void reset() {
-        for (Metric metric : metrics) {
-            metric.reset();
-        }
-        isEmpty = true;
-    }
-
-    /** return the list of metrics that are computed for the field */
-    public Metric[] metrics() {
-        return metrics;
-    }
-
-    /** Collect the value of a raw field and compute all downsampled metrics */
-    void collect(double value) {
-        for (MetricFieldProducer.Metric metric : metrics()) {
-            metric.collect(value);
-        }
-        isEmpty = false;
     }
 
     @Override
-    public void collect(FormattedDocValues docValues, IntArrayList docIdBuffer) throws IOException {
+    public void collect(FormattedDocValues docValues, IntArrayList buffer) throws IOException {
         assert false : "MetricFieldProducer does not support formatted doc values";
         throw new UnsupportedOperationException();
     }
 
-    public void collect(SortedNumericDoubleValues docValues, IntArrayList docIdBuffer) throws IOException {
-        for (int i = 0; i < docIdBuffer.size(); i++) {
-            int docId = docIdBuffer.get(i);
-            if (docValues.advanceExact(docId) == false) {
-                continue;
-            }
-            int docValuesCount = docValues.docValueCount();
-            for (int j = 0; j < docValuesCount; j++) {
-                double num = docValues.nextValue();
-                collect(num);
-            }
-        }
-    }
-
-    abstract static sealed class Metric {
-        final String name;
-
-        /**
-         * Abstract class that defines how a metric is computed.
-         * @param name the name of the metric as it will be output in the downsampled document
-         */
-        protected Metric(String name) {
-            this.name = name;
-        }
-
-        public String name() {
-            return name;
-        }
-
-        abstract void collect(double number);
-
-        abstract double get();
-
-        abstract void reset();
-    }
+    public abstract void collect(SortedNumericDoubleValues docValues, IntArrayList buffer) throws IOException;
 
     /**
-     * Metric implementation that computes the maximum of all values of a field
+     * {@link MetricFieldProducer} implementation for a counter metric field
      */
-    static final class Max extends Metric {
-        private static final double NO_VALUE = -Double.MAX_VALUE;
+    static final class CounterMetricFieldProducer extends MetricFieldProducer {
 
-        private double max = NO_VALUE;
+        static final double NO_VALUE = Double.MIN_VALUE;
 
-        Max() {
-            super("max");
-        }
-
-        @Override
-        void collect(double value) {
-            this.max = Math.max(value, max);
-        }
+        double lastValue = NO_VALUE;
 
-        @Override
-        double get() {
-            return max;
+        CounterMetricFieldProducer(String name) {
+            super(name);
         }
 
         @Override
-        void reset() {
-            max = NO_VALUE;
-        }
-    }
-
-    /**
-     * Metric implementation that computes the minimum of all values of a field
-     */
-    static final class Min extends Metric {
-        private static final double NO_VALUE = Double.MAX_VALUE;
-
-        private double min = NO_VALUE;
-
-        Min() {
-            super("min");
-        }
+        public void collect(SortedNumericDoubleValues docValues, IntArrayList docIdBuffer) throws IOException {
+            if (isEmpty() == false) {
+                return;
+            }
 
-        @Override
-        void collect(double value) {
-            this.min = Math.min(value, min);
+            for (int i = 0; i < docIdBuffer.size(); i++) {
+                int docId = docIdBuffer.get(i);
+                if (docValues.advanceExact(docId)) {
+                    isEmpty = false;
+                    lastValue = docValues.nextValue();
+                    return;
+                }
+            }
         }
 
         @Override
-        double get() {
-            return min;
+        public void reset() {
+            isEmpty = true;
+            lastValue = NO_VALUE;
         }
 
         @Override
-        void reset() {
-            min = NO_VALUE;
+        public void write(XContentBuilder builder) throws IOException {
+            if (isEmpty() == false) {
+                builder.field(name(), lastValue);
+            }
         }
     }
 
-    /**
-     * Metric implementation that computes the sum of all values of a field
-     */
-    static final class Sum extends Metric {
-        private final CompensatedSum kahanSummation = new CompensatedSum();
-
-        Sum() {
-            super("sum");
-        }
-
-        Sum(String name) {
-            super(name);
-        }
-
-        @Override
-        void collect(double value) {
-            kahanSummation.add(value);
-        }
-
-        @Override
-        double get() {
-            return kahanSummation.value();
-        }
-
-        @Override
-        void reset() {
-            kahanSummation.reset(0, 0);
-        }
-    }
+    static final double MAX_NO_VALUE = -Double.MAX_VALUE;
+    static final double MIN_NO_VALUE = Double.MAX_VALUE;
 
     /**
-     * Metric implementation that counts all values collected for a metric field
+     * {@link MetricFieldProducer} implementation for a gauge metric field
      */
-    static final class ValueCount extends Metric {
-        private long count;
+    static final class GaugeMetricFieldProducer extends MetricFieldProducer {
 
-        ValueCount() {
-            super("value_count");
-        }
+        double max = MAX_NO_VALUE;
+        double min = MIN_NO_VALUE;
+        final CompensatedSum sum = new CompensatedSum();
+        long count;
 
-        @Override
-        void collect(double value) {
-            count++;
+        GaugeMetricFieldProducer(String name) {
+            super(name);
         }
 
         @Override
-        double get() {
-            return count;
+        public void collect(SortedNumericDoubleValues docValues, IntArrayList docIdBuffer) throws IOException {
+            for (int i = 0; i < docIdBuffer.size(); i++) {
+                int docId = docIdBuffer.get(i);
+                if (docValues.advanceExact(docId) == false) {
+                    continue;
+                }
+                isEmpty = false;
+                int docValuesCount = docValues.docValueCount();
+                for (int j = 0; j < docValuesCount; j++) {
+                    double value = docValues.nextValue();
+                    this.max = Math.max(value, max);
+                    this.min = Math.min(value, min);
+                    sum.add(value);
+                    count++;
+                }
+            }
         }
 
         @Override
-        void reset() {
+        public void reset() {
+            isEmpty = true;
+            max = MAX_NO_VALUE;
+            min = MIN_NO_VALUE;
+            sum.reset(0, 0);
             count = 0;
         }
-    }
-
-    /**
-     * Metric implementation that stores the last value over time for a metric. This implementation
-     * assumes that field values are collected sorted by descending order by time. In this case,
-     * it assumes that the last value of the time is the first value collected. Eventually,
-     * the implementation of this class end up storing the first value it is empty and then
-     * ignoring everything else.
-     */
-    static final class LastValue extends Metric {
-        private static final double NO_VALUE = Double.MIN_VALUE;
-
-        private double lastValue = NO_VALUE;
-
-        LastValue() {
-            super("last_value");
-        }
 
         @Override
-        void collect(double value) {
-            if (lastValue == Double.MIN_VALUE) {
-                lastValue = value;
+        public void write(XContentBuilder builder) throws IOException {
+            if (isEmpty() == false) {
+                builder.startObject(name());
+                builder.field("min", min);
+                builder.field("max", max);
+                builder.field("sum", sum.value());
+                builder.field("value_count", count);
+                builder.endObject();
             }
         }
+    }
 
-        @Override
-        double get() {
-            return lastValue;
-        }
+    // For downsampling downsampled indices:
+    static final class AggregatedGaugeMetricFieldProducer extends MetricFieldProducer {
 
-        @Override
-        void reset() {
-            lastValue = NO_VALUE;
-        }
-    }
+        final AggregateMetricDoubleFieldMapper.Metric metric;
 
-    /**
-     * {@link MetricFieldProducer} implementation for a counter metric field
-     */
-    static final class CounterMetricFieldProducer extends MetricFieldProducer {
+        double max = MAX_NO_VALUE;
+        double min = MIN_NO_VALUE;
+        final CompensatedSum sum = new CompensatedSum();
+        long count;
 
-        CounterMetricFieldProducer(String name) {
-            super(name, new LastValue());
+        AggregatedGaugeMetricFieldProducer(String name, AggregateMetricDoubleFieldMapper.Metric metric) {
+            super(name);
+            this.metric = metric;
         }
 
         @Override
         public void collect(SortedNumericDoubleValues docValues, IntArrayList docIdBuffer) throws IOException {
-            // Counter producers only collect the last_value. Since documents are
-            // collected by descending timestamp order, the producer should only
-            // process the first value for every tsid. So, it will only collect the
-            // field if no value has been set before.
-            if (isEmpty()) {
-                super.collect(docValues, docIdBuffer);
+            for (int i = 0; i < docIdBuffer.size(); i++) {
+                int docId = docIdBuffer.get(i);
+                if (docValues.advanceExact(docId) == false) {
+                    continue;
+                }
+                isEmpty = false;
+                int docValuesCount = docValues.docValueCount();
+                for (int j = 0; j < docValuesCount; j++) {
+                    double value = docValues.nextValue();
+                    switch (metric) {
+                        case min -> min = Math.min(value, min);
+                        case max -> max = Math.max(value, max);
+                        case sum -> sum.add(value);
+                        // This is the reason why we can't use GaugeMetricFieldProducer
+                        // For downsampled indices aggregate metric double's value count field needs to be summed.
+                        // (Note: not using CompensatedSum here should be ok given that value_count is mapped as long)
+                        case value_count -> count += Math.round(value);
+                    }
+                }
             }
         }
 
-        public Object value() {
-            assert metrics().length == 1 : "Single value producers must have only one metric";
-            return metrics()[0].get();
-        }
-
         @Override
-        public void write(XContentBuilder builder) throws IOException {
-            if (isEmpty() == false) {
-                builder.field(name(), value());
-            }
-        }
-    }
-
-    /**
-     * {@link MetricFieldProducer} implementation for a gauge metric field
-     */
-    static final class GaugeMetricFieldProducer extends MetricFieldProducer {
-
-        GaugeMetricFieldProducer(String name) {
-            this(name, new Min(), new Max(), new Sum(), new ValueCount());
-        }
-
-        GaugeMetricFieldProducer(String name, Metric... metrics) {
-            super(name, metrics);
+        public void reset() {
+            isEmpty = true;
+            max = MAX_NO_VALUE;
+            min = MIN_NO_VALUE;
+            sum.reset(0, 0);
+            count = 0;
         }
 
         @Override
         public void write(XContentBuilder builder) throws IOException {
             if (isEmpty() == false) {
                 builder.startObject(name());
-                for (MetricFieldProducer.Metric metric : metrics()) {
-                    builder.field(metric.name(), metric.get());
+                switch (metric) {
+                    case min -> builder.field("min", min);
+                    case max -> builder.field("max", max);
+                    case sum -> builder.field("sum", sum.value());
+                    case value_count -> builder.field("value_count", count);
                 }
                 builder.endObject();
             }

+ 114 - 71
x-pack/plugin/downsample/src/test/java/org/elasticsearch/xpack/downsample/MetricFieldProducerTests.java

@@ -7,7 +7,10 @@
 
 package org.elasticsearch.xpack.downsample;
 
+import org.apache.lucene.internal.hppc.IntArrayList;
+import org.apache.lucene.internal.hppc.IntDoubleHashMap;
 import org.elasticsearch.common.Strings;
+import org.elasticsearch.index.fielddata.SortedNumericDoubleValues;
 import org.elasticsearch.search.aggregations.AggregatorTestCase;
 import org.elasticsearch.xcontent.XContentBuilder;
 import org.elasticsearch.xcontent.json.JsonXContent;
@@ -16,114 +19,129 @@ import java.io.IOException;
 
 public class MetricFieldProducerTests extends AggregatorTestCase {
 
-    public void testMinCountMetric() {
-        MetricFieldProducer.Metric metric = new MetricFieldProducer.Min();
-        assertEquals(Double.MAX_VALUE, metric.get(), 0);
-        metric.collect(40);
-        metric.collect(5.5);
-        metric.collect(12.2);
-        metric.collect(55);
-        assertEquals(5.5, metric.get(), 0);
-        metric.reset();
-        assertEquals(Double.MAX_VALUE, metric.get(), 0);
+    public void testMinCountMetric() throws IOException {
+        var instance = new MetricFieldProducer.GaugeMetricFieldProducer(randomAlphaOfLength(10));
+        assertEquals(Double.MAX_VALUE, instance.min, 0);
+        var docIdBuffer = IntArrayList.from(0, 1, 2, 3);
+        var values = createValuesInstance(docIdBuffer, 40, 5.5, 12.2, 55);
+        instance.collect(values, docIdBuffer);
+        assertEquals(5.5, instance.min, 0);
+        instance.reset();
+        assertEquals(Double.MAX_VALUE, instance.min, 0);
     }
 
-    public void testMaxCountMetric() {
-        MetricFieldProducer.Metric metric = new MetricFieldProducer.Max();
-        assertEquals(-Double.MAX_VALUE, metric.get(), 0);
-        metric.collect(5.5);
-        metric.collect(12.2);
-        metric.collect(55);
-        assertEquals(55d, metric.get(), 0);
-        metric.reset();
-        assertEquals(-Double.MAX_VALUE, metric.get(), 0);
+    public void testMaxCountMetric() throws IOException {
+        var instance = new MetricFieldProducer.GaugeMetricFieldProducer(randomAlphaOfLength(10));
+        assertEquals(-Double.MAX_VALUE, instance.max, 0);
+        var docIdBuffer = IntArrayList.from(0, 1, 2);
+        var values = createValuesInstance(docIdBuffer, 5.5, 12.2, 55);
+        instance.collect(values, docIdBuffer);
+        assertEquals(55d, instance.max, 0);
+        instance.reset();
+        assertEquals(-Double.MAX_VALUE, instance.max, 0);
     }
 
-    public void testSumCountMetric() {
-        MetricFieldProducer.Metric metric = new MetricFieldProducer.Sum();
-        assertEquals(0d, metric.get(), 0);
-        metric.collect(5.5);
-        metric.collect(12.2);
-        metric.collect(55);
-        assertEquals(72.7, metric.get(), 0);
-        metric.reset();
-        assertEquals(0d, metric.get(), 0);
+    public void testSumCountMetric() throws IOException {
+        var instance = new MetricFieldProducer.GaugeMetricFieldProducer(randomAlphaOfLength(10));
+        assertEquals(0, instance.sum.value(), 0);
+        var docIdBuffer = IntArrayList.from(0, 1, 2);
+        var values = createValuesInstance(docIdBuffer, 5.5, 12.2, 55);
+        instance.collect(values, docIdBuffer);
+        assertEquals(72.7, instance.sum.value(), 0);
+        instance.reset();
+        assertEquals(0, instance.sum.value(), 0);
     }
 
     /**
      * Testing summation accuracy.
      * Tests stolen from SumAggregatorTests#testSummationAccuracy
      */
-    public void testSummationAccuracy() {
-        MetricFieldProducer.Metric metric = new MetricFieldProducer.Sum();
+    public void testSummationAccuracy() throws IOException {
+        var instance = new MetricFieldProducer.GaugeMetricFieldProducer(randomAlphaOfLength(10));
+        assertEquals(0, instance.sum.value(), 0);
+        var docIdBuffer = IntArrayList.from(0, 1, 2, 3, 4, 5, 6, 7, 8, 9, 10, 11, 12, 13, 14, 15, 16);
         // Summing up a normal array and expect an accurate value
-        double[] values = new double[] { 0.1, 0.2, 0.3, 0.4, 0.5, 0.6, 0.7, 0.8, 0.9, 1.0, 1.1, 1.2, 1.3, 1.4, 1.5, 1.6, 1.7 };
-        for (int i = 0; i < values.length; i++) {
-            metric.collect(values[i]);
-        }
-        assertEquals(15.3, metric.get(), Double.MIN_NORMAL);
+        var values = createValuesInstance(docIdBuffer, 0.1, 0.2, 0.3, 0.4, 0.5, 0.6, 0.7, 0.8, 0.9, 1.0, 1.1, 1.2, 1.3, 1.4, 1.5, 1.6, 1.7);
+        instance.collect(values, docIdBuffer);
+        assertEquals(15.3, instance.sum.value(), Double.MIN_NORMAL);
 
         // Summing up an array which contains NaN and infinities and expect a result same as naive summation
-        metric.reset();
+        instance.reset();
         int n = randomIntBetween(5, 10);
+        docIdBuffer = new IntArrayList(n);
+        double[] valueArray = new double[n];
         double sum = 0;
         for (int i = 0; i < n; i++) {
+            docIdBuffer.add(i);
             double d = frequently()
                 ? randomFrom(Double.NaN, Double.NEGATIVE_INFINITY, Double.POSITIVE_INFINITY)
                 : randomDoubleBetween(Double.MIN_VALUE, Double.MAX_VALUE, true);
+            valueArray[i] = d;
             sum += d;
-            metric.collect(d);
         }
-        assertEquals(sum, metric.get(), 1e-10);
+        values = createValuesInstance(docIdBuffer, valueArray);
+        instance.collect(values, docIdBuffer);
+        assertEquals(sum, instance.sum.value(), 1e-10);
 
         // Summing up some big double values and expect infinity result
-        metric.reset();
+        instance.reset();
         n = randomIntBetween(5, 10);
+        docIdBuffer = new IntArrayList(n);
+        valueArray = new double[n];
         for (int i = 0; i < n; i++) {
-            metric.collect(Double.MAX_VALUE);
+            docIdBuffer.add(i);
+            valueArray[i] = Double.MAX_VALUE;
         }
-        assertEquals(Double.POSITIVE_INFINITY, metric.get(), 0d);
+        values = createValuesInstance(docIdBuffer, valueArray);
+        instance.collect(values, docIdBuffer);
+        assertEquals(Double.POSITIVE_INFINITY, instance.sum.value(), 0d);
 
-        metric.reset();
+        instance.reset();
+        n = randomIntBetween(5, 10);
+        docIdBuffer = new IntArrayList(n);
+        valueArray = new double[n];
         for (int i = 0; i < n; i++) {
-            metric.collect(-Double.MAX_VALUE);
+            docIdBuffer.add(i);
+            valueArray[i] = -Double.MAX_VALUE;
         }
-        assertEquals(Double.NEGATIVE_INFINITY, metric.get(), 0d);
+        values = createValuesInstance(docIdBuffer, valueArray);
+        instance.collect(values, docIdBuffer);
+        assertEquals(Double.NEGATIVE_INFINITY, instance.sum.value(), 0d);
     }
 
-    public void testValueCountMetric() {
-        MetricFieldProducer.Metric metric = new MetricFieldProducer.ValueCount();
-        assertEquals(0L, metric.get(), 0d);
-        metric.collect(40);
-        metric.collect(30);
-        metric.collect(20);
-        assertEquals(3L, metric.get(), 0d);
-        metric.reset();
-        assertEquals(0L, metric.get(), 0d);
+    public void testValueCountMetric() throws IOException {
+        var instance = new MetricFieldProducer.GaugeMetricFieldProducer(randomAlphaOfLength(10));
+        assertEquals(0, instance.count);
+        var docIdBuffer = IntArrayList.from(0, 1, 2);
+        var values = createValuesInstance(docIdBuffer, 40, 30, 20);
+        instance.collect(values, docIdBuffer);
+        assertEquals(3L, instance.count);
+        instance.reset();
+        assertEquals(0, instance.count);
     }
 
-    public void testLastValueMetric() {
-        MetricFieldProducer.Metric metric = new MetricFieldProducer.LastValue();
-        assertEquals(Double.MIN_VALUE, metric.get(), 0);
-        metric.collect(40);
-        metric.collect(30);
-        metric.collect(20);
-        assertEquals(40, metric.get(), 0);
-        metric.reset();
-        assertEquals(Double.MIN_VALUE, metric.get(), 0);
+    public void testLastValueMetric() throws IOException {
+        var instance = new MetricFieldProducer.CounterMetricFieldProducer(randomAlphaOfLength(10));
+        assertEquals(Double.MIN_VALUE, instance.lastValue, 0);
+        var docIdBuffer = IntArrayList.from(0, 1, 2);
+        var values = createValuesInstance(docIdBuffer, 40, 30, 20);
+        instance.collect(values, docIdBuffer);
+        assertEquals(40, instance.lastValue, 0);
+        instance.reset();
+        assertEquals(Double.MIN_VALUE, instance.lastValue, 0);
     }
 
     public void testCounterMetricFieldProducer() throws IOException {
         final String field = "field";
         var producer = new MetricFieldProducer.CounterMetricFieldProducer(field);
         assertTrue(producer.isEmpty());
-        producer.collect(55.0);
-        producer.collect(12.2);
-        producer.collect(5.5);
+        var docIdBuffer = IntArrayList.from(0, 1, 2);
+        var valuesInstance = createValuesInstance(docIdBuffer, 55.0, 12.2, 5.5);
+
+        producer.collect(valuesInstance, docIdBuffer);
 
         assertFalse(producer.isEmpty());
-        Object o = producer.value();
-        assertEquals(55.0, o);
+        assertEquals(55.0, producer.lastValue, 0);
         assertEquals("field", producer.name());
 
         XContentBuilder builder = JsonXContent.contentBuilder().startObject();
@@ -136,17 +154,42 @@ public class MetricFieldProducerTests extends AggregatorTestCase {
         final String field = "field";
         MetricFieldProducer producer = new MetricFieldProducer.GaugeMetricFieldProducer(field);
         assertTrue(producer.isEmpty());
-        producer.collect(55.0);
-        producer.collect(12.2);
-        producer.collect(5.5);
+        var docIdBuffer = IntArrayList.from(0, 1, 2);
+        var valuesInstance = createValuesInstance(docIdBuffer, 55.0, 12.2, 5.5);
+        producer.collect(valuesInstance, docIdBuffer);
 
         assertFalse(producer.isEmpty());
 
         XContentBuilder builder = JsonXContent.contentBuilder().startObject();
         producer.write(builder);
         builder.endObject();
-        assertEquals("{\"field\":{\"min\":5.5,\"max\":55.0,\"sum\":72.7,\"value_count\":3.0}}", Strings.toString(builder));
+        assertEquals("{\"field\":{\"min\":5.5,\"max\":55.0,\"sum\":72.7,\"value_count\":3}}", Strings.toString(builder));
 
         assertEquals(field, producer.name());
     }
+
+    static SortedNumericDoubleValues createValuesInstance(IntArrayList docIdBuffer, double... values) {
+        return new SortedNumericDoubleValues() {
+
+            final IntDoubleHashMap docIdToValue = IntDoubleHashMap.from(docIdBuffer.toArray(), values);
+
+            int currentDocId = -1;
+
+            @Override
+            public boolean advanceExact(int target) throws IOException {
+                currentDocId = target;
+                return docIdToValue.containsKey(target);
+            }
+
+            @Override
+            public double nextValue() throws IOException {
+                return docIdToValue.get(currentDocId);
+            }
+
+            @Override
+            public int docValueCount() {
+                return 1;
+            }
+        };
+    }
 }