Browse Source

[TSDB] Improve downsampling performance by removing map lookups (#92494)

This PR improves downsampling performance by removing all the hashmap lookups.

Also, it modifies metrics/label producers so that they extract the doc_values directly 
from the leaves. This allows for extra optimizations for cases such as labels/counters 
that do not extract doc_values unless they are consumed.

Finally, this PR separates loading and computing of aggregate_metric_double fields

Co-authored-by: Nik Everett <nik9000@gmail.com>
Christos Soulios 2 years ago
parent
commit
df8ccceb57

+ 6 - 17
x-pack/plugin/rollup/src/main/java/org/elasticsearch/xpack/downsample/AbstractRollupFieldProducer.java

@@ -7,16 +7,16 @@
 
 package org.elasticsearch.xpack.downsample;
 
-import org.elasticsearch.xcontent.XContentBuilder;
+import org.elasticsearch.index.fielddata.FormattedDocValues;
 
 import java.io.IOException;
 
 /**
- * Base class for classes that read metric and label fields.
+ * Base class that reads fields from the source index and produces their downsampled values
  */
-abstract class AbstractRollupFieldProducer<T> {
+abstract class AbstractRollupFieldProducer implements RollupFieldSerializer {
 
-    protected final String name;
+    private final String name;
     protected boolean isEmpty;
 
     AbstractRollupFieldProducer(String name) {
@@ -24,14 +24,6 @@ abstract class AbstractRollupFieldProducer<T> {
         this.isEmpty = true;
     }
 
-    /**
-     * Collect a value for the field applying the specific subclass collection strategy.
-     *
-     * @param field the name of the field to collect
-     * @param value the value to collect.
-     */
-    public abstract void collect(String field, T value);
-
     /**
      * @return the name of the field.
      */
@@ -44,15 +36,12 @@ abstract class AbstractRollupFieldProducer<T> {
      */
     public abstract void reset();
 
-    /**
-     * Serialize the downsampled value of the field.
-     */
-    public abstract void write(XContentBuilder builder) throws IOException;
-
     /**
      * @return true if the field has not collected any value.
      */
     public boolean isEmpty() {
         return isEmpty;
     }
+
+    public abstract void collect(FormattedDocValues docValues, int docId) throws IOException;
 }

+ 65 - 0
x-pack/plugin/rollup/src/main/java/org/elasticsearch/xpack/downsample/AggregateMetricFieldSerializer.java

@@ -0,0 +1,65 @@
+/*
+ * Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
+ * or more contributor license agreements. Licensed under the Elastic License
+ * 2.0; you may not use this file except in compliance with the Elastic License
+ * 2.0.
+ */
+
+package org.elasticsearch.xpack.downsample;
+
+import org.elasticsearch.xcontent.XContentBuilder;
+
+import java.io.IOException;
+import java.util.Collection;
+
+public class AggregateMetricFieldSerializer implements RollupFieldSerializer {
+    private final Collection<AbstractRollupFieldProducer> producers;
+    private final String name;
+
+    /**
+     * @param name the name of the aggregate_metric_double field as it will be serialized
+     *             in the downsampled index
+     * @param producers a collection of {@link AbstractRollupFieldProducer} instances with the subfields
+     *                  of the aggregate_metric_double field.
+     */
+    public AggregateMetricFieldSerializer(String name, Collection<AbstractRollupFieldProducer> producers) {
+        this.name = name;
+        this.producers = producers;
+    }
+
+    @Override
+    public void write(XContentBuilder builder) throws IOException {
+        if (isEmpty()) {
+            return;
+        }
+
+        builder.startObject(name);
+        for (AbstractRollupFieldProducer rollupFieldProducer : producers) {
+            assert name.equals(rollupFieldProducer.name()) : "producer has a different name";
+            if (rollupFieldProducer.isEmpty() == false) {
+                if (rollupFieldProducer instanceof MetricFieldProducer metricFieldProducer) {
+                    for (MetricFieldProducer.Metric metric : metricFieldProducer.metrics()) {
+                        if (metric.get() != null) {
+                            builder.field(metric.name(), metric.get());
+                        }
+                    }
+                } else if (rollupFieldProducer instanceof LabelFieldProducer labelFieldProducer) {
+                    LabelFieldProducer.Label label = labelFieldProducer.label();
+                    if (label.get() != null) {
+                        builder.field(label.name(), label.get());
+                    }
+                }
+            }
+        }
+        builder.endObject();
+    }
+
+    private boolean isEmpty() {
+        for (AbstractRollupFieldProducer p : producers) {
+            if (p.isEmpty() == false) {
+                return false;
+            }
+        }
+        return true;
+    }
+}

+ 65 - 0
x-pack/plugin/rollup/src/main/java/org/elasticsearch/xpack/downsample/AggregateMetricFieldValueFetcher.java

@@ -0,0 +1,65 @@
+/*
+ * Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
+ * or more contributor license agreements. Licensed under the Elastic License
+ * 2.0; you may not use this file except in compliance with the Elastic License
+ * 2.0.
+ */
+
+package org.elasticsearch.xpack.downsample;
+
+import org.elasticsearch.index.fielddata.IndexFieldData;
+import org.elasticsearch.index.mapper.MappedFieldType;
+import org.elasticsearch.index.mapper.NumberFieldMapper;
+import org.elasticsearch.xpack.aggregatemetric.mapper.AggregateDoubleMetricFieldMapper;
+import org.elasticsearch.xpack.aggregatemetric.mapper.AggregateDoubleMetricFieldMapper.AggregateDoubleMetricFieldType;
+
+import java.util.List;
+
+public class AggregateMetricFieldValueFetcher extends FieldValueFetcher {
+
+    private AggregateDoubleMetricFieldType aggMetricFieldType;
+
+    private final AbstractRollupFieldProducer rollupFieldProducer;
+
+    protected AggregateMetricFieldValueFetcher(
+        MappedFieldType fieldType,
+        AggregateDoubleMetricFieldType aggMetricFieldType,
+        IndexFieldData<?> fieldData
+    ) {
+        super(fieldType, fieldData);
+        this.aggMetricFieldType = aggMetricFieldType;
+        this.rollupFieldProducer = createRollupFieldProducer();
+    }
+
+    public AbstractRollupFieldProducer rollupFieldProducer() {
+        return rollupFieldProducer;
+    }
+
+    private AbstractRollupFieldProducer createRollupFieldProducer() {
+        AggregateDoubleMetricFieldMapper.Metric metric = null;
+        for (var e : aggMetricFieldType.getMetricFields().entrySet()) {
+            NumberFieldMapper.NumberFieldType metricSubField = e.getValue();
+            if (metricSubField.name().equals(name())) {
+                metric = e.getKey();
+                break;
+            }
+        }
+        assert metric != null : "Cannot resolve metric type for field " + name();
+
+        if (aggMetricFieldType.getMetricType() != null) {
+            // If the field is an aggregate_metric_double field, we should use the correct subfields
+            // for each aggregation. This is a rollup-of-rollup case
+            MetricFieldProducer.Metric metricOperation = switch (metric) {
+                case max -> new MetricFieldProducer.Max();
+                case min -> new MetricFieldProducer.Min();
+                case sum -> new MetricFieldProducer.Sum();
+                // To compute value_count summary, we must sum all field values
+                case value_count -> new MetricFieldProducer.Sum(AggregateDoubleMetricFieldMapper.Metric.value_count.name());
+            };
+            return new MetricFieldProducer.GaugeMetricFieldProducer(aggMetricFieldType.name(), List.of(metricOperation));
+        } else {
+            // If field is not a metric, we downsample it as a label
+            return new LabelFieldProducer.AggregateMetricFieldProducer.AggregateMetricFieldProducer(aggMetricFieldType.name(), metric);
+        }
+    }
+}

+ 27 - 27
x-pack/plugin/rollup/src/main/java/org/elasticsearch/xpack/downsample/FieldValueFetcher.java

@@ -16,51 +16,59 @@ import org.elasticsearch.index.query.SearchExecutionContext;
 import org.elasticsearch.search.DocValueFormat;
 import org.elasticsearch.xpack.aggregatemetric.mapper.AggregateDoubleMetricFieldMapper;
 
+import java.util.ArrayList;
 import java.util.Collections;
-import java.util.LinkedHashMap;
-import java.util.Map;
+import java.util.List;
 
 /**
  * Utility class used for fetching field values by reading field data
  */
 class FieldValueFetcher {
-    private final String name;
+
     private final MappedFieldType fieldType;
-    private final DocValueFormat format;
     private final IndexFieldData<?> fieldData;
+    private final AbstractRollupFieldProducer rollupFieldProducer;
 
-    protected FieldValueFetcher(String name, MappedFieldType fieldType, IndexFieldData<?> fieldData) {
-        this.name = name;
+    protected FieldValueFetcher(MappedFieldType fieldType, IndexFieldData<?> fieldData) {
         this.fieldType = fieldType;
-        this.format = fieldType.docValueFormat(null, null);
         this.fieldData = fieldData;
+        this.rollupFieldProducer = createRollupFieldProducer();
     }
 
     public String name() {
-        return name;
+        return fieldType().name();
     }
 
     public MappedFieldType fieldType() {
         return fieldType;
     }
 
-    public DocValueFormat format() {
-        return format;
+    public FormattedDocValues getLeaf(LeafReaderContext context) {
+        DocValueFormat format = fieldType.docValueFormat(null, null);
+        return fieldData.load(context).getFormattedValues(format);
     }
 
-    public IndexFieldData<?> fieldData() {
-        return fieldData;
+    public AbstractRollupFieldProducer rollupFieldProducer() {
+        return rollupFieldProducer;
     }
 
-    FormattedDocValues getLeaf(LeafReaderContext context) {
-        return fieldData.load(context).getFormattedValues(format);
+    private AbstractRollupFieldProducer createRollupFieldProducer() {
+        if (fieldType.getMetricType() != null) {
+            return switch (fieldType.getMetricType()) {
+                case gauge -> new MetricFieldProducer.GaugeMetricFieldProducer(name());
+                case counter -> new MetricFieldProducer.CounterMetricFieldProducer(name());
+            };
+        } else {
+            // If field is not a metric, we downsample it as a label
+            return new LabelFieldProducer.LabelLastValueFieldProducer(name());
+        }
     }
 
     /**
      * Retrieve field value fetchers for a list of fields.
      */
-    static Map<String, FieldValueFetcher> create(SearchExecutionContext context, String[] fields) {
-        Map<String, FieldValueFetcher> fetchers = new LinkedHashMap<>();
+    static List<FieldValueFetcher> create(SearchExecutionContext context, String[] fields) {
+        List<FieldValueFetcher> fetchers = new ArrayList<>();
         for (String field : fields) {
             MappedFieldType fieldType = context.getFieldType(field);
             assert fieldType != null : "Unknown field type for field: [" + field + "]";
@@ -71,24 +79,16 @@ class FieldValueFetcher {
                 for (NumberFieldMapper.NumberFieldType metricSubField : aggMetricFieldType.getMetricFields().values()) {
                     if (context.fieldExistsInIndex(metricSubField.name())) {
                         IndexFieldData<?> fieldData = context.getForField(metricSubField, MappedFieldType.FielddataOperation.SEARCH);
-                        fetchers.put(metricSubField.name(), new FieldValueFetcher(metricSubField.name(), fieldType, fieldData));
+                        fetchers.add(new AggregateMetricFieldValueFetcher(metricSubField, aggMetricFieldType, fieldData));
                     }
                 }
             } else {
                 if (context.fieldExistsInIndex(field)) {
                     IndexFieldData<?> fieldData = context.getForField(fieldType, MappedFieldType.FielddataOperation.SEARCH);
-                    fetchers.put(field, new FieldValueFetcher(field, fieldType, fieldData));
+                    fetchers.add(new FieldValueFetcher(fieldType, fieldData));
                 }
             }
         }
-        return Collections.unmodifiableMap(fetchers);
-    }
-
-    static Map<String, FormattedDocValues> docValuesFetchers(LeafReaderContext ctx, Map<String, FieldValueFetcher> fieldValueFetchers) {
-        final Map<String, FormattedDocValues> docValuesFetchers = new LinkedHashMap<>(fieldValueFetchers.size());
-        for (FieldValueFetcher fetcher : fieldValueFetchers.values()) {
-            docValuesFetchers.put(fetcher.name(), fetcher.getLeaf(ctx));
-        }
-        return Collections.unmodifiableMap(docValuesFetchers);
+        return Collections.unmodifiableList(fetchers);
     }
 }

+ 45 - 111
x-pack/plugin/rollup/src/main/java/org/elasticsearch/xpack/downsample/LabelFieldProducer.java

@@ -7,58 +7,25 @@
 
 package org.elasticsearch.xpack.downsample;
 
-import org.elasticsearch.index.mapper.MappedFieldType;
-import org.elasticsearch.index.mapper.NumberFieldMapper;
-import org.elasticsearch.index.query.SearchExecutionContext;
+import org.elasticsearch.index.fielddata.FormattedDocValues;
 import org.elasticsearch.xcontent.XContentBuilder;
-import org.elasticsearch.xpack.aggregatemetric.mapper.AggregateDoubleMetricFieldMapper;
+import org.elasticsearch.xpack.aggregatemetric.mapper.AggregateDoubleMetricFieldMapper.Metric;
 
 import java.io.IOException;
-import java.util.Collection;
-import java.util.Collections;
-import java.util.LinkedHashMap;
-import java.util.Map;
 
 /**
  * Class that produces values for a label field.
  */
-abstract class LabelFieldProducer extends AbstractRollupFieldProducer<Object> {
+abstract class LabelFieldProducer extends AbstractRollupFieldProducer {
 
-    private final Label label;
-
-    LabelFieldProducer(String name, Label label) {
+    LabelFieldProducer(String name) {
         super(name);
-        this.label = label;
-    }
-
-    public String name() {
-        return name;
-    }
-
-    /** Collect the value of a raw field  */
-    @Override
-    public void collect(String field, Object value) {
-        label.collect(value);
-        isEmpty = false;
-    }
-
-    public Label label() {
-        return this.label;
     }
 
-    public void reset() {
-        label.reset();
-        isEmpty = true;
-    }
-
-    /**
-     * Return the downsampled value as computed after collecting all raw values.
-     * @return
-     */
-    public abstract Object value();
+    abstract Label label();
 
     abstract static class Label {
-        final String name;
+        private final String name;
 
         /**
          * Abstract class that defines how a label is downsampled.
@@ -94,14 +61,7 @@ abstract class LabelFieldProducer extends AbstractRollupFieldProducer<Object> {
         }
 
         LastValueLabel() {
-            super("last_value");
-        }
-
-        @Override
-        void collect(Object value) {
-            if (lastValue == null) {
-                lastValue = value;
-            }
+            this("last_value");
         }
 
         @Override
@@ -113,101 +73,75 @@ abstract class LabelFieldProducer extends AbstractRollupFieldProducer<Object> {
         void reset() {
             lastValue = null;
         }
+
+        void collect(Object value) {
+            if (lastValue == null) {
+                lastValue = value;
+            }
+        }
     }
 
     /**
      * {@link LabelFieldProducer} implementation for a last value label
      */
     static class LabelLastValueFieldProducer extends LabelFieldProducer {
+        private final LastValueLabel label;
+
+        LabelLastValueFieldProducer(String name, LastValueLabel label) {
+            super(name);
+            this.label = label;
+        }
 
         LabelLastValueFieldProducer(String name) {
-            super(name, new LastValueLabel());
+            this(name, new LastValueLabel());
         }
 
         @Override
-        public Object value() {
-            return label().get();
+        Label label() {
+            return label;
         }
 
         @Override
         public void write(XContentBuilder builder) throws IOException {
             if (isEmpty() == false) {
-                builder.field(name(), value());
+                builder.field(name(), label.get());
             }
         }
-    }
-
-    static class AggregateMetricFieldProducer extends LabelFieldProducer {
-
-        private Map<String, Label> labelsByField = new LinkedHashMap<>();
-
-        AggregateMetricFieldProducer(String name) {
-            super(name, null);
-        }
-
-        public void addLabel(String field, Label label) {
-            labelsByField.put(field, label);
-        }
-
-        @Override
-        public void collect(String field, Object value) {
-            labelsByField.get(field).collect(value);
-            isEmpty = false;
-        }
 
         @Override
-        public void write(XContentBuilder builder) throws IOException {
+        public void collect(FormattedDocValues docValues, int docId) throws IOException {
             if (isEmpty() == false) {
-                builder.startObject(name());
-                for (Label label : labels()) {
-                    if (label.get() != null) {
-                        builder.field(label.name(), label.get());
-                    }
-                }
-                builder.endObject();
+                return;
+            }
+            if (docValues.advanceExact(docId) == false) {
+                return;
             }
-        }
-
-        public Collection<Label> labels() {
-            return labelsByField.values();
-        }
 
-        @Override
-        public Object value() {
-            return labelsByField;
+            int docValuesCount = docValues.docValueCount();
+            assert docValuesCount > 0;
+            isEmpty = false;
+            if (docValuesCount == 1) {
+                label.collect(docValues.nextValue());
+            } else {
+                Object[] values = new Object[docValuesCount];
+                for (int i = 0; i < docValuesCount; i++) {
+                    values[i] = docValues.nextValue();
+                }
+                label.collect(values);
+            }
         }
 
         @Override
         public void reset() {
-            labels().forEach(Label::reset);
+            label.reset();
             isEmpty = true;
         }
     }
 
-    /**
-     * Create a collection of label field producers.
-     */
-    static Map<String, LabelFieldProducer> createLabelFieldProducers(SearchExecutionContext context, String[] labelFields) {
-        final Map<String, LabelFieldProducer> fields = new LinkedHashMap<>();
-        for (String field : labelFields) {
-            MappedFieldType fieldType = context.getFieldType(field);
-            assert fieldType != null : "Unknown field type for field: [" + field + "]";
-
-            if (fieldType instanceof AggregateDoubleMetricFieldMapper.AggregateDoubleMetricFieldType aggMetricFieldType) {
-                // If the field is an aggregate_metric_double field, we should use the correct subfields
-                // for each aggregation. This is a rollup-of-rollup case
-                AggregateMetricFieldProducer producer = new AggregateMetricFieldProducer(field);
-                for (var e : aggMetricFieldType.getMetricFields().entrySet()) {
-                    AggregateDoubleMetricFieldMapper.Metric metric = e.getKey();
-                    NumberFieldMapper.NumberFieldType metricSubField = e.getValue();
-                    producer.addLabel(metricSubField.name(), new LastValueLabel(metric.name()));
-                    fields.put(metricSubField.name(), producer);
-                }
-            } else {
-                LabelFieldProducer producer = new LabelLastValueFieldProducer(field);
-                fields.put(field, producer);
-            }
+    static class AggregateMetricFieldProducer extends LabelLastValueFieldProducer {
+
+        AggregateMetricFieldProducer(String name, Metric metric) {
+            super(name, new LastValueLabel(metric.name()));
         }
-        return Collections.unmodifiableMap(fields);
     }
 }

+ 28 - 93
x-pack/plugin/rollup/src/main/java/org/elasticsearch/xpack/downsample/MetricFieldProducer.java

@@ -7,26 +7,21 @@
 
 package org.elasticsearch.xpack.downsample;
 
-import org.elasticsearch.index.mapper.MappedFieldType;
-import org.elasticsearch.index.mapper.NumberFieldMapper;
-import org.elasticsearch.index.query.SearchExecutionContext;
+import org.elasticsearch.index.fielddata.FormattedDocValues;
 import org.elasticsearch.search.aggregations.metrics.CompensatedSum;
 import org.elasticsearch.xcontent.XContentBuilder;
-import org.elasticsearch.xpack.aggregatemetric.mapper.AggregateDoubleMetricFieldMapper;
 
 import java.io.IOException;
 import java.util.Collection;
 import java.util.Collections;
-import java.util.LinkedHashMap;
 import java.util.List;
-import java.util.Map;
 
 /**
  * Class that collects all raw values for a metric field and computes its aggregate (downsampled)
  * values. Based on the supported metric types, the subclasses of this class compute values for
  * gauge and metric types.
  */
-abstract class MetricFieldProducer extends AbstractRollupFieldProducer<Number> {
+abstract class MetricFieldProducer extends AbstractRollupFieldProducer {
     /**
      * a list of metrics that will be computed for the field
      */
@@ -41,30 +36,35 @@ abstract class MetricFieldProducer extends AbstractRollupFieldProducer<Number> {
      * Reset all values collected for the field
      */
     public void reset() {
-        for (Metric metric : metrics()) {
-            metric.reset();
-        }
+        metrics().forEach(Metric::reset);
         isEmpty = true;
     }
 
-    public String name() {
-        return name;
-    }
-
     /** return the list of metrics that are computed for the field */
     public Collection<Metric> metrics() {
         return metrics;
     }
 
     /** Collect the value of a raw field and compute all downsampled metrics */
-    @Override
-    public void collect(String field, Number value) {
+    void collect(Number value) {
         for (MetricFieldProducer.Metric metric : metrics()) {
             metric.collect(value);
         }
         isEmpty = false;
     }
 
+    @Override
+    public void collect(FormattedDocValues docValues, int docId) throws IOException {
+        if (docValues.advanceExact(docId) == false) {
+            return;
+        }
+        int docValuesCount = docValues.docValueCount();
+        for (int i = 0; i < docValuesCount; i++) {
+            Number num = (Number) docValues.nextValue();
+            collect(num);
+        }
+    }
+
     abstract static class Metric {
         final String name;
 
@@ -212,7 +212,7 @@ abstract class MetricFieldProducer extends AbstractRollupFieldProducer<Number> {
         @Override
         void collect(Number value) {
             if (lastValue == null) {
-                lastValue = value.doubleValue();
+                lastValue = value;
             }
         }
 
@@ -236,6 +236,17 @@ abstract class MetricFieldProducer extends AbstractRollupFieldProducer<Number> {
             super(name, Collections.singletonList(new LastValue()));
         }
 
+        @Override
+        public void collect(FormattedDocValues docValues, int docId) throws IOException {
+            // Counter producers only collect the last_value. Since documents are
+            // collected by descending timestamp order, the producer should only
+            // process the first value for every tsid. So, it will only collect the
+            // field if no value has been set before.
+            if (isEmpty()) {
+                super.collect(docValues, docId);
+            }
+        }
+
         public Object value() {
             assert metrics().size() == 1 : "Single value producers must have only one metric";
             return metrics().iterator().next().get();
@@ -275,80 +286,4 @@ abstract class MetricFieldProducer extends AbstractRollupFieldProducer<Number> {
             }
         }
     }
-
-    static class AggregateMetricFieldProducer extends MetricFieldProducer {
-
-        private Map<String, Metric> metricsByField = new LinkedHashMap<>();
-
-        AggregateMetricFieldProducer(String name) {
-            super(name, Collections.emptyList());
-        }
-
-        public void addMetric(String field, Metric metric) {
-            metricsByField.put(field, metric);
-        }
-
-        @Override
-        public void collect(String field, Number value) {
-            metricsByField.get(field).collect(value);
-            isEmpty = false;
-        }
-
-        @Override
-        public void write(XContentBuilder builder) throws IOException {
-            if (isEmpty() == false) {
-                builder.startObject(name());
-                for (MetricFieldProducer.Metric metric : metrics()) {
-                    if (metric.get() != null) {
-                        builder.field(metric.name(), metric.get());
-                    }
-                }
-                builder.endObject();
-            }
-        }
-
-        @Override
-        public Collection<Metric> metrics() {
-            return metricsByField.values();
-        }
-    }
-
-    /**
-     * Create a collection of metric field producers based on the metric_type mapping parameter in the field
-     * mapping.
-     */
-    static Map<String, MetricFieldProducer> createMetricFieldProducers(SearchExecutionContext context, String[] metricFields) {
-        final Map<String, MetricFieldProducer> fields = new LinkedHashMap<>();
-        for (String field : metricFields) {
-            MappedFieldType fieldType = context.getFieldType(field);
-            assert fieldType != null : "Unknown field type for field: [" + field + "]";
-            assert fieldType.getMetricType() != null : "Unknown metric type for metric field: [" + field + "]";
-
-            if (fieldType instanceof AggregateDoubleMetricFieldMapper.AggregateDoubleMetricFieldType aggMetricFieldType) {
-                // If the field is an aggregate_metric_double field, we should use the correct subfields
-                // for each aggregation. This is a rollup-of-rollup case
-                AggregateMetricFieldProducer producer = new AggregateMetricFieldProducer(field);
-                for (var e : aggMetricFieldType.getMetricFields().entrySet()) {
-                    AggregateDoubleMetricFieldMapper.Metric metric = e.getKey();
-                    NumberFieldMapper.NumberFieldType metricSubField = e.getValue();
-                    Metric metricOperation = switch (metric) {
-                        case max -> new Max();
-                        case min -> new Min();
-                        case sum -> new Sum();
-                        // To compute value_count summary, we must sum all field values
-                        case value_count -> new Sum(AggregateDoubleMetricFieldMapper.Metric.value_count.name());
-                    };
-                    producer.addMetric(metricSubField.name(), metricOperation);
-                    fields.put(metricSubField.name(), producer);
-                }
-            } else {
-                MetricFieldProducer producer = switch (fieldType.getMetricType()) {
-                    case gauge -> new GaugeMetricFieldProducer(field);
-                    case counter -> new CounterMetricFieldProducer(field);
-                };
-                fields.put(field, producer);
-            }
-        }
-        return Collections.unmodifiableMap(fields);
-    }
 }

+ 20 - 0
x-pack/plugin/rollup/src/main/java/org/elasticsearch/xpack/downsample/RollupFieldSerializer.java

@@ -0,0 +1,20 @@
+/*
+ * Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
+ * or more contributor license agreements. Licensed under the Elastic License
+ * 2.0; you may not use this file except in compliance with the Elastic License
+ * 2.0.
+ */
+
+package org.elasticsearch.xpack.downsample;
+
+import org.elasticsearch.xcontent.XContentBuilder;
+
+import java.io.IOException;
+
+public interface RollupFieldSerializer {
+
+    /**
+     * Serialize the downsampled value of the field.
+     */
+    void write(XContentBuilder builder) throws IOException;
+}

+ 50 - 80
x-pack/plugin/rollup/src/main/java/org/elasticsearch/xpack/downsample/RollupShardIndexer.java

@@ -24,9 +24,9 @@ import org.elasticsearch.common.Rounding;
 import org.elasticsearch.common.Strings;
 import org.elasticsearch.common.unit.ByteSizeUnit;
 import org.elasticsearch.common.unit.ByteSizeValue;
-import org.elasticsearch.common.util.ArrayUtils;
 import org.elasticsearch.core.IOUtils;
 import org.elasticsearch.core.TimeValue;
+import org.elasticsearch.core.Tuple;
 import org.elasticsearch.index.IndexService;
 import org.elasticsearch.index.engine.Engine;
 import org.elasticsearch.index.fielddata.FormattedDocValues;
@@ -53,15 +53,15 @@ import org.elasticsearch.xpack.core.rollup.action.RollupShardTask;
 
 import java.io.Closeable;
 import java.io.IOException;
+import java.util.ArrayList;
 import java.util.Arrays;
 import java.util.Collections;
-import java.util.HashSet;
 import java.util.List;
 import java.util.Map;
 import java.util.Objects;
-import java.util.function.Function;
 import java.util.stream.Collectors;
 
+import static java.util.stream.Collectors.groupingBy;
 import static org.elasticsearch.core.Strings.format;
 
 /**
@@ -90,8 +90,7 @@ class RollupShardIndexer {
     private final String[] dimensionFields;
     private final String[] metricFields;
     private final String[] labelFields;
-    private final Map<String, FieldValueFetcher> fieldValueFetchers;
-
+    private final List<FieldValueFetcher> fieldValueFetchers;
     private final RollupShardTask task;
     private volatile boolean abort = false;
 
@@ -128,7 +127,11 @@ class RollupShardIndexer {
             this.timestampField = searchExecutionContext.getFieldType(DataStreamTimestampFieldMapper.DEFAULT_PATH);
             this.timestampFormat = timestampField.docValueFormat(null, null);
             this.rounding = config.createRounding();
-            this.fieldValueFetchers = FieldValueFetcher.create(searchExecutionContext, ArrayUtils.concat(metricFields, labelFields));
+
+            List<FieldValueFetcher> fetchers = new ArrayList<>(metricFields.length + labelFields.length);
+            fetchers.addAll(FieldValueFetcher.create(searchExecutionContext, metricFields));
+            fetchers.addAll(FieldValueFetcher.create(searchExecutionContext, labelFields));
+            this.fieldValueFetchers = Collections.unmodifiableList(fetchers);
             toClose = null;
         } finally {
             IOUtils.closeWhileHandlingException(toClose);
@@ -239,14 +242,18 @@ class RollupShardIndexer {
 
     private class TimeSeriesBucketCollector extends BucketCollector {
         private final BulkProcessor bulkProcessor;
+        private final RollupBucketBuilder rollupBucketBuilder;
         private long docsProcessed;
         private long bucketsCreated;
-        private final RollupBucketBuilder rollupBucketBuilder = new RollupBucketBuilder();
         long lastTimestamp = Long.MAX_VALUE;
         long lastHistoTimestamp = Long.MAX_VALUE;
 
         TimeSeriesBucketCollector(BulkProcessor bulkProcessor) {
             this.bulkProcessor = bulkProcessor;
+            List<AbstractRollupFieldProducer> rollupFieldProducers = fieldValueFetchers.stream()
+                .map(FieldValueFetcher::rollupFieldProducer)
+                .toList();
+            this.rollupBucketBuilder = new RollupBucketBuilder(rollupFieldProducers);
         }
 
         @Override
@@ -254,7 +261,11 @@ class RollupShardIndexer {
             final LeafReaderContext ctx = aggCtx.getLeafReaderContext();
             final DocCountProvider docCountProvider = new DocCountProvider();
             docCountProvider.setLeafReaderContext(ctx);
-            final Map<String, FormattedDocValues> docValuesFetchers = FieldValueFetcher.docValuesFetchers(ctx, fieldValueFetchers);
+
+            // For each field, return a tuple with the rollup field producer and the field value leaf
+            final List<Tuple<AbstractRollupFieldProducer, FormattedDocValues>> fieldValueTuples = fieldValueFetchers.stream()
+                .map(fetcher -> Tuple.tuple(fetcher.rollupFieldProducer(), fetcher.getLeaf(ctx)))
+                .toList();
 
             return new LeafBucketCollector() {
                 @Override
@@ -316,29 +327,16 @@ class RollupShardIndexer {
                         } else {
                             rollupBucketBuilder.resetTimestamp(lastHistoTimestamp);
                         }
-
                         bucketsCreated++;
                     }
 
                     final int docCount = docCountProvider.getDocCount(docId);
                     rollupBucketBuilder.collectDocCount(docCount);
-                    for (Map.Entry<String, FormattedDocValues> e : docValuesFetchers.entrySet()) {
-                        final String fieldName = e.getKey();
-                        final FormattedDocValues leafField = e.getValue();
-
-                        if (leafField.advanceExact(docId)) {
-                            rollupBucketBuilder.collect(fieldName, leafField.docValueCount(), docValueCount -> {
-                                final Object[] values = new Object[docValueCount];
-                                for (int i = 0; i < docValueCount; ++i) {
-                                    try {
-                                        values[i] = leafField.nextValue();
-                                    } catch (IOException ex) {
-                                        throw new ElasticsearchException("Failed to read values for field [" + fieldName + "]");
-                                    }
-                                }
-                                return values;
-                            });
-                        }
+                    // Iterate over all field values and collect the doc_values for this docId
+                    for (Tuple<AbstractRollupFieldProducer, FormattedDocValues> tuple : fieldValueTuples) {
+                        AbstractRollupFieldProducer rollupFieldProducer = tuple.v1();
+                        FormattedDocValues docValues = tuple.v2();
+                        rollupFieldProducer.collect(docValues, docId);
                     }
                     docsProcessed++;
                 }
@@ -386,12 +384,10 @@ class RollupShardIndexer {
         private int tsidOrd = -1;
         private long timestamp;
         private int docCount;
-        private final Map<String, MetricFieldProducer> metricFieldProducers;
-        private final Map<String, LabelFieldProducer> labelFieldProducers;
+        private final List<AbstractRollupFieldProducer> rollupFieldProducers;
 
-        RollupBucketBuilder() {
-            this.metricFieldProducers = MetricFieldProducer.createMetricFieldProducers(searchExecutionContext, metricFields);
-            this.labelFieldProducers = LabelFieldProducer.createLabelFieldProducers(searchExecutionContext, labelFields);
+        RollupBucketBuilder(List<AbstractRollupFieldProducer> rollupFieldProducers) {
+            this.rollupFieldProducers = rollupFieldProducers;
         }
 
         /**
@@ -409,8 +405,7 @@ class RollupShardIndexer {
         public RollupBucketBuilder resetTimestamp(long timestamp) {
             this.timestamp = timestamp;
             this.docCount = 0;
-            this.metricFieldProducers.values().forEach(MetricFieldProducer::reset);
-            this.labelFieldProducers.values().forEach(LabelFieldProducer::reset);
+            this.rollupFieldProducers.forEach(AbstractRollupFieldProducer::reset);
             if (logger.isTraceEnabled()) {
                 logger.trace(
                     "New bucket for _tsid: [{}], @timestamp: [{}]",
@@ -421,46 +416,6 @@ class RollupShardIndexer {
             return this;
         }
 
-        public void collect(final String field, int docValueCount, final Function<Integer, Object[]> fieldValues) {
-            final Object[] values = fieldValues.apply(docValueCount);
-            if (metricFieldProducers.containsKey(field)) {
-                // TODO: missing support for array metrics
-                collectMetric(field, values);
-            } else if (labelFieldProducers.containsKey(field)) {
-                if (values.length == 1) {
-                    collectLabel(field, values[0]);
-                } else {
-                    collectLabel(field, values);
-                }
-            } else {
-                throw new IllegalArgumentException(
-                    "Field '"
-                        + field
-                        + "' is not a label nor a metric, existing labels: [ "
-                        + String.join(",", labelFieldProducers.keySet())
-                        + "], existing metrics: ["
-                        + String.join(", ", metricFieldProducers.keySet())
-                        + "]"
-                );
-            }
-        }
-
-        private void collectLabel(final String field, final Object value) {
-            labelFieldProducers.get(field).collect(field, value);
-        }
-
-        private void collectMetric(final String field, final Object[] values) {
-            for (Object value : values) {
-                if (value instanceof Number number) {
-                    metricFieldProducers.get(field).collect(field, number);
-                } else {
-                    throw new IllegalArgumentException(
-                        "Expected numeric value for field '" + field + "' but got non numeric value: '" + value + "'"
-                    );
-                }
-            }
-        }
-
         public void collectDocCount(int docCount) {
             this.docCount += docCount;
         }
@@ -483,14 +438,28 @@ class RollupShardIndexer {
                 builder.field(e.getKey(), e.getValue());
             }
 
-            // Serialize all metric fields
-            for (var producer : new HashSet<>(metricFieldProducers.values())) {
-                producer.write(builder);
-            }
+            /*
+             * The rollup field producers for aggregate_metric_double all share the same name (this is
+             * the name they will be serialized in the target index). We group all field producers by
+             * name. If grouping yields multiple rollup field producers, we delegate serialization to
+             * the AggregateMetricFieldSerializer class.
+             */
+            List<RollupFieldSerializer> groupedProducers = rollupFieldProducers.stream()
+                .collect(groupingBy(AbstractRollupFieldProducer::name))
+                .entrySet()
+                .stream()
+                .map(e -> {
+                    if (e.getValue().size() == 1) {
+                        return e.getValue().get(0);
+                    } else {
+                        return new AggregateMetricFieldSerializer(e.getKey(), e.getValue());
+                    }
+                })
+                .toList();
 
-            // Serialize all label fields
-            for (var producer : new HashSet<>(labelFieldProducers.values())) {
-                producer.write(builder);
+            // Serialize fields
+            for (RollupFieldSerializer fieldProducer : groupedProducers) {
+                fieldProducer.write(builder);
             }
 
             builder.endObject();
@@ -516,5 +485,6 @@ class RollupShardIndexer {
         public boolean isEmpty() {
             return tsid() == null || timestamp() == 0 || docCount() == 0;
         }
+
     }
 }

+ 25 - 5
x-pack/plugin/rollup/src/test/java/org/elasticsearch/xpack/downsample/LabelFieldProducerTests.java

@@ -7,8 +7,11 @@
 
 package org.elasticsearch.xpack.downsample;
 
+import org.elasticsearch.index.fielddata.FormattedDocValues;
 import org.elasticsearch.search.aggregations.AggregatorTestCase;
 
+import java.io.IOException;
+
 public class LabelFieldProducerTests extends AggregatorTestCase {
 
     public void testLastValueKeywordLabel() {
@@ -61,16 +64,33 @@ public class LabelFieldProducerTests extends AggregatorTestCase {
         assertNull(label.get());
     }
 
-    public void testLabelFieldProducer() {
+    public void testLabelFieldProducer() throws IOException {
         final LabelFieldProducer producer = new LabelFieldProducer.LabelLastValueFieldProducer("dummy");
         assertTrue(producer.isEmpty());
         assertEquals("dummy", producer.name());
-        assertEquals("last_value", producer.label().name);
-        producer.collect("dummy", "aaaa");
+        assertEquals("last_value", producer.label().name());
+        FormattedDocValues docValues = new FormattedDocValues() {
+            @Override
+            public boolean advanceExact(int docId) {
+                return true;
+            }
+
+            @Override
+            public int docValueCount() {
+                return 1;
+            }
+
+            @Override
+            public Object nextValue() {
+                return "aaaa";
+            }
+        };
+        producer.collect(docValues, 1);
+        // producer.collect("dummy", "aaaa");
         assertFalse(producer.isEmpty());
-        assertEquals("aaaa", producer.value());
+        assertEquals("aaaa", producer.label().get());
         producer.reset();
         assertTrue(producer.isEmpty());
-        assertNull(producer.value());
+        assertNull(producer.label().get());
     }
 }

+ 11 - 87
x-pack/plugin/rollup/src/test/java/org/elasticsearch/xpack/downsample/MetricFieldProducerTests.java

@@ -8,19 +8,11 @@
 package org.elasticsearch.xpack.downsample;
 
 import org.elasticsearch.common.Strings;
-import org.elasticsearch.index.IndexSettings;
-import org.elasticsearch.index.mapper.MappedFieldType;
-import org.elasticsearch.index.mapper.NumberFieldMapper;
-import org.elasticsearch.index.mapper.TimeSeriesParams;
-import org.elasticsearch.index.query.SearchExecutionContext;
 import org.elasticsearch.search.aggregations.AggregatorTestCase;
 import org.elasticsearch.xcontent.XContentBuilder;
 import org.elasticsearch.xcontent.json.JsonXContent;
 
 import java.io.IOException;
-import java.util.Map;
-
-import static java.util.Collections.emptyMap;
 
 public class MetricFieldProducerTests extends AggregatorTestCase {
 
@@ -69,7 +61,7 @@ public class MetricFieldProducerTests extends AggregatorTestCase {
         for (int i = 0; i < values.length; i++) {
             metric.collect(values[i]);
         }
-        assertEquals(metric.get().doubleValue(), 15.3, Double.MIN_NORMAL);
+        assertEquals(15.3, metric.get().doubleValue(), Double.MIN_NORMAL);
 
         // Summing up an array which contains NaN and infinities and expect a result same as naive summation
         metric.reset();
@@ -82,7 +74,7 @@ public class MetricFieldProducerTests extends AggregatorTestCase {
             sum += d;
             metric.collect(d);
         }
-        assertEquals(metric.get().doubleValue(), sum, 1e-10);
+        assertEquals(sum, metric.get().doubleValue(), 1e-10);
 
         // Summing up some big double values and expect infinity result
         metric.reset();
@@ -90,13 +82,13 @@ public class MetricFieldProducerTests extends AggregatorTestCase {
         for (int i = 0; i < n; i++) {
             metric.collect(Double.MAX_VALUE);
         }
-        assertEquals(metric.get().doubleValue(), Double.POSITIVE_INFINITY, 0d);
+        assertEquals(Double.POSITIVE_INFINITY, metric.get().doubleValue(), 0d);
 
         metric.reset();
         for (int i = 0; i < n; i++) {
             metric.collect(-Double.MAX_VALUE);
         }
-        assertEquals(metric.get().doubleValue(), Double.NEGATIVE_INFINITY, 0d);
+        assertEquals(Double.NEGATIVE_INFINITY, metric.get().doubleValue(), 0d);
     }
 
     public void testValueCountMetric() {
@@ -116,7 +108,7 @@ public class MetricFieldProducerTests extends AggregatorTestCase {
         metric.collect(40);
         metric.collect(30);
         metric.collect(20);
-        assertEquals(40.0, metric.get());
+        assertEquals(40, metric.get());
         metric.reset();
         assertNull(metric.get());
     }
@@ -125,9 +117,9 @@ public class MetricFieldProducerTests extends AggregatorTestCase {
         final String field = "field";
         var producer = new MetricFieldProducer.CounterMetricFieldProducer(field);
         assertTrue(producer.isEmpty());
-        producer.collect(field, 55.0);
-        producer.collect(field, 12.2);
-        producer.collect(field, 5.5);
+        producer.collect(55.0);
+        producer.collect(12.2);
+        producer.collect(5.5);
 
         assertFalse(producer.isEmpty());
         Object o = producer.value();
@@ -144,9 +136,9 @@ public class MetricFieldProducerTests extends AggregatorTestCase {
         final String field = "field";
         MetricFieldProducer producer = new MetricFieldProducer.GaugeMetricFieldProducer(field);
         assertTrue(producer.isEmpty());
-        producer.collect(field, 55.0);
-        producer.collect(field, 12.2);
-        producer.collect(field, 5.5);
+        producer.collect(55.0);
+        producer.collect(12.2);
+        producer.collect(5.5);
 
         assertFalse(producer.isEmpty());
 
@@ -157,72 +149,4 @@ public class MetricFieldProducerTests extends AggregatorTestCase {
 
         assertEquals(field, producer.name());
     }
-
-    public void testBuildMetricProducers() {
-        final Map<String, MappedFieldType> provideMappedFieldType = Map.of(
-            "gauge_field",
-            new NumberFieldMapper.NumberFieldType(
-                "gauge_field",
-                NumberFieldMapper.NumberType.DOUBLE,
-                true,
-                true,
-                true,
-                true,
-                null,
-                emptyMap(),
-                null,
-                false,
-                TimeSeriesParams.MetricType.gauge
-            ),
-            "counter_field",
-            new NumberFieldMapper.NumberFieldType(
-                "counter_field",
-                NumberFieldMapper.NumberType.DOUBLE,
-                true,
-                true,
-                true,
-                true,
-                null,
-                emptyMap(),
-                null,
-                false,
-                TimeSeriesParams.MetricType.counter
-            )
-        );
-
-        IndexSettings settings = createIndexSettings();
-        SearchExecutionContext searchExecutionContext = new SearchExecutionContext(
-            0,
-            0,
-            settings,
-            null,
-            null,
-            null,
-            null,
-            null,
-            null,
-            null,
-            null,
-            null,
-            null,
-            () -> 0L,
-            null,
-            null,
-            () -> true,
-            null,
-            emptyMap()
-        ) {
-            @Override
-            public MappedFieldType getFieldType(String name) {
-                return provideMappedFieldType.get(name);
-            }
-        };
-
-        Map<String, MetricFieldProducer> producers = MetricFieldProducer.createMetricFieldProducers(
-            searchExecutionContext,
-            new String[] { "gauge_field", "counter_field" }
-        );
-        assertTrue(producers.get("gauge_field") instanceof MetricFieldProducer.GaugeMetricFieldProducer);
-        assertTrue(producers.get("counter_field") instanceof MetricFieldProducer.CounterMetricFieldProducer);
-    }
 }