Browse Source

Change downsample's MetricFieldProducers (#124701)

Refactor MetricFieldProducer to use SortedNumericDoubleValues instead of FormattedDocValues, which saves unneeded conversations / casts.
Martijn van Groningen 7 months ago
parent
commit
81f33e4602

+ 1 - 3
x-pack/plugin/downsample/src/main/java/org/elasticsearch/xpack/downsample/AggregateMetricFieldSerializer.java

@@ -39,9 +39,7 @@ public class AggregateMetricFieldSerializer implements DownsampleFieldSerializer
             if (fieldProducer.isEmpty() == false) {
                 if (fieldProducer instanceof MetricFieldProducer metricFieldProducer) {
                     for (MetricFieldProducer.Metric metric : metricFieldProducer.metrics()) {
-                        if (metric.get() != null) {
-                            builder.field(metric.name(), metric.get());
-                        }
+                        builder.field(metric.name(), metric.get());
                     }
                 } else if (fieldProducer instanceof LabelFieldProducer labelFieldProducer) {
                     LabelFieldProducer.Label label = labelFieldProducer.label();

+ 44 - 12
x-pack/plugin/downsample/src/main/java/org/elasticsearch/xpack/downsample/DownsampleShardIndexer.java

@@ -34,6 +34,7 @@ import org.elasticsearch.core.TimeValue;
 import org.elasticsearch.index.IndexService;
 import org.elasticsearch.index.engine.Engine;
 import org.elasticsearch.index.fielddata.FormattedDocValues;
+import org.elasticsearch.index.fielddata.SortedNumericDoubleValues;
 import org.elasticsearch.index.mapper.DateFieldMapper;
 import org.elasticsearch.index.mapper.DocCountFieldMapper;
 import org.elasticsearch.index.mapper.TimeSeriesIdFieldMapper;
@@ -361,14 +362,30 @@ class DownsampleShardIndexer {
             docCountProvider.setLeafReaderContext(ctx);
 
             // For each field, return a tuple with the downsample field producer and the field value leaf
-            final AbstractDownsampleFieldProducer[] fieldProducers = new AbstractDownsampleFieldProducer[fieldValueFetchers.size()];
-            final FormattedDocValues[] formattedDocValues = new FormattedDocValues[fieldValueFetchers.size()];
-            for (int i = 0; i < fieldProducers.length; i++) {
-                fieldProducers[i] = fieldValueFetchers.get(i).fieldProducer();
-                formattedDocValues[i] = fieldValueFetchers.get(i).getLeaf(ctx);
+            final List<AbstractDownsampleFieldProducer> nonMetricProducers = new ArrayList<>();
+            final List<FormattedDocValues> formattedDocValues = new ArrayList<>();
+
+            final List<MetricFieldProducer> metricProducers = new ArrayList<>();
+            final List<SortedNumericDoubleValues> numericDocValues = new ArrayList<>();
+            for (var fieldValueFetcher : fieldValueFetchers) {
+                var fieldProducer = fieldValueFetcher.fieldProducer();
+                if (fieldProducer instanceof MetricFieldProducer metricFieldProducer) {
+                    metricProducers.add(metricFieldProducer);
+                    numericDocValues.add(fieldValueFetcher.getNumericLeaf(ctx));
+                } else {
+                    nonMetricProducers.add(fieldProducer);
+                    formattedDocValues.add(fieldValueFetcher.getLeaf(ctx));
+                }
             }
 
-            var leafBucketCollector = new LeafDownsampleCollector(aggCtx, docCountProvider, fieldProducers, formattedDocValues);
+            var leafBucketCollector = new LeafDownsampleCollector(
+                aggCtx,
+                docCountProvider,
+                nonMetricProducers.toArray(new AbstractDownsampleFieldProducer[0]),
+                formattedDocValues.toArray(new FormattedDocValues[0]),
+                metricProducers.toArray(new MetricFieldProducer[0]),
+                numericDocValues.toArray(new SortedNumericDoubleValues[0])
+            );
             leafBucketCollectors.add(leafBucketCollector);
             return leafBucketCollector;
         }
@@ -386,7 +403,10 @@ class DownsampleShardIndexer {
             final AggregationExecutionContext aggCtx;
             final DocCountProvider docCountProvider;
             final FormattedDocValues[] formattedDocValues;
-            final AbstractDownsampleFieldProducer[] fieldProducers;
+            final AbstractDownsampleFieldProducer[] nonMetricProducers;
+
+            final MetricFieldProducer[] metricProducers;
+            final SortedNumericDoubleValues[] numericDocValues;
 
             // Capture the first timestamp in order to determine which leaf collector's leafBulkCollection() is invoked first.
             long firstTimeStampForBulkCollection;
@@ -396,13 +416,20 @@ class DownsampleShardIndexer {
             LeafDownsampleCollector(
                 AggregationExecutionContext aggCtx,
                 DocCountProvider docCountProvider,
-                AbstractDownsampleFieldProducer[] fieldProducers,
-                FormattedDocValues[] formattedDocValues
+                AbstractDownsampleFieldProducer[] nonMetricProducers,
+                FormattedDocValues[] formattedDocValues,
+                MetricFieldProducer[] metricProducers,
+                SortedNumericDoubleValues[] numericDocValues
             ) {
+                assert nonMetricProducers.length == formattedDocValues.length;
+                assert metricProducers.length == numericDocValues.length;
+
                 this.aggCtx = aggCtx;
                 this.docCountProvider = docCountProvider;
-                this.fieldProducers = fieldProducers;
+                this.nonMetricProducers = nonMetricProducers;
                 this.formattedDocValues = formattedDocValues;
+                this.metricProducers = metricProducers;
+                this.numericDocValues = numericDocValues;
             }
 
             @Override
@@ -488,11 +515,16 @@ class DownsampleShardIndexer {
 
                 downsampleBucketBuilder.collectDocCount(docIdBuffer, docCountProvider);
                 // Iterate over all field values and collect the doc_values for this docId
-                for (int i = 0; i < fieldProducers.length; i++) {
-                    AbstractDownsampleFieldProducer fieldProducer = fieldProducers[i];
+                for (int i = 0; i < nonMetricProducers.length; i++) {
+                    AbstractDownsampleFieldProducer fieldProducer = nonMetricProducers[i];
                     FormattedDocValues docValues = formattedDocValues[i];
                     fieldProducer.collect(docValues, docIdBuffer);
                 }
+                for (int i = 0; i < metricProducers.length; i++) {
+                    MetricFieldProducer metricFieldProducer = metricProducers[i];
+                    SortedNumericDoubleValues numericDoubleValues = numericDocValues[i];
+                    metricFieldProducer.collect(numericDoubleValues, docIdBuffer);
+                }
 
                 docsProcessed += docIdBuffer.size();
                 task.setDocsProcessed(docsProcessed);

+ 7 - 0
x-pack/plugin/downsample/src/main/java/org/elasticsearch/xpack/downsample/FieldValueFetcher.java

@@ -10,6 +10,8 @@ package org.elasticsearch.xpack.downsample;
 import org.apache.lucene.index.LeafReaderContext;
 import org.elasticsearch.index.fielddata.FormattedDocValues;
 import org.elasticsearch.index.fielddata.IndexFieldData;
+import org.elasticsearch.index.fielddata.LeafNumericFieldData;
+import org.elasticsearch.index.fielddata.SortedNumericDoubleValues;
 import org.elasticsearch.index.mapper.MappedFieldType;
 import org.elasticsearch.index.mapper.NumberFieldMapper;
 import org.elasticsearch.index.mapper.flattened.FlattenedFieldMapper;
@@ -50,6 +52,11 @@ class FieldValueFetcher {
         return fieldData.load(context).getFormattedValues(format);
     }
 
+    public SortedNumericDoubleValues getNumericLeaf(LeafReaderContext context) {
+        LeafNumericFieldData numericFieldData = (LeafNumericFieldData) fieldData.load(context);
+        return numericFieldData.getDoubleValues();
+    }
+
     public AbstractDownsampleFieldProducer fieldProducer() {
         return fieldProducer;
     }

+ 38 - 28
x-pack/plugin/downsample/src/main/java/org/elasticsearch/xpack/downsample/MetricFieldProducer.java

@@ -9,6 +9,7 @@ package org.elasticsearch.xpack.downsample;
 
 import org.apache.lucene.internal.hppc.IntArrayList;
 import org.elasticsearch.index.fielddata.FormattedDocValues;
+import org.elasticsearch.index.fielddata.SortedNumericDoubleValues;
 import org.elasticsearch.search.aggregations.metrics.CompensatedSum;
 import org.elasticsearch.xcontent.XContentBuilder;
 
@@ -46,7 +47,7 @@ abstract sealed class MetricFieldProducer extends AbstractDownsampleFieldProduce
     }
 
     /** Collect the value of a raw field and compute all downsampled metrics */
-    void collect(Number value) {
+    void collect(double value) {
         for (MetricFieldProducer.Metric metric : metrics()) {
             metric.collect(value);
         }
@@ -55,6 +56,11 @@ abstract sealed class MetricFieldProducer extends AbstractDownsampleFieldProduce
 
     @Override
     public void collect(FormattedDocValues docValues, IntArrayList docIdBuffer) throws IOException {
+        assert false : "MetricFieldProducer does not support formatted doc values";
+        throw new UnsupportedOperationException();
+    }
+
+    public void collect(SortedNumericDoubleValues docValues, IntArrayList docIdBuffer) throws IOException {
         for (int i = 0; i < docIdBuffer.size(); i++) {
             int docId = docIdBuffer.get(i);
             if (docValues.advanceExact(docId) == false) {
@@ -62,7 +68,7 @@ abstract sealed class MetricFieldProducer extends AbstractDownsampleFieldProduce
             }
             int docValuesCount = docValues.docValueCount();
             for (int j = 0; j < docValuesCount; j++) {
-                Number num = (Number) docValues.nextValue();
+                double num = docValues.nextValue();
                 collect(num);
             }
         }
@@ -83,9 +89,9 @@ abstract sealed class MetricFieldProducer extends AbstractDownsampleFieldProduce
             return name;
         }
 
-        abstract void collect(Number number);
+        abstract void collect(double number);
 
-        abstract Number get();
+        abstract double get();
 
         abstract void reset();
     }
@@ -94,25 +100,27 @@ abstract sealed class MetricFieldProducer extends AbstractDownsampleFieldProduce
      * Metric implementation that computes the maximum of all values of a field
      */
     static final class Max extends Metric {
-        private Double max;
+        private static final double NO_VALUE = -Double.MAX_VALUE;
+
+        private double max = NO_VALUE;
 
         Max() {
             super("max");
         }
 
         @Override
-        void collect(Number value) {
-            this.max = max != null ? Math.max(value.doubleValue(), max) : value.doubleValue();
+        void collect(double value) {
+            this.max = Math.max(value, max);
         }
 
         @Override
-        Number get() {
+        double get() {
             return max;
         }
 
         @Override
         void reset() {
-            max = null;
+            max = NO_VALUE;
         }
     }
 
@@ -120,25 +128,27 @@ abstract sealed class MetricFieldProducer extends AbstractDownsampleFieldProduce
      * Metric implementation that computes the minimum of all values of a field
      */
     static final class Min extends Metric {
-        private Double min;
+        private static final double NO_VALUE = Double.MAX_VALUE;
+
+        private double min = NO_VALUE;
 
         Min() {
             super("min");
         }
 
         @Override
-        void collect(Number value) {
-            this.min = min != null ? Math.min(value.doubleValue(), min) : value.doubleValue();
+        void collect(double value) {
+            this.min = Math.min(value, min);
         }
 
         @Override
-        Number get() {
+        double get() {
             return min;
         }
 
         @Override
         void reset() {
-            min = null;
+            min = NO_VALUE;
         }
     }
 
@@ -157,12 +167,12 @@ abstract sealed class MetricFieldProducer extends AbstractDownsampleFieldProduce
         }
 
         @Override
-        void collect(Number value) {
-            kahanSummation.add(value.doubleValue());
+        void collect(double value) {
+            kahanSummation.add(value);
         }
 
         @Override
-        Number get() {
+        double get() {
             return kahanSummation.value();
         }
 
@@ -183,12 +193,12 @@ abstract sealed class MetricFieldProducer extends AbstractDownsampleFieldProduce
         }
 
         @Override
-        void collect(Number value) {
+        void collect(double value) {
             count++;
         }
 
         @Override
-        Number get() {
+        double get() {
             return count;
         }
 
@@ -206,27 +216,29 @@ abstract sealed class MetricFieldProducer extends AbstractDownsampleFieldProduce
      * ignoring everything else.
      */
     static final class LastValue extends Metric {
-        private Number lastValue;
+        private static final double NO_VALUE = Double.MIN_VALUE;
+
+        private double lastValue = NO_VALUE;
 
         LastValue() {
             super("last_value");
         }
 
         @Override
-        void collect(Number value) {
-            if (lastValue == null) {
+        void collect(double value) {
+            if (lastValue == Double.MIN_VALUE) {
                 lastValue = value;
             }
         }
 
         @Override
-        Number get() {
+        double get() {
             return lastValue;
         }
 
         @Override
         void reset() {
-            lastValue = null;
+            lastValue = NO_VALUE;
         }
     }
 
@@ -240,7 +252,7 @@ abstract sealed class MetricFieldProducer extends AbstractDownsampleFieldProduce
         }
 
         @Override
-        public void collect(FormattedDocValues docValues, IntArrayList docIdBuffer) throws IOException {
+        public void collect(SortedNumericDoubleValues docValues, IntArrayList docIdBuffer) throws IOException {
             // Counter producers only collect the last_value. Since documents are
             // collected by descending timestamp order, the producer should only
             // process the first value for every tsid. So, it will only collect the
@@ -281,9 +293,7 @@ abstract sealed class MetricFieldProducer extends AbstractDownsampleFieldProduce
             if (isEmpty() == false) {
                 builder.startObject(name());
                 for (MetricFieldProducer.Metric metric : metrics()) {
-                    if (metric.get() != null) {
-                        builder.field(metric.name(), metric.get());
-                    }
+                    builder.field(metric.name(), metric.get());
                 }
                 builder.endObject();
             }

+ 20 - 20
x-pack/plugin/downsample/src/test/java/org/elasticsearch/xpack/downsample/MetricFieldProducerTests.java

@@ -18,36 +18,36 @@ public class MetricFieldProducerTests extends AggregatorTestCase {
 
     public void testMinCountMetric() {
         MetricFieldProducer.Metric metric = new MetricFieldProducer.Min();
-        assertNull(metric.get());
+        assertEquals(Double.MAX_VALUE, metric.get(), 0);
         metric.collect(40);
         metric.collect(5.5);
         metric.collect(12.2);
         metric.collect(55);
-        assertEquals(5.5, metric.get());
+        assertEquals(5.5, metric.get(), 0);
         metric.reset();
-        assertNull(metric.get());
+        assertEquals(Double.MAX_VALUE, metric.get(), 0);
     }
 
     public void testMaxCountMetric() {
         MetricFieldProducer.Metric metric = new MetricFieldProducer.Max();
-        assertNull(metric.get());
+        assertEquals(-Double.MAX_VALUE, metric.get(), 0);
         metric.collect(5.5);
         metric.collect(12.2);
         metric.collect(55);
-        assertEquals(55d, metric.get());
+        assertEquals(55d, metric.get(), 0);
         metric.reset();
-        assertNull(metric.get());
+        assertEquals(-Double.MAX_VALUE, metric.get(), 0);
     }
 
     public void testSumCountMetric() {
         MetricFieldProducer.Metric metric = new MetricFieldProducer.Sum();
-        assertEquals(0d, metric.get());
+        assertEquals(0d, metric.get(), 0);
         metric.collect(5.5);
         metric.collect(12.2);
         metric.collect(55);
-        assertEquals(72.7, metric.get());
+        assertEquals(72.7, metric.get(), 0);
         metric.reset();
-        assertEquals(0d, metric.get());
+        assertEquals(0d, metric.get(), 0);
     }
 
     /**
@@ -61,7 +61,7 @@ public class MetricFieldProducerTests extends AggregatorTestCase {
         for (int i = 0; i < values.length; i++) {
             metric.collect(values[i]);
         }
-        assertEquals(15.3, metric.get().doubleValue(), Double.MIN_NORMAL);
+        assertEquals(15.3, metric.get(), Double.MIN_NORMAL);
 
         // Summing up an array which contains NaN and infinities and expect a result same as naive summation
         metric.reset();
@@ -74,7 +74,7 @@ public class MetricFieldProducerTests extends AggregatorTestCase {
             sum += d;
             metric.collect(d);
         }
-        assertEquals(sum, metric.get().doubleValue(), 1e-10);
+        assertEquals(sum, metric.get(), 1e-10);
 
         // Summing up some big double values and expect infinity result
         metric.reset();
@@ -82,35 +82,35 @@ public class MetricFieldProducerTests extends AggregatorTestCase {
         for (int i = 0; i < n; i++) {
             metric.collect(Double.MAX_VALUE);
         }
-        assertEquals(Double.POSITIVE_INFINITY, metric.get().doubleValue(), 0d);
+        assertEquals(Double.POSITIVE_INFINITY, metric.get(), 0d);
 
         metric.reset();
         for (int i = 0; i < n; i++) {
             metric.collect(-Double.MAX_VALUE);
         }
-        assertEquals(Double.NEGATIVE_INFINITY, metric.get().doubleValue(), 0d);
+        assertEquals(Double.NEGATIVE_INFINITY, metric.get(), 0d);
     }
 
     public void testValueCountMetric() {
         MetricFieldProducer.Metric metric = new MetricFieldProducer.ValueCount();
-        assertEquals(0L, metric.get());
+        assertEquals(0L, metric.get(), 0d);
         metric.collect(40);
         metric.collect(30);
         metric.collect(20);
-        assertEquals(3L, metric.get());
+        assertEquals(3L, metric.get(), 0d);
         metric.reset();
-        assertEquals(0L, metric.get());
+        assertEquals(0L, metric.get(), 0d);
     }
 
     public void testLastValueMetric() {
         MetricFieldProducer.Metric metric = new MetricFieldProducer.LastValue();
-        assertNull(metric.get());
+        assertEquals(Double.MIN_VALUE, metric.get(), 0);
         metric.collect(40);
         metric.collect(30);
         metric.collect(20);
-        assertEquals(40, metric.get());
+        assertEquals(40, metric.get(), 0);
         metric.reset();
-        assertNull(metric.get());
+        assertEquals(Double.MIN_VALUE, metric.get(), 0);
     }
 
     public void testCounterMetricFieldProducer() throws IOException {
@@ -145,7 +145,7 @@ public class MetricFieldProducerTests extends AggregatorTestCase {
         XContentBuilder builder = JsonXContent.contentBuilder().startObject();
         producer.write(builder);
         builder.endObject();
-        assertEquals("{\"field\":{\"min\":5.5,\"max\":55.0,\"sum\":72.7,\"value_count\":3}}", Strings.toString(builder));
+        assertEquals("{\"field\":{\"min\":5.5,\"max\":55.0,\"sum\":72.7,\"value_count\":3.0}}", Strings.toString(builder));
 
         assertEquals(field, producer.name());
     }