
Implement logic for storing fields that are neither dimensions nor metrics (aka tags) (#87929)

For label fields (fields that are neither dimensions nor metrics)
we simply propagate the latest value into the rollup index, the
same as we do for a counter metric.
Salvatore Campagna 3 years ago
parent
commit
504b30c6f0
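
As a rough illustration of the behavior described in the commit message, the standalone sketch below shows the shape of the rolled-up document the indexer builds for one bucket: dimensions are copied from the _tsid, gauge metrics are aggregated, and a label simply carries the latest observed value. Field names and numbers are taken from the YAML test further down; the exact indexer output may differ.

    import java.util.LinkedHashMap;
    import java.util.Map;

    class RollupDocSketch {
        public static void main(String[] args) {
            // Hypothetical rollup document for one _tsid and one hourly bucket.
            Map<String, Object> doc = new LinkedHashMap<>();
            doc.put("@timestamp", "2021-04-28T18:00:00.000Z");
            doc.put("_doc_count", 2);
            // Dimensions, extracted from the _tsid.
            doc.put("metricset", "pod");
            doc.put("k8s.pod.uid", "947e4ced-1786-4e53-9e0c-5c447e959507");
            // Gauge metric: aggregated over the two raw documents in the bucket.
            doc.put("k8s.pod.network.tx", Map.of("min", 2001818691L, "max", 2005177954L, "sum", 4006996645L, "value_count", 2));
            // Label: only the latest value is kept, just like a counter metric.
            doc.put("k8s.pod.ip", "10.10.55.26");
            System.out.println(doc);
        }
    }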

+ 23 - 2
x-pack/plugin/core/src/main/java/org/elasticsearch/xpack/core/rollup/action/RollupIndexerAction.java

@@ -44,11 +44,19 @@ public class RollupIndexerAction extends ActionType<RollupIndexerAction.Response
         private RollupAction.Request rollupRequest;
         private String[] dimensionFields;
         private String[] metricFields;
-
-        public Request(RollupAction.Request rollupRequest, final String[] dimensionFields, final String[] metricFields) {
+        private String[] labelFields;
+
+        public Request(
+            RollupAction.Request rollupRequest,
+            final String[] dimensionFields,
+            final String[] metricFields,
+            final String[] labelFields
+        ) {
+            super(rollupRequest.indices());
             this.rollupRequest = rollupRequest;
             this.dimensionFields = dimensionFields;
             this.metricFields = metricFields;
+            this.labelFields = labelFields;
         }
 
         public Request() {}
@@ -58,6 +66,7 @@ public class RollupIndexerAction extends ActionType<RollupIndexerAction.Response
             this.rollupRequest = new RollupAction.Request(in);
             this.dimensionFields = in.readStringArray();
             this.metricFields = in.readStringArray();
+            this.labelFields = in.readStringArray();
         }
 
         @Override
@@ -82,6 +91,10 @@ public class RollupIndexerAction extends ActionType<RollupIndexerAction.Response
             return this.metricFields;
         }
 
+        public String[] getLabelFields() {
+            return labelFields;
+        }
+
         @Override
         public Task createTask(long id, String type, String action, TaskId parentTaskId, Map<String, String> headers) {
             return new RollupTask(id, type, action, parentTaskId, rollupRequest.getRollupIndex(), rollupRequest.getRollupConfig(), headers);
@@ -93,6 +106,7 @@ public class RollupIndexerAction extends ActionType<RollupIndexerAction.Response
             rollupRequest.writeTo(out);
             out.writeStringArray(dimensionFields);
             out.writeStringArray(metricFields);
+            out.writeStringArray(labelFields);
         }
 
         @Override
@@ -106,6 +120,7 @@ public class RollupIndexerAction extends ActionType<RollupIndexerAction.Response
             builder.field("rollup_request", rollupRequest);
             builder.array("dimension_fields", dimensionFields);
             builder.array("metric_fields", metricFields);
+            builder.array("label_fields", labelFields);
             builder.endObject();
             return builder;
         }
@@ -115,6 +130,7 @@ public class RollupIndexerAction extends ActionType<RollupIndexerAction.Response
             int result = rollupRequest.hashCode();
             result = 31 * result + Arrays.hashCode(dimensionFields);
             result = 31 * result + Arrays.hashCode(metricFields);
+            result = 31 * result + Arrays.hashCode(labelFields);
             return result;
         }
 
@@ -125,6 +141,7 @@ public class RollupIndexerAction extends ActionType<RollupIndexerAction.Response
             Request request = (Request) o;
             if (rollupRequest.equals(request.rollupRequest) == false) return false;
             if (Arrays.equals(dimensionFields, request.dimensionFields) == false) return false;
+            if (Arrays.equals(labelFields, request.labelFields) == false) return false;
             return Arrays.equals(metricFields, request.metricFields);
         }
     }
@@ -225,6 +242,10 @@ public class RollupIndexerAction extends ActionType<RollupIndexerAction.Response
             return request.getMetricFields();
         }
 
+        public String[] getLabelFields() {
+            return request.getLabelFields();
+        }
+
         @Override
         public void writeTo(StreamOutput out) throws IOException {
             super.writeTo(out);

+ 26 - 11
x-pack/plugin/rollup/qa/rest/src/yamlRestTest/resources/rest-api-spec/test/rollup/10_basic.yml

@@ -1,7 +1,7 @@
 setup:
   - skip:
-      version: " - 8.2.99"
-      reason: tsdb indexing changed in 8.3.0
+      version: " - 8.3.99"
+      reason: "rollup: labels support added in 8.4.0"
 
   - do:
       indices.create:
@@ -32,8 +32,18 @@ setup:
                         time_series_dimension: true
                       name:
                         type: keyword
+                      created_at:
+                        type: date_nanos
+                      running:
+                        type: boolean
+                      number_of_containers:
+                        type: integer
                       ip:
                         type: ip
+                      tags:
+                        type: keyword
+                      values:
+                        type: integer
                       network:
                         properties:
                           tx:
@@ -48,21 +58,21 @@ setup:
         index: test
         body:
           - '{"index": {}}'
-          - '{"@timestamp": "2021-04-28T18:50:04.467Z", "metricset": "pod", "k8s": {"pod": {"name": "cat", "uid":"947e4ced-1786-4e53-9e0c-5c447e959507", "ip": "10.10.55.1", "network": {"tx": 2001818691, "rx": 802133794}}}}'
+          - '{"@timestamp": "2021-04-28T18:50:04.467Z", "metricset": "pod", "k8s": {"pod": {"name": "cat", "uid":"947e4ced-1786-4e53-9e0c-5c447e959507", "ip": "10.10.55.1", "network": {"tx": 2001818691, "rx": 802133794}, "created_at": "2021-04-28T19:34:00.000Z", "running": false, "number_of_containers": 2, "tags": ["backend", "prod"], "values": [2, 3, 6]}}}'
           - '{"index": {}}'
-          - '{"@timestamp": "2021-04-28T18:50:24.467Z", "metricset": "pod", "k8s": {"pod": {"name": "cat", "uid":"947e4ced-1786-4e53-9e0c-5c447e959507", "ip": "10.10.55.1", "network": {"tx": 2005177954, "rx": 801479970}}}}'
+          - '{"@timestamp": "2021-04-28T18:50:24.467Z", "metricset": "pod", "k8s": {"pod": {"name": "cat", "uid":"947e4ced-1786-4e53-9e0c-5c447e959507", "ip": "10.10.55.26", "network": {"tx": 2005177954, "rx": 801479970}, "created_at": "2021-04-28T19:35:00.000Z", "running": true, "number_of_containers": 2, "tags": ["backend", "prod", "us-west1"], "values": [1, 1, 3]}}}'
           - '{"index": {}}'
-          - '{"@timestamp": "2021-04-28T20:50:44.467Z", "metricset": "pod", "k8s": {"pod": {"name": "cat", "uid":"947e4ced-1786-4e53-9e0c-5c447e959507", "ip": "10.10.55.1", "network": {"tx": 2006223737, "rx": 802337279}}}}'
+          - '{"@timestamp": "2021-04-28T20:50:44.467Z", "metricset": "pod", "k8s": {"pod": {"name": "cat", "uid":"947e4ced-1786-4e53-9e0c-5c447e959507", "ip": "10.10.55.41", "network": {"tx": 2006223737, "rx": 802337279}, "created_at": "2021-04-28T19:36:00.000Z", "running": true, "number_of_containers": 2, "tags": ["backend", "prod", "us-west2"], "values": [4, 1, 2]}}}'
           - '{"index": {}}'
-          - '{"@timestamp": "2021-04-28T20:51:04.467Z", "metricset": "pod", "k8s": {"pod": {"name": "cat", "uid":"947e4ced-1786-4e53-9e0c-5c447e959507", "ip": "10.10.55.2", "network": {"tx": 2012916202, "rx": 803685721}}}}'
+          - '{"@timestamp": "2021-04-28T20:51:04.467Z", "metricset": "pod", "k8s": {"pod": {"name": "cat", "uid":"947e4ced-1786-4e53-9e0c-5c447e959507", "ip": "10.10.55.22", "network": {"tx": 2012916202, "rx": 803685721}, "created_at": "2021-04-28T19:37:00.000Z", "running": true, "number_of_containers": 2, "tags": ["backend", "prod"], "values": [2, 3, 1]}}}'
           - '{"index": {}}'
-          - '{"@timestamp": "2021-04-28T18:50:03.142Z", "metricset": "pod", "k8s": {"pod": {"name": "dog", "uid":"df3145b3-0563-4d3b-a0f7-897eb2876ea9", "ip": "10.10.55.3", "network": {"tx": 1434521831, "rx": 530575198}}}}'
+          - '{"@timestamp": "2021-04-28T18:50:03.142Z", "metricset": "pod", "k8s": {"pod": {"name": "dog", "uid":"df3145b3-0563-4d3b-a0f7-897eb2876ea9", "ip": "10.10.55.33", "network": {"tx": 1434521831, "rx": 530575198}, "created_at": "2021-04-28T19:42:00.000Z", "running": false, "number_of_containers": 1, "tags": ["backend", "test"], "values": [2, 3, 4]}}}'
           - '{"index": {}}'
-          - '{"@timestamp": "2021-04-28T18:50:23.142Z", "metricset": "pod", "k8s": {"pod": {"name": "dog", "uid":"df3145b3-0563-4d3b-a0f7-897eb2876ea9", "ip": "10.10.55.3", "network": {"tx": 1434577921, "rx": 530600088}}}}'
+          - '{"@timestamp": "2021-04-28T18:50:23.142Z", "metricset": "pod", "k8s": {"pod": {"name": "dog", "uid":"df3145b3-0563-4d3b-a0f7-897eb2876ea9", "ip": "10.10.55.56", "network": {"tx": 1434577921, "rx": 530600088}, "created_at": "2021-04-28T19:43:00.000Z", "running": false, "number_of_containers": 1, "tags": ["backend", "test", "us-west2"], "values": [2, 1, 1]}}}'
           - '{"index": {}}'
-          - '{"@timestamp": "2021-04-28T19:50:53.142Z", "metricset": "pod", "k8s": {"pod": {"name": "dog", "uid":"df3145b3-0563-4d3b-a0f7-897eb2876ea9", "ip": "10.10.55.3", "network": {"tx": 1434587694, "rx": 530604797}}}}'
+          - '{"@timestamp": "2021-04-28T19:50:53.142Z", "metricset": "pod", "k8s": {"pod": {"name": "dog", "uid":"df3145b3-0563-4d3b-a0f7-897eb2876ea9", "ip": "10.10.55.37", "network": {"tx": 1434587694, "rx": 530604797}, "created_at": "2021-04-28T19:44:00.000Z", "running": true, "number_of_containers": 1, "tags": ["backend", "test", "us-west1"], "values": [4, 5, 2]}}}'
           - '{"index": {}}'
-          - '{"@timestamp": "2021-04-28T19:51:03.142Z", "metricset": "pod", "k8s": {"pod": {"name": "dog", "uid":"df3145b3-0563-4d3b-a0f7-897eb2876ea9", "ip": "10.10.55.3", "network": {"tx": 1434595272, "rx": 530605511}}}}'
+          - '{"@timestamp": "2021-04-28T19:51:03.142Z", "metricset": "pod", "k8s": {"pod": {"name": "dog", "uid":"df3145b3-0563-4d3b-a0f7-897eb2876ea9", "ip": "10.10.55.120", "network": {"tx": 1434595272, "rx": 530605511}, "created_at": "2021-04-28T19:45:00.000Z", "running": true, "number_of_containers": 1, "tags": ["backend", "test", "us-west1"], "values": [3, 2, 1]}}}'
 
   - do:
       indices.put_settings:
@@ -99,7 +109,12 @@ setup:
   - match:  { hits.hits.0._source.k8s\.pod\.network\.tx.min: 2001818691 }
   - match:  { hits.hits.0._source.k8s\.pod\.network\.tx.max: 2005177954 }
   - match:  { hits.hits.0._source.k8s\.pod\.network\.tx.value_count: 2 }
-  - is_false: hits.hits.0._source.k8s\.pod\.ip   # k8s.pod.ip isn't a dimension and is not rolled up
+  - match:  { hits.hits.0._source.k8s\.pod\.ip: "10.10.55.26" }
+  - match:  { hits.hits.0._source.k8s\.pod\.created_at: "2021-04-28T19:35:00.000Z" }
+  - match:  { hits.hits.0._source.k8s\.pod\.number_of_containers: 2 }
+  - match:  { hits.hits.0._source.k8s\.pod\.tags: ["backend", "prod", "us-west1"] }
+  - match:  { hits.hits.0._source.k8s\.pod\.values: [1, 1, 3] }
+  - is_true: hits.hits.0._source.k8s\.pod\.running
 
   # Assert rollup index settings
   - do:

+ 52 - 0
x-pack/plugin/rollup/src/main/java/org/elasticsearch/xpack/rollup/v2/AbstractRollupFieldProducer.java

@@ -0,0 +1,52 @@
+/*
+ * Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
+ * or more contributor license agreements. Licensed under the Elastic License
+ * 2.0; you may not use this file except in compliance with the Elastic License
+ * 2.0.
+ */
+
+package org.elasticsearch.xpack.rollup.v2;
+
+/**
+ * Base class for classes that read metric and label fields.
+ */
+abstract class AbstractRollupFieldProducer<T> {
+
+    protected final String name;
+    protected boolean isEmpty;
+
+    AbstractRollupFieldProducer(String name) {
+        this.name = name;
+        this.isEmpty = true;
+    }
+
+    /**
+     * Collect a value for the field applying the specific subclass collection strategy.
+     * @param value the value to collect.
+     */
+    public abstract void collect(T value);
+
+    /**
+     * @return the name of the field.
+     */
+    public String name() {
+        return name;
+    }
+
+    /**
+     * @return the value of the field.
+     */
+    public abstract Object value();
+
+    /**
+     * Resets the collected value to the specific subclass reset value.
+     */
+    public abstract void reset();
+
+    /**
+     * @return true if the field has not collected any value.
+     */
+    public boolean isEmpty() {
+        return isEmpty;
+    }
+}

+ 28 - 10
x-pack/plugin/rollup/src/main/java/org/elasticsearch/xpack/rollup/v2/FieldValueFetcher.java

@@ -18,9 +18,7 @@ import org.elasticsearch.search.DocValueFormat;
 import java.io.IOException;
 import java.math.BigInteger;
 import java.util.ArrayList;
-import java.util.Arrays;
 import java.util.Collections;
-import java.util.HashSet;
 import java.util.List;
 import java.util.Set;
 import java.util.function.Function;
@@ -30,8 +28,20 @@ import java.util.function.Function;
  */
 class FieldValueFetcher {
 
-    private static final Set<Class<?>> VALID_TYPES = Collections.unmodifiableSet(
-        new HashSet<>(Arrays.asList(Long.class, Double.class, BigInteger.class, String.class, BytesRef.class))
+    private static final Set<Class<?>> VALID_METRIC_TYPES = Set.of(
+        Long.class,
+        Double.class,
+        BigInteger.class,
+        String.class,
+        BytesRef.class
+    );
+    private static final Set<Class<?>> VALID_LABEL_TYPES = Set.of(
+        Long.class,
+        Double.class,
+        BigInteger.class,
+        String.class,
+        BytesRef.class,
+        Boolean.class
     );
 
     private final String name;
@@ -66,7 +76,7 @@ class FieldValueFetcher {
 
     FormattedDocValues getLeaf(LeafReaderContext context) {
 
-        final FormattedDocValues delegate = fieldData.load(context).getFormattedValues(DocValueFormat.RAW);
+        final FormattedDocValues delegate = fieldData.load(context).getFormattedValues(format);
         return new FormattedDocValues() {
             @Override
             public boolean advanceExact(int docId) throws IOException {
@@ -102,7 +112,7 @@ class FieldValueFetcher {
     /**
      * Retrieve field fetchers for a list of fields.
      */
-    static List<FieldValueFetcher> build(SearchExecutionContext context, String[] fields) {
+    private static List<FieldValueFetcher> build(SearchExecutionContext context, String[] fields, Set<Class<?>> validTypes) {
         List<FieldValueFetcher> fetchers = new ArrayList<>(fields.length);
         for (String field : fields) {
             MappedFieldType fieldType = context.getFieldType(field);
@@ -110,20 +120,28 @@ class FieldValueFetcher {
                 throw new IllegalArgumentException("Unknown field: [" + field + "]");
             }
             IndexFieldData<?> fieldData = context.getForField(fieldType);
-            fetchers.add(new FieldValueFetcher(field, fieldType, fieldData, getValidator(field)));
+            fetchers.add(new FieldValueFetcher(field, fieldType, fieldData, getValidator(field, validTypes)));
         }
         return Collections.unmodifiableList(fetchers);
     }
 
-    static Function<Object, Object> getValidator(String field) {
+    static Function<Object, Object> getValidator(String field, Set<Class<?>> validTypes) {
         return value -> {
-            if (VALID_TYPES.contains(value.getClass()) == false) {
+            if (validTypes.contains(value.getClass()) == false) {
                 throw new IllegalArgumentException(
-                    "Expected [" + VALID_TYPES + "] for field [" + field + "], " + "got [" + value.getClass() + "]"
+                    "Expected [" + validTypes + "] for field [" + field + "], " + "got [" + value.getClass() + "]"
                 );
             }
             return value;
         };
     }
 
+    static List<FieldValueFetcher> forMetrics(SearchExecutionContext context, String[] metricFields) {
+        return build(context, metricFields, VALID_METRIC_TYPES);
+    }
+
+    static List<FieldValueFetcher> forLabels(SearchExecutionContext context, String[] labelFields) {
+        return build(context, labelFields, VALID_LABEL_TYPES);
+    }
+
 }
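
One practical effect of splitting VALID_METRIC_TYPES from VALID_LABEL_TYPES is that Boolean doc values (for example the new k8s.pod.running field in the test above) are accepted only through the label path. A minimal, self-contained sketch of the same validator pattern follows; the class name and the trimmed type sets are made up for the example.

    import java.util.Set;
    import java.util.function.Function;

    class DocValueTypeValidatorSketch {
        // Mirrors FieldValueFetcher.getValidator: reject values whose class is not allowed.
        static Function<Object, Object> validator(String field, Set<Class<?>> validTypes) {
            return value -> {
                if (validTypes.contains(value.getClass()) == false) {
                    throw new IllegalArgumentException(
                        "Expected [" + validTypes + "] for field [" + field + "], got [" + value.getClass() + "]"
                    );
                }
                return value;
            };
        }

        public static void main(String[] args) {
            Set<Class<?>> metricTypes = Set.of(Long.class, Double.class);
            Set<Class<?>> labelTypes = Set.of(Long.class, Double.class, Boolean.class);

            // A boolean label value passes the label validator...
            System.out.println(validator("k8s.pod.running", labelTypes).apply(Boolean.TRUE));
            // ...but the same value is rejected by the metric validator.
            try {
                validator("k8s.pod.running", metricTypes).apply(Boolean.TRUE);
            } catch (IllegalArgumentException e) {
                System.out.println(e.getMessage());
            }
        }
    }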

+ 130 - 0
x-pack/plugin/rollup/src/main/java/org/elasticsearch/xpack/rollup/v2/LabelFieldProducer.java

@@ -0,0 +1,130 @@
+/*
+ * Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
+ * or more contributor license agreements. Licensed under the Elastic License
+ * 2.0; you may not use this file except in compliance with the Elastic License
+ * 2.0.
+ */
+
+package org.elasticsearch.xpack.rollup.v2;
+
+import org.elasticsearch.index.query.SearchExecutionContext;
+
+import java.util.Collections;
+import java.util.LinkedHashMap;
+import java.util.Map;
+
+/**
+ * Class that produces values for a label field.
+ */
+abstract class LabelFieldProducer extends AbstractRollupFieldProducer<Object> {
+
+    private final Label label;
+
+    LabelFieldProducer(String name, Label label) {
+        super(name);
+        this.label = label;
+    }
+
+    public String name() {
+        return name;
+    }
+
+    /** Collect the value of a raw field  */
+    @Override
+    public void collect(Object value) {
+        label.collect(value);
+        isEmpty = false;
+    }
+
+    public Label label() {
+        return this.label;
+    }
+
+    public void reset() {
+        label.reset();
+        isEmpty = true;
+    }
+
+    /**
+     * Return the downsampled value as computed after collecting all raw values.
+     * @return the downsampled value
+     */
+    public abstract Object value();
+
+    abstract static class Label {
+        final String name;
+
+        /**
+         * Abstract class that defines how a label is computed.
+         * @param name the name of the label
+         */
+        protected Label(String name) {
+            this.name = name;
+        }
+
+        abstract void collect(Object value);
+
+        abstract Object get();
+
+        abstract void reset();
+    }
+
+    /**
+     * Label implementation that stores the last value over time for a label. This implementation
+     * assumes that field values are collected sorted in descending order by time, so the last
+     * value in time is the first value collected. As a result, this class stores the first value
+     * it sees while it is still empty and ignores everything collected afterwards.
+     */
+    static class LastValueLabel extends Label {
+        private Object lastValue;
+
+        LastValueLabel() {
+            super("last_value");
+        }
+
+        @Override
+        void collect(Object value) {
+            if (lastValue == null) {
+                lastValue = value;
+            }
+        }
+
+        @Override
+        Object get() {
+            return lastValue;
+        }
+
+        @Override
+        void reset() {
+            lastValue = null;
+        }
+    }
+
+    /**
+     * {@link LabelFieldProducer} implementation for a last value label
+     */
+    static class LabelLastValueFieldProducer extends LabelFieldProducer {
+
+        LabelLastValueFieldProducer(String name) {
+            super(name, new LastValueLabel());
+        }
+
+        @Override
+        public Object value() {
+            return label().get();
+        }
+    }
+
+    /**
+     * Produce a collection of label field producers.
+     */
+    static Map<String, LabelFieldProducer> buildLabelFieldProducers(SearchExecutionContext context, String[] labelFields) {
+        final Map<String, LabelFieldProducer> fields = new LinkedHashMap<>();
+        for (String field : labelFields) {
+            LabelFieldProducer producer = new LabelLastValueFieldProducer(field);
+            fields.put(field, producer);
+        }
+        return Collections.unmodifiableMap(fields);
+    }
+}
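
Within the org.elasticsearch.xpack.rollup.v2 package, a last-value label producer would be exercised roughly as in the sketch below. This is not a test from this change; the field name and values are invented, and only the producer API shown above is used.

    package org.elasticsearch.xpack.rollup.v2;

    // Hypothetical walk-through of LabelLastValueFieldProducer over one rollup bucket.
    class LabelLastValueSketch {
        public static void main(String[] args) {
            LabelFieldProducer producer = new LabelFieldProducer.LabelLastValueFieldProducer("k8s.pod.ip");

            // Documents are visited in descending @timestamp order within a _tsid,
            // so the first collected value is the most recent one and wins.
            producer.collect("10.10.55.26");
            producer.collect("10.10.55.1");
            System.out.println(producer.name() + " = " + producer.value()); // k8s.pod.ip = 10.10.55.26

            // Moving to a new bucket resets the producer before collecting again.
            producer.reset();
            System.out.println(producer.isEmpty()); // true
        }
    }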

+ 23 - 28
x-pack/plugin/rollup/src/main/java/org/elasticsearch/xpack/rollup/v2/MetricFieldProducer.java

@@ -22,32 +22,29 @@ import java.util.Map;
  * values. Based on the supported metric types, the subclasses of this class compute values for
  * gauge and metric types.
  */
-abstract class MetricFieldProducer {
-    private final String field;
-
+abstract class MetricFieldProducer extends AbstractRollupFieldProducer<Number> {
     /**
      * a list of metrics that will be computed for the field
      */
     private final List<Metric> metrics;
-    private boolean isEmpty = true;
 
-    MetricFieldProducer(String field, List<Metric> metrics) {
-        this.field = field;
+    MetricFieldProducer(String name, List<Metric> metrics) {
+        super(name);
         this.metrics = metrics;
     }
 
     /**
      * Reset all values collected for the field
      */
-    void reset() {
+    public void reset() {
         for (Metric metric : metrics) {
             metric.reset();
         }
         isEmpty = true;
     }
 
-    public String field() {
-        return field;
+    public String name() {
+        return name;
     }
 
     /** return the list of metrics that are computed for the field */
@@ -56,19 +53,17 @@ abstract class MetricFieldProducer {
     }
 
     /** Collect the value of a raw field and compute all downsampled metrics */
-    public void collectMetric(Double value) {
+    @Override
+    public void collect(Number value) {
         for (MetricFieldProducer.Metric metric : metrics) {
             metric.collect(value);
         }
         isEmpty = false;
     }
 
-    public boolean isEmpty() {
-        return isEmpty;
-    }
-
     /**
      * Return the downsampled value as computed after collecting all raw values.
+     * @return the downsampled value
      */
     public abstract Object value();
 
@@ -83,7 +78,7 @@ abstract class MetricFieldProducer {
             this.name = name;
         }
 
-        abstract void collect(double number);
+        abstract void collect(Number number);
 
         abstract Number get();
 
@@ -101,8 +96,8 @@ abstract class MetricFieldProducer {
         }
 
         @Override
-        void collect(double value) {
-            this.max = max != null ? Math.max(value, max) : value;
+        void collect(Number value) {
+            this.max = max != null ? Math.max(value.doubleValue(), max) : value.doubleValue();
         }
 
         @Override
@@ -127,8 +122,8 @@ abstract class MetricFieldProducer {
         }
 
         @Override
-        void collect(double value) {
-            this.min = min != null ? Math.min(value, min) : value;
+        void collect(Number value) {
+            this.min = min != null ? Math.min(value.doubleValue(), min) : value.doubleValue();
         }
 
         @Override
@@ -153,8 +148,8 @@ abstract class MetricFieldProducer {
         }
 
         @Override
-        void collect(double value) {
-            kahanSummation.add(value);
+        void collect(Number value) {
+            kahanSummation.add(value.doubleValue());
         }
 
         @Override
@@ -179,7 +174,7 @@ abstract class MetricFieldProducer {
         }
 
         @Override
-        void collect(double value) {
+        void collect(Number value) {
             count++;
         }
 
@@ -209,9 +204,9 @@ abstract class MetricFieldProducer {
         }
 
         @Override
-        void collect(double value) {
+        void collect(Number value) {
             if (lastValue == null) {
-                lastValue = value;
+                lastValue = value.doubleValue();
             }
         }
 
@@ -231,8 +226,8 @@ abstract class MetricFieldProducer {
      */
     static class CounterMetricFieldProducer extends MetricFieldProducer {
 
-        CounterMetricFieldProducer(String field) {
-            super(field, List.of(new LastValue()));
+        CounterMetricFieldProducer(String name) {
+            super(name, List.of(new LastValue()));
         }
 
         @Override
@@ -247,8 +242,8 @@ abstract class MetricFieldProducer {
      */
     static class GaugeMetricFieldProducer extends MetricFieldProducer {
 
-        GaugeMetricFieldProducer(String field) {
-            super(field, List.of(new Min(), new Max(), new Sum(), new ValueCount()));
+        GaugeMetricFieldProducer(String name) {
+            super(name, List.of(new Min(), new Max(), new Sum(), new ValueCount()));
         }
 
         @Override
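
The switch from collect(double) to collect(Number) lets the metric producers share the generic AbstractRollupFieldProducer.collect(T) signature while the individual metrics keep aggregating on primitive doubles internally. A self-contained mirror of that pattern (class names are hypothetical; the Max logic follows the diff above):

    // Minimal mirror of the Metric refactor: accept Number, aggregate on doubleValue().
    abstract class MetricSketch {
        abstract void collect(Number value);

        abstract Number get();
    }

    class MaxSketch extends MetricSketch {
        private Double max;

        @Override
        void collect(Number value) {
            this.max = max != null ? Math.max(value.doubleValue(), max) : value.doubleValue();
        }

        @Override
        Number get() {
            return max;
        }

        public static void main(String[] args) {
            MaxSketch max = new MaxSketch();
            max.collect(2001818691L); // Longs straight from doc_values
            max.collect(2005177954L);
            System.out.println(max.get()); // 2.005177954E9
        }
    }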

+ 86 - 26
x-pack/plugin/rollup/src/main/java/org/elasticsearch/xpack/rollup/v2/RollupShardIndexer.java

@@ -24,6 +24,7 @@ import org.elasticsearch.common.Rounding;
 import org.elasticsearch.common.unit.ByteSizeUnit;
 import org.elasticsearch.common.unit.ByteSizeValue;
 import org.elasticsearch.common.util.Maps;
+import org.elasticsearch.common.util.set.Sets;
 import org.elasticsearch.core.IOUtils;
 import org.elasticsearch.core.TimeValue;
 import org.elasticsearch.index.IndexService;
@@ -54,7 +55,9 @@ import java.util.List;
 import java.util.Map;
 import java.util.Objects;
 import java.util.concurrent.atomic.AtomicLong;
+import java.util.function.Function;
 import java.util.stream.Collectors;
+import java.util.stream.Stream;
 
 import static org.elasticsearch.core.Strings.format;
 
@@ -83,7 +86,9 @@ class RollupShardIndexer {
 
     private final String[] dimensionFields;
     private final String[] metricFields;
+    private final String[] labelFields;
     private final List<FieldValueFetcher> metricFieldFetchers;
+    private final List<FieldValueFetcher> labelFieldFetchers;
 
     private final AtomicLong numSent = new AtomicLong();
     private final AtomicLong numIndexed = new AtomicLong();
@@ -96,7 +101,8 @@ class RollupShardIndexer {
         String rollupIndex,
         RollupActionConfig config,
         String[] dimensionFields,
-        String[] metricFields
+        String[] metricFields,
+        String[] labelFields
     ) {
         this.client = client;
         this.indexShard = indexService.getShard(shardId.id());
@@ -104,6 +110,7 @@ class RollupShardIndexer {
         this.rollupIndex = rollupIndex;
         this.dimensionFields = dimensionFields;
         this.metricFields = metricFields;
+        this.labelFields = labelFields;
 
         this.searcher = indexShard.acquireSearcher("rollup");
         Closeable toClose = searcher;
@@ -119,7 +126,8 @@ class RollupShardIndexer {
             this.timestampField = searchExecutionContext.getFieldType(DataStreamTimestampFieldMapper.DEFAULT_PATH);
             this.timestampFormat = timestampField.docValueFormat(null, null);
             this.rounding = config.createRounding();
-            this.metricFieldFetchers = FieldValueFetcher.build(searchExecutionContext, metricFields);
+            this.metricFieldFetchers = FieldValueFetcher.forMetrics(searchExecutionContext, metricFields);
+            this.labelFieldFetchers = FieldValueFetcher.forLabels(searchExecutionContext, labelFields);
             toClose = null;
         } finally {
             IOUtils.closeWhileHandlingException(toClose);
@@ -224,8 +232,12 @@ class RollupShardIndexer {
             docCountProvider.setLeafReaderContext(ctx);
             final Map<String, FormattedDocValues> metricsFieldLeaves = new HashMap<>();
             for (FieldValueFetcher fetcher : metricFieldFetchers) {
-                FormattedDocValues leafField = fetcher.getLeaf(ctx);
-                metricsFieldLeaves.put(fetcher.name(), leafField);
+                metricsFieldLeaves.put(fetcher.name(), fetcher.getLeaf(ctx));
+            }
+
+            final Map<String, FormattedDocValues> labelFieldLeaves = new HashMap<>();
+            for (FieldValueFetcher fetcher : labelFieldFetchers) {
+                labelFieldLeaves.put(fetcher.name(), fetcher.getLeaf(ctx));
             }
 
             return new LeafBucketCollector() {
@@ -237,7 +249,10 @@ class RollupShardIndexer {
 
                     boolean tsidChanged = tsid.equals(rollupBucketBuilder.tsid()) == false;
                     if (tsidChanged || timestamp < lastHistoTimestamp) {
-                        lastHistoTimestamp = rounding.round(timestamp);
+                        lastHistoTimestamp = Math.max(
+                            rounding.round(timestamp),
+                            searchExecutionContext.getIndexSettings().getTimestampBounds().startTime()
+                        );
                     }
 
                     if (logger.isTraceEnabled()) {
@@ -289,23 +304,23 @@ class RollupShardIndexer {
 
                     final int docCount = docCountProvider.getDocCount(docId);
                     rollupBucketBuilder.collectDocCount(docCount);
-                    for (Map.Entry<String, FormattedDocValues> e : metricsFieldLeaves.entrySet()) {
-                        String fieldName = e.getKey();
-                        FormattedDocValues leafField = e.getValue();
+                    for (Map.Entry<String, FormattedDocValues> e : Sets.union(metricsFieldLeaves.entrySet(), labelFieldLeaves.entrySet())) {
+                        final String fieldName = e.getKey();
+                        final FormattedDocValues leafField = e.getValue();
 
                         if (leafField.advanceExact(docId)) {
-                            for (int i = 0; i < leafField.docValueCount(); i++) {
-                                // TODO: We should lazily load the doc_values for the metric.
-                                // In cases such as counter metrics we only need the first (latest_value)
-                                Object obj = leafField.nextValue();
-                                // TODO: Implement aggregate_metric_double for rollup of rollups
-                                if (obj instanceof Number number) {
-                                    // Collect docs to rollup doc
-                                    rollupBucketBuilder.collectMetric(fieldName, number.doubleValue());
-                                } else {
-                                    throw new IllegalArgumentException("Expected [Number], got [" + obj.getClass() + "]");
+                            rollupBucketBuilder.collect(fieldName, leafField.docValueCount(), docValueCount -> {
+                                final Object[] values = new Object[docValueCount];
+                                for (int i = 0; i < docValueCount; ++i) {
+                                    try {
+                                        values[i] = leafField.nextValue();
+                                    } catch (IOException ex) {
+                                        throw new ElasticsearchException("Failed to read values for field [" + fieldName + "]");
+                                    }
+
                                 }
-                            }
+                                return values;
+                            });
                         }
                     }
                     docsProcessed++;
@@ -347,9 +362,11 @@ class RollupShardIndexer {
         private long timestamp;
         private int docCount;
         private final Map<String, MetricFieldProducer> metricFieldProducers;
+        private final Map<String, LabelFieldProducer> labelFieldProducers;
 
         RollupBucketBuilder() {
             this.metricFieldProducers = MetricFieldProducer.buildMetricFieldProducers(searchExecutionContext, metricFields);
+            this.labelFieldProducers = LabelFieldProducer.buildLabelFieldProducers(searchExecutionContext, labelFields);
         }
 
         /**
@@ -366,7 +383,8 @@ class RollupShardIndexer {
         public RollupBucketBuilder resetTimestamp(long timestamp) {
             this.timestamp = timestamp;
             this.docCount = 0;
-            this.metricFieldProducers.values().stream().forEach(p -> p.reset());
+            this.metricFieldProducers.values().forEach(MetricFieldProducer::reset);
+            this.labelFieldProducers.values().forEach(LabelFieldProducer::reset);
             if (logger.isTraceEnabled()) {
                 logger.trace(
                     "New bucket for _tsid: [{}], @timestamp: [{}]",
@@ -374,12 +392,45 @@ class RollupShardIndexer {
                     timestampFormat.format(timestamp)
                 );
             }
-
             return this;
         }
 
-        public void collectMetric(String field, double value) {
-            metricFieldProducers.get(field).collectMetric(value);
+        public void collect(final String field, int docValueCount, final Function<Integer, Object[]> fieldValues) {
+            final Object[] value = fieldValues.apply(docValueCount);
+            if (metricFieldProducers.containsKey(field)) {
+                // TODO: missing support for array metrics
+                collectMetric(field, value[0]);
+            } else if (labelFieldProducers.containsKey(field)) {
+                if (value.length == 1) {
+                    collectLabel(field, value[0]);
+                } else {
+                    collectLabel(field, value);
+                }
+            } else {
+                throw new IllegalArgumentException(
+                    "Field '"
+                        + field
+                        + "' is not a label nor a metric, existing labels: [ "
+                        + String.join(",", labelFieldProducers.keySet())
+                        + "], existing metrics: ["
+                        + String.join(", ", metricFieldProducers.keySet())
+                        + "]"
+                );
+            }
+        }
+
+        private void collectLabel(final String field, final Object value) {
+            labelFieldProducers.get(field).collect(value);
+        }
+
+        private void collectMetric(final String field, final Object value) {
+            if (value instanceof Number number) {
+                metricFieldProducers.get(field).collect(number);
+            } else {
+                throw new IllegalArgumentException(
+                    "Expected numeric value for field '" + field + "' but got non numeric value: '" + value + "'"
+                );
+            }
         }
 
         public void collectDocCount(int docCount) {
@@ -394,7 +445,9 @@ class RollupShardIndexer {
             // Extract dimension values from _tsid field, so we avoid loading them from doc_values
             @SuppressWarnings("unchecked")
             Map<String, Object> dimensions = (Map<String, Object>) DocValueFormat.TIME_SERIES_ID.format(tsid);
-            Map<String, Object> doc = Maps.newLinkedHashMapWithExpectedSize(2 + dimensions.size() + metricFieldProducers.size());
+            Map<String, Object> doc = Maps.newLinkedHashMapWithExpectedSize(
+                2 + dimensions.size() + metricFieldProducers.size() + labelFieldProducers.size()
+            );
             doc.put(timestampField.name(), timestampFormat.format(timestamp));
             doc.put(DocCountFieldMapper.NAME, docCount);
 
@@ -403,9 +456,16 @@ class RollupShardIndexer {
                 doc.put(e.getKey(), e.getValue());
             }
 
-            for (MetricFieldProducer fieldProducer : metricFieldProducers.values()) {
+            for (AbstractRollupFieldProducer<?> fieldProducer : Stream.concat(
+                metricFieldProducers.values().stream(),
+                labelFieldProducers.values().stream()
+            ).toList()) {
                 if (fieldProducer.isEmpty() == false) {
-                    doc.put(fieldProducer.field(), fieldProducer.value());
+                    String field = fieldProducer.name();
+                    Object value = fieldProducer.value();
+                    if (value != null) {
+                        doc.put(field, value);
+                    }
                 }
             }
 

+ 71 - 0
x-pack/plugin/rollup/src/main/java/org/elasticsearch/xpack/rollup/v2/TimeseriesFieldTypeHelper.java

@@ -0,0 +1,71 @@
+/*
+ * Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
+ * or more contributor license agreements. Licensed under the Elastic License
+ * 2.0; you may not use this file except in compliance with the Elastic License
+ * 2.0.
+ */
+
+package org.elasticsearch.xpack.rollup.v2;
+
+import org.elasticsearch.cluster.metadata.IndexMetadata;
+import org.elasticsearch.common.compress.CompressedXContent;
+import org.elasticsearch.index.mapper.MappedFieldType;
+import org.elasticsearch.index.mapper.MapperService;
+import org.elasticsearch.index.mapper.TimeSeriesParams;
+import org.elasticsearch.indices.IndicesService;
+
+import java.io.IOException;
+import java.util.Arrays;
+import java.util.Map;
+
+import static org.elasticsearch.index.mapper.TimeSeriesParams.TIME_SERIES_DIMENSION_PARAM;
+import static org.elasticsearch.index.mapper.TimeSeriesParams.TIME_SERIES_METRIC_PARAM;
+
+class TimeseriesFieldTypeHelper {
+
+    private final MapperService mapperService;
+    private final String timestampField;
+
+    private TimeseriesFieldTypeHelper(final MapperService mapperService, final String timestampField) {
+        this.mapperService = mapperService;
+        this.timestampField = timestampField;
+    }
+
+    public boolean isTimeSeriesLabel(final String field, final Map<String, ?> unused) {
+        final MappedFieldType fieldType = mapperService.mappingLookup().getFieldType(field);
+        return fieldType != null
+            && (timestampField.equals(field) == false)
+            && (fieldType.isAggregatable())
+            && (fieldType.isDimension() == false)
+            && (mapperService.isMetadataField(field) == false);
+    }
+
+    public boolean isTimeSeriesMetric(final String unused, final Map<String, ?> fieldMapping) {
+        final String metricType = (String) fieldMapping.get(TIME_SERIES_METRIC_PARAM);
+        return metricType != null
+            && Arrays.asList(TimeSeriesParams.MetricType.values()).contains(TimeSeriesParams.MetricType.valueOf(metricType));
+    }
+
+    public boolean isTimeSeriesDimension(final String unused, final Map<String, ?> fieldMapping) {
+        return Boolean.TRUE.equals(fieldMapping.get(TIME_SERIES_DIMENSION_PARAM));
+    }
+
+    static class Builder {
+        private final IndicesService indicesService;
+        private final Map<String, Object> indexMapping;
+        private final IndexMetadata indexMetadata;
+
+        Builder(final IndicesService indicesService, final Map<String, Object> indexMapping, final IndexMetadata indexMetadata) {
+            this.indicesService = indicesService;
+            this.indexMapping = indexMapping;
+            this.indexMetadata = indexMetadata;
+        }
+
+        public TimeseriesFieldTypeHelper build(final String timestampField) throws IOException {
+            final MapperService mapperService = indicesService.createIndexMapperServiceForValidation(indexMetadata);
+            final CompressedXContent sourceIndexCompressedXContent = new CompressedXContent(indexMapping);
+            mapperService.merge(MapperService.SINGLE_MAPPING_NAME, sourceIndexCompressedXContent, MapperService.MergeReason.INDEX_TEMPLATE);
+            return new TimeseriesFieldTypeHelper(mapperService, timestampField);
+        }
+    }
+}
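
Applied to the mapping in the YAML test above, this helper classifies fields roughly as follows: anything mapped with time_series_dimension: true is a dimension, anything with a time_series_metric type is a metric, and every remaining mapped, aggregatable, non-metadata field (ip, created_at, running, number_of_containers, tags, values, ...) becomes a label. Below is a simplified, standalone mirror of the mapping-based checks; the label check needs a MapperService and is omitted here.

    import java.util.Map;

    class FieldClassificationSketch {
        static boolean isTimeSeriesDimension(Map<String, ?> fieldMapping) {
            return Boolean.TRUE.equals(fieldMapping.get("time_series_dimension"));
        }

        static boolean isTimeSeriesMetric(Map<String, ?> fieldMapping) {
            return fieldMapping.get("time_series_metric") != null;
        }

        public static void main(String[] args) {
            Map<String, ?> uid = Map.of("type", "keyword", "time_series_dimension", true);
            Map<String, ?> tx = Map.of("type", "long", "time_series_metric", "gauge");
            Map<String, ?> ip = Map.of("type", "ip");

            System.out.println(isTimeSeriesDimension(uid)); // true  -> dimension
            System.out.println(isTimeSeriesMetric(tx));     // true  -> metric
            System.out.println(isTimeSeriesDimension(ip) || isTimeSeriesMetric(ip)); // false -> label candidate
        }
    }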

+ 126 - 91
x-pack/plugin/rollup/src/main/java/org/elasticsearch/xpack/rollup/v2/TransportRollupAction.java

@@ -10,15 +10,15 @@ import org.apache.logging.log4j.LogManager;
 import org.apache.logging.log4j.Logger;
 import org.elasticsearch.ElasticsearchException;
 import org.elasticsearch.action.ActionListener;
+import org.elasticsearch.action.admin.cluster.stats.MappingVisitor;
 import org.elasticsearch.action.admin.indices.create.CreateIndexClusterStateUpdateRequest;
 import org.elasticsearch.action.admin.indices.delete.DeleteIndexRequest;
 import org.elasticsearch.action.admin.indices.forcemerge.ForceMergeRequest;
 import org.elasticsearch.action.admin.indices.forcemerge.ForceMergeResponse;
+import org.elasticsearch.action.admin.indices.mapping.get.GetMappingsRequest;
 import org.elasticsearch.action.admin.indices.refresh.RefreshRequest;
 import org.elasticsearch.action.admin.indices.refresh.RefreshResponse;
 import org.elasticsearch.action.admin.indices.settings.put.UpdateSettingsRequest;
-import org.elasticsearch.action.fieldcaps.FieldCapabilities;
-import org.elasticsearch.action.fieldcaps.FieldCapabilitiesRequest;
 import org.elasticsearch.action.support.ActionFilters;
 import org.elasticsearch.action.support.master.AcknowledgedResponse;
 import org.elasticsearch.action.support.master.AcknowledgedTransportMasterNodeAction;
@@ -37,6 +37,7 @@ import org.elasticsearch.cluster.metadata.MetadataCreateIndexService;
 import org.elasticsearch.cluster.service.ClusterService;
 import org.elasticsearch.common.Priority;
 import org.elasticsearch.common.bytes.BytesReference;
+import org.elasticsearch.common.compress.CompressedXContent;
 import org.elasticsearch.common.inject.Inject;
 import org.elasticsearch.common.settings.IndexScopedSettings;
 import org.elasticsearch.common.settings.Setting;
@@ -47,7 +48,9 @@ import org.elasticsearch.index.IndexMode;
 import org.elasticsearch.index.IndexNotFoundException;
 import org.elasticsearch.index.IndexSettings;
 import org.elasticsearch.index.mapper.DateFieldMapper;
+import org.elasticsearch.index.mapper.MapperService;
 import org.elasticsearch.index.mapper.TimeSeriesParams;
+import org.elasticsearch.indices.IndicesService;
 import org.elasticsearch.tasks.Task;
 import org.elasticsearch.tasks.TaskId;
 import org.elasticsearch.threadpool.ThreadPool;
@@ -63,10 +66,12 @@ import org.elasticsearch.xpack.core.rollup.action.RollupActionRequestValidationE
 import org.elasticsearch.xpack.core.rollup.action.RollupIndexerAction;
 
 import java.io.IOException;
-import java.util.HashMap;
+import java.util.ArrayList;
 import java.util.List;
 import java.util.Map;
 
+import static org.elasticsearch.index.mapper.TimeSeriesParams.TIME_SERIES_METRIC_PARAM;
+
 /**
  * The master rollup action that coordinates
  *  -  creating the rollup index
@@ -78,6 +83,7 @@ public class TransportRollupAction extends AcknowledgedTransportMasterNodeAction
     private static final Logger logger = LogManager.getLogger(TransportRollupAction.class);
 
     private final Client client;
+    private final IndicesService indicesService;
     private final ClusterService clusterService;
     private final MetadataCreateIndexService metadataCreateIndexService;
     private final IndexScopedSettings indexScopedSettings;
@@ -104,6 +110,7 @@ public class TransportRollupAction extends AcknowledgedTransportMasterNodeAction
     @Inject
     public TransportRollupAction(
         Client client,
+        IndicesService indicesService,
         ClusterService clusterService,
         TransportService transportService,
         ThreadPool threadPool,
@@ -123,6 +130,7 @@ public class TransportRollupAction extends AcknowledgedTransportMasterNodeAction
             ThreadPool.Names.SAME
         );
         this.client = new OriginSettingClient(client, ClientHelper.ROLLUP_ORIGIN);
+        this.indicesService = indicesService;
         this.clusterService = clusterService;
         this.metadataCreateIndexService = metadataCreateIndexService;
         this.indexScopedSettings = indexScopedSettings;
@@ -174,49 +182,53 @@ public class TransportRollupAction extends AcknowledgedTransportMasterNodeAction
         MetadataCreateIndexService.validateIndexName(rollupIndexName, state);
 
         // Rollup will perform the following tasks:
-        // 1. Extract rollup config from source index field caps
-        // 2. Create the rollup index
-        // 3. Run rollup indexer
-        // 4. Make rollup index read-only and set replicas
-        // 5. Refresh rollup index
-        // 6. Mark rollup index as "completed successfully"
-        // 7. Force-merge the rollup index to a single segment
+        // 1. Extract source index mappings
+        // 2. Extract rollup config from index mappings
+        // 3. Create the rollup index
+        // 4. Run rollup indexer
+        // 5. Make rollup index read-only and set replicas
+        // 6. Refresh rollup index
+        // 7. Mark rollup index as "completed successfully"
+        // 8. Force-merge the rollup index to a single segment
         // At any point if there is an issue, delete the rollup index
 
-        // 1. Extract rollup config from source index field caps
-        FieldCapabilitiesRequest fieldCapsRequest = new FieldCapabilitiesRequest().indices(sourceIndexName).fields("*");
+        // 1. Extract source index mappings
         final TaskId parentTask = new TaskId(clusterService.localNode().getId(), task.getId());
-        fieldCapsRequest.setParentTask(parentTask);
-        client.fieldCaps(fieldCapsRequest, ActionListener.wrap(fieldCapsResponse -> {
-            final Map<String, FieldCapabilities> dimensionFieldCaps = new HashMap<>();
-            final Map<String, FieldCapabilities> metricFieldCaps = new HashMap<>();
-            for (Map.Entry<String, Map<String, FieldCapabilities>> e : fieldCapsResponse.get().entrySet()) {
-                String field = e.getKey();
-                /*
-                 * Rollup runs on a single index, and we do not expect multiple mappings for the same
-                 * field. So, it is safe to select the first and only value of the FieldCapsResponse
-                 * by running: e.getValue().values().iterator().next()
-                 */
-                if (e.getValue().size() != 1) {
-                    throw new IllegalStateException(
-                        "Cannot parse mapping for field [" + field + "] at source index [" + sourceIndexName + "]"
-                    );
-                }
-                FieldCapabilities fieldCaps = e.getValue().values().iterator().next();
-                if (fieldCaps.isDimension()) {
-                    dimensionFieldCaps.put(field, fieldCaps);
-                } else if (e.getValue().values().iterator().next().getMetricType() != null) {
-                    metricFieldCaps.put(field, fieldCaps);
-                } else {
-                    // TODO: Field is not a dimension or a metric. Treat it as a tag
+        final GetMappingsRequest getMappingsRequest = new GetMappingsRequest().indices(sourceIndexName);
+        getMappingsRequest.setParentTask(parentTask);
+        client.admin().indices().getMappings(getMappingsRequest, ActionListener.wrap(getMappingsResponse -> {
+            final Map<String, Object> sourceIndexMappings = getMappingsResponse.mappings()
+                .entrySet()
+                .stream()
+                .filter(entry -> sourceIndexName.equals(entry.getKey()))
+                .findFirst()
+                .map(mappingMetadata -> mappingMetadata.getValue().sourceAsMap())
+                .orElseThrow(() -> new IllegalArgumentException("No mapping found for rollup source index [" + sourceIndexName + "]"));
+
+            // 2. Extract rollup config from index mappings
+            final List<String> dimensionFields = new ArrayList<>();
+            final List<String> metricFields = new ArrayList<>();
+            final List<String> labelFields = new ArrayList<>();
+            final TimeseriesFieldTypeHelper helper = new TimeseriesFieldTypeHelper.Builder(
+                indicesService,
+                sourceIndexMappings,
+                sourceIndexMetadata
+            ).build(request.getRollupConfig().getTimestampField());
+            MappingVisitor.visitMapping(sourceIndexMappings, (field, mapping) -> {
+                if (helper.isTimeSeriesDimension(field, mapping)) {
+                    dimensionFields.add(field);
+                } else if (helper.isTimeSeriesMetric(field, mapping)) {
+                    metricFields.add(field);
+                } else if (helper.isTimeSeriesLabel(field, mapping)) {
+                    labelFields.add(field);
                 }
-            }
+            });
 
             RollupActionRequestValidationException validationException = new RollupActionRequestValidationException();
-            if (dimensionFieldCaps.isEmpty()) {
+            if (dimensionFields.isEmpty()) {
                 validationException.addValidationError("Index [" + sourceIndexName + "] does not contain any dimension fields");
             }
-            if (metricFieldCaps.isEmpty()) {
+            if (metricFields.isEmpty()) {
                 validationException.addValidationError("Index [" + sourceIndexName + "] does not contain any metric fields");
             }
 
@@ -225,22 +237,26 @@ public class TransportRollupAction extends AcknowledgedTransportMasterNodeAction
                 return;
             }
 
+            final MapperService mapperService = indicesService.createIndexMapperServiceForValidation(sourceIndexMetadata);
+            final CompressedXContent sourceIndexCompressedXContent = new CompressedXContent(sourceIndexMappings);
+            mapperService.merge(MapperService.SINGLE_MAPPING_NAME, sourceIndexCompressedXContent, MapperService.MergeReason.INDEX_TEMPLATE);
+
             final String mapping;
             try {
-                mapping = createRollupIndexMapping(request.getRollupConfig(), dimensionFieldCaps, metricFieldCaps);
+                mapping = createRollupIndexMapping(helper, request.getRollupConfig(), mapperService, sourceIndexMappings);
             } catch (IOException e) {
                 listener.onFailure(e);
                 return;
             }
-
-            // 2. Create rollup index
+            // 3. Create rollup index
             createRollupIndex(rollupIndexName, sourceIndexMetadata, mapping, request, ActionListener.wrap(createIndexResp -> {
                 if (createIndexResp.isAcknowledged()) {
                     // 3. Rollup index created. Run rollup indexer
                     RollupIndexerAction.Request rollupIndexerRequest = new RollupIndexerAction.Request(
                         request,
-                        dimensionFieldCaps.keySet().toArray(new String[0]),
-                        metricFieldCaps.keySet().toArray(new String[0])
+                        dimensionFields.toArray(new String[0]),
+                        metricFields.toArray(new String[0]),
+                        labelFields.toArray(new String[0])
                     );
                     rollupIndexerRequest.setParentTask(parentTask);
                     client.execute(RollupIndexerAction.INSTANCE, rollupIndexerRequest, ActionListener.wrap(indexerResp -> {
@@ -269,8 +285,8 @@ public class TransportRollupAction extends AcknowledgedTransportMasterNodeAction
                                                             mergeIndexResp -> listener.onResponse(AcknowledgedResponse.TRUE),
                                                             e -> {
                                                                 /*
-                                                                 * At this point rollup has been created successfully even if force-merge
-                                                                 * fails. So, we should not fail the rollup operation.
+                                                                 * At this point rollup has been created successfully even if
+                                                                 * force-merge fails. So, we should not fail the rollup operation.
                                                                  */
                                                                 logger.error(
                                                                     "Failed to force-merge rollup index [" + rollupIndexName + "]",
@@ -367,68 +383,87 @@ public class TransportRollupAction extends AcknowledgedTransportMasterNodeAction
      * rollup configuration.
      *
      * @param config the rollup configuration
-     * @param dimensionFieldCaps a map with the field name as key and the fields caps response as value
-     *                  for the dimension fields of the source index
-     * @param metricFieldCaps a map with the field name as key and the fields caps response as value
-     *                for the metric fields of the source index
-     *
+     * @param sourceIndexMappings a map with the source index mapping
      * @return the mapping of the rollup index
      */
     public static String createRollupIndexMapping(
+        final TimeseriesFieldTypeHelper helper,
         final RollupActionConfig config,
-        final Map<String, FieldCapabilities> dimensionFieldCaps,
-        final Map<String, FieldCapabilities> metricFieldCaps
+        final MapperService mapperService,
+        final Map<String, Object> sourceIndexMappings
     ) throws IOException {
-        XContentBuilder builder = XContentFactory.jsonBuilder().startObject();
-        builder = getDynamicTemplates(builder);
+        final XContentBuilder builder = XContentFactory.jsonBuilder().startObject();
+
+        addDynamicTemplates(builder);
 
         builder.startObject("properties");
 
-        String timestampField = config.getTimestampField();
-        String dateIntervalType = config.getIntervalType();
-        String dateInterval = config.getInterval().toString();
-        String tz = config.getTimeZone();
+        addTimestampField(config, builder);
+        addMetricFields(helper, sourceIndexMappings, builder);
+
+        builder.endObject(); // match startObject("properties")
+        builder.endObject(); // match initial startObject
+
+        final CompressedXContent rollupDiffXContent = CompressedXContent.fromJSON(
+            XContentHelper.convertToJson(BytesReference.bytes(builder), false, XContentType.JSON)
+        );
+        return mapperService.merge(MapperService.SINGLE_MAPPING_NAME, rollupDiffXContent, MapperService.MergeReason.INDEX_TEMPLATE)
+            .mappingSource()
+            .uncompressed()
+            .utf8ToString();
+    }
+
+    private static void addMetricFields(
+        final TimeseriesFieldTypeHelper helper,
+        final Map<String, Object> sourceIndexMappings,
+        final XContentBuilder builder
+    ) {
+        MappingVisitor.visitMapping(sourceIndexMappings, (field, mapping) -> {
+            if (helper.isTimeSeriesMetric(field, mapping)) {
+                try {
+                    addMetricFieldMapping(builder, field, mapping);
+                } catch (IOException e) {
+                    throw new ElasticsearchException("Error while adding metric for field [" + field + "]");
+                }
+            }
+        });
+    }
+
+    private static void addTimestampField(final RollupActionConfig config, final XContentBuilder builder) throws IOException {
+        final String timestampField = config.getTimestampField();
+        final String dateIntervalType = config.getIntervalType();
+        final String dateInterval = config.getInterval().toString();
+        final String timezone = config.getTimeZone();
 
         builder.startObject(timestampField)
             .field("type", DateFieldMapper.CONTENT_TYPE)
             .startObject("meta")
             .field(dateIntervalType, dateInterval)
-            .field(RollupActionConfig.TIME_ZONE, tz)
+            .field(RollupActionConfig.TIME_ZONE, timezone)
             .endObject()
             .endObject();
+    }
 
-        for (Map.Entry<String, FieldCapabilities> e : dimensionFieldCaps.entrySet()) {
-            builder.startObject(e.getKey())
-                .field("type", e.getValue().getType())
-                .field(TimeSeriesParams.TIME_SERIES_DIMENSION_PARAM, true)
+    private static void addMetricFieldMapping(final XContentBuilder builder, final String field, final Map<String, ?> fieldProperties)
+        throws IOException {
+        final TimeSeriesParams.MetricType metricType = TimeSeriesParams.MetricType.valueOf(
+            fieldProperties.get(TIME_SERIES_METRIC_PARAM).toString()
+        );
+        if (TimeSeriesParams.MetricType.counter.equals(metricType)) {
+            // For counters, we keep the same field type, because they store
+            // only one value (the last value of the counter)
+            builder.startObject(field).field("type", fieldProperties.get("type")).field(TIME_SERIES_METRIC_PARAM, metricType).endObject();
+        } else {
+            final List<String> supportedAggs = List.of(metricType.supportedAggs());
+            // We choose max as the default metric
+            final String defaultMetric = supportedAggs.contains("max") ? "max" : supportedAggs.get(0);
+            builder.startObject(field)
+                .field("type", AggregateDoubleMetricFieldMapper.CONTENT_TYPE)
+                .stringListField(AggregateDoubleMetricFieldMapper.Names.METRICS, supportedAggs)
+                .field(AggregateDoubleMetricFieldMapper.Names.DEFAULT_METRIC, defaultMetric)
+                .field(TIME_SERIES_METRIC_PARAM, metricType)
                 .endObject();
         }
-
-        for (Map.Entry<String, FieldCapabilities> e : metricFieldCaps.entrySet()) {
-            TimeSeriesParams.MetricType metricType = e.getValue().getMetricType();
-            if (metricType == TimeSeriesParams.MetricType.counter) {
-                // For counters we keep the same field type, because they store
-                // only one value (the last value of the counter)
-                builder.startObject(e.getKey())
-                    .field("type", e.getValue().getType())
-                    .field(TimeSeriesParams.TIME_SERIES_METRIC_PARAM, metricType)
-                    .endObject();
-            } else {
-                List<String> aggs = List.of(metricType.supportedAggs());
-                // We choose max as the default metric
-                String defaultMetric = aggs.contains("max") ? "max" : aggs.get(0);
-                builder.startObject(e.getKey())
-                    .field("type", AggregateDoubleMetricFieldMapper.CONTENT_TYPE)
-                    .stringListField(AggregateDoubleMetricFieldMapper.Names.METRICS, aggs)
-                    .field(AggregateDoubleMetricFieldMapper.Names.DEFAULT_METRIC, defaultMetric)
-                    .field(TimeSeriesParams.TIME_SERIES_METRIC_PARAM, metricType)
-                    .endObject();
-            }
-        }
-
-        builder.endObject();
-        builder.endObject();
-        return XContentHelper.convertToJson(BytesReference.bytes(builder), false, XContentType.JSON);
     }
 
     /**
@@ -473,8 +508,8 @@ public class TransportRollupAction extends AcknowledgedTransportMasterNodeAction
     /**
      * Configure the dynamic templates to always map strings to the keyword field type.
      */
-    private static XContentBuilder getDynamicTemplates(XContentBuilder builder) throws IOException {
-        return builder.startArray("dynamic_templates")
+    private static void addDynamicTemplates(final XContentBuilder builder) throws IOException {
+        builder.startArray("dynamic_templates")
             .startObject()
             .startObject("strings")
             .field("match_mapping_type", "string")

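A minimal sketch of the mapping fragments that the addMetricFieldMapping changes above produce, reconstructed from the builder calls rather than copied from the commit; the field names "requests_total" and "cpu_usage" are hypothetical:

// Sketch only: not part of the commit. Reconstructs the JSON emitted by
// addMetricFieldMapping for a hypothetical counter field and a hypothetical gauge field.
public class RollupMetricMappingSketch {
    public static void main(String[] args) {
        // Counter fields keep their original field type: the rollup stores only the
        // last value of the counter, so the time_series_metric marker is enough.
        String counterMapping = """
            "requests_total": {
              "type": "double",
              "time_series_metric": "counter"
            }
            """;
        // Gauge fields are re-mapped to aggregate_metric_double with every supported
        // aggregation, using "max" as the default metric (or the first supported
        // aggregation when "max" is not available).
        String gaugeMapping = """
            "cpu_usage": {
              "type": "aggregate_metric_double",
              "metrics": ["min", "max", "sum", "value_count"],
              "default_metric": "max",
              "time_series_metric": "gauge"
            }
            """;
        System.out.println(counterMapping + gaugeMapping);
    }
}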
+ 2 - 1
x-pack/plugin/rollup/src/main/java/org/elasticsearch/xpack/rollup/v2/TransportRollupIndexerAction.java

@@ -131,7 +131,8 @@ public class TransportRollupIndexerAction extends TransportBroadcastAction<
             request.getRollupIndex(),
             request.getRollupConfig(),
             request.getDimensionFields(),
-            request.getMetricFields()
+            request.getMetricFields(),
+            request.getLabelFields()
         );
         return indexer.execute();
     }

+ 76 - 0
x-pack/plugin/rollup/src/test/java/org/elasticsearch/xpack/rollup/v2/LabelFieldProducerTests.java

@@ -0,0 +1,76 @@
+/*
+ * Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
+ * or more contributor license agreements. Licensed under the Elastic License
+ * 2.0; you may not use this file except in compliance with the Elastic License
+ * 2.0.
+ */
+
+package org.elasticsearch.xpack.rollup.v2;
+
+import org.elasticsearch.search.aggregations.AggregatorTestCase;
+
+public class LabelFieldProducerTests extends AggregatorTestCase {
+
+    public void testLastValueKeywordLabel() {
+        final LabelFieldProducer.Label label = new LabelFieldProducer.LastValueLabel();
+        label.collect("aaa");
+        label.collect("bbb");
+        label.collect("ccc");
+        assertEquals("aaa", label.get());
+        label.reset();
+        assertNull(label.get());
+    }
+
+    public void testLastValueDoubleLabel() {
+        final LabelFieldProducer.Label label = new LabelFieldProducer.LastValueLabel();
+        label.collect(10.20D);
+        label.collect(17.30D);
+        label.collect(12.60D);
+        assertEquals(10.20D, label.get());
+        label.reset();
+        assertNull(label.get());
+    }
+
+    public void testLastValueIntegerLabel() {
+        final LabelFieldProducer.Label label = new LabelFieldProducer.LastValueLabel();
+        label.collect(10);
+        label.collect(17);
+        label.collect(12);
+        assertEquals(10, label.get());
+        label.reset();
+        assertNull(label.get());
+    }
+
+    public void testLastValueLongLabel() {
+        final LabelFieldProducer.Label label = new LabelFieldProducer.LastValueLabel();
+        label.collect(10L);
+        label.collect(17L);
+        label.collect(12L);
+        assertEquals(10L, label.get());
+        label.reset();
+        assertNull(label.get());
+    }
+
+    public void testLastValueBooleanLabel() {
+        final LabelFieldProducer.Label label = new LabelFieldProducer.LastValueLabel();
+        label.collect(true);
+        label.collect(false);
+        label.collect(true);
+        assertEquals(true, label.get());
+        label.reset();
+        assertNull(label.get());
+    }
+
+    public void testLabelFieldProducer() {
+        final LabelFieldProducer producer = new LabelFieldProducer.LabelLastValueFieldProducer("dummy");
+        assertTrue(producer.isEmpty());
+        assertEquals("dummy", producer.name());
+        assertEquals("last_value", producer.label().name);
+        producer.collect("aaaa");
+        assertFalse(producer.isEmpty());
+        assertEquals("aaaa", producer.value());
+        producer.reset();
+        assertTrue(producer.isEmpty());
+        assertNull(producer.value());
+    }
+}

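The tests above fix the contract of LastValueLabel: collect() keeps only the first value it is given (which matches a newest-first document traversal, so the first collected value is the latest one by timestamp), get() returns it, and reset() clears it. A minimal sketch consistent with those assertions, using the names from the tests; this is not the committed implementation:

// Sketch only: behaviour inferred from LabelFieldProducerTests above, not the committed code.
public class LastValueLabelSketch {

    abstract static class Label {
        final String name;
        Label(String name) { this.name = name; }
        abstract void collect(Object value);
        abstract Object get();
        abstract void reset();
    }

    static class LastValueLabel extends Label {
        private Object lastValue;
        LastValueLabel() { super("last_value"); }

        @Override
        void collect(Object value) {
            // Keep only the first value collected: with a newest-first traversal
            // this is the latest value by timestamp.
            if (lastValue == null) {
                lastValue = value;
            }
        }

        @Override
        Object get() { return lastValue; }

        @Override
        void reset() { lastValue = null; }
    }

    public static void main(String[] args) {
        Label label = new LastValueLabel();
        label.collect("aaa");
        label.collect("bbb");
        label.collect("ccc");
        System.out.println(label.get()); // prints "aaa", as in the test above
        label.reset();
        System.out.println(label.get()); // prints "null"
    }
}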
+ 8 - 8
x-pack/plugin/rollup/src/test/java/org/elasticsearch/xpack/rollup/v2/MetricFieldProducerTests.java

@@ -122,22 +122,22 @@ public class MetricFieldProducerTests extends AggregatorTestCase {
     public void testCounterMetricFieldProducer() {
         MetricFieldProducer producer = new MetricFieldProducer.CounterMetricFieldProducer("field");
         assertTrue(producer.isEmpty());
-        producer.collectMetric(55.0);
-        producer.collectMetric(12.2);
-        producer.collectMetric(5.5);
+        producer.collect(55.0);
+        producer.collect(12.2);
+        producer.collect(5.5);
 
         assertFalse(producer.isEmpty());
         Object o = producer.value();
         assertEquals(55.0, o);
-        assertEquals("field", producer.field());
+        assertEquals("field", producer.name());
     }
 
     public void testGaugeMetricFieldProducer() {
         MetricFieldProducer producer = new MetricFieldProducer.GaugeMetricFieldProducer("field");
         assertTrue(producer.isEmpty());
-        producer.collectMetric(55.0);
-        producer.collectMetric(12.2);
-        producer.collectMetric(5.5);
+        producer.collect(55.0);
+        producer.collect(12.2);
+        producer.collect(5.5);
 
         assertFalse(producer.isEmpty());
         Object o = producer.value();
@@ -148,7 +148,7 @@ public class MetricFieldProducerTests extends AggregatorTestCase {
         } else {
             fail("Value is not a Map");
         }
-        assertEquals("field", producer.field());
+        assertEquals("field", producer.name());
     }
 
     public void testBuildMetricProducers() {

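The renames above (collectMetric() to collect(), field() to name()) line the metric producers up with the new label producers, so both can be driven through the same minimal surface. A hedged sketch of that shared surface as implied by the two test classes; the interface name is illustrative and not part of the commit:

// Sketch only: the producer surface implied by MetricFieldProducerTests and
// LabelFieldProducerTests after the rename; the interface name is hypothetical.
interface RollupFieldProducerSketch {
    String name();              // name of the field written into the rollup document
    void collect(Object value); // called for every value of the field within a bucket
    Object value();             // the rolled-up value, e.g. the last value for counters and labels
    boolean isEmpty();          // true until at least one value has been collected
    void reset();               // clears the state before the next bucket
}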
+ 367 - 87
x-pack/plugin/rollup/src/test/java/org/elasticsearch/xpack/rollup/v2/RollupActionSingleNodeTests.java

@@ -6,13 +6,15 @@
  */
 package org.elasticsearch.xpack.rollup.v2;
 
-import org.apache.lucene.tests.util.LuceneTestCase;
 import org.elasticsearch.ElasticsearchException;
 import org.elasticsearch.ResourceAlreadyExistsException;
 import org.elasticsearch.action.ActionListener;
 import org.elasticsearch.action.ActionRequestValidationException;
 import org.elasticsearch.action.DocWriteRequest;
+import org.elasticsearch.action.admin.cluster.stats.MappingVisitor;
 import org.elasticsearch.action.admin.indices.get.GetIndexResponse;
+import org.elasticsearch.action.admin.indices.mapping.get.GetMappingsRequest;
+import org.elasticsearch.action.admin.indices.mapping.get.GetMappingsResponse;
 import org.elasticsearch.action.admin.indices.rollover.RolloverRequest;
 import org.elasticsearch.action.admin.indices.rollover.RolloverResponse;
 import org.elasticsearch.action.admin.indices.settings.put.UpdateSettingsRequest;
@@ -22,15 +24,15 @@ import org.elasticsearch.action.bulk.BulkRequestBuilder;
 import org.elasticsearch.action.bulk.BulkResponse;
 import org.elasticsearch.action.datastreams.CreateDataStreamAction;
 import org.elasticsearch.action.datastreams.GetDataStreamAction;
-import org.elasticsearch.action.fieldcaps.FieldCapabilitiesResponse;
 import org.elasticsearch.action.index.IndexRequest;
-import org.elasticsearch.action.search.SearchResponse;
 import org.elasticsearch.action.support.WriteRequest;
 import org.elasticsearch.action.support.master.AcknowledgedResponse;
 import org.elasticsearch.cluster.metadata.ComposableIndexTemplate;
 import org.elasticsearch.cluster.metadata.IndexMetadata;
 import org.elasticsearch.cluster.metadata.Template;
 import org.elasticsearch.common.compress.CompressedXContent;
+import org.elasticsearch.common.document.DocumentField;
+import org.elasticsearch.common.network.NetworkAddress;
 import org.elasticsearch.common.settings.Settings;
 import org.elasticsearch.common.time.DateFormatter;
 import org.elasticsearch.datastreams.DataStreamsPlugin;
@@ -42,17 +44,26 @@ import org.elasticsearch.index.engine.VersionConflictEngineException;
 import org.elasticsearch.index.mapper.DateFieldMapper;
 import org.elasticsearch.index.mapper.TimeSeriesIdFieldMapper;
 import org.elasticsearch.index.mapper.TimeSeriesParams;
+import org.elasticsearch.indices.IndicesService;
 import org.elasticsearch.plugins.Plugin;
-import org.elasticsearch.search.aggregations.bucket.composite.CompositeAggregationBuilder;
-import org.elasticsearch.search.aggregations.bucket.composite.CompositeValuesSourceBuilder;
-import org.elasticsearch.search.aggregations.bucket.composite.DateHistogramValuesSourceBuilder;
-import org.elasticsearch.search.aggregations.bucket.composite.InternalComposite;
-import org.elasticsearch.search.aggregations.bucket.composite.TermsValuesSourceBuilder;
+import org.elasticsearch.search.SearchHit;
+import org.elasticsearch.search.aggregations.Aggregation;
+import org.elasticsearch.search.aggregations.AggregationBuilder;
+import org.elasticsearch.search.aggregations.Aggregations;
+import org.elasticsearch.search.aggregations.bucket.histogram.DateHistogramAggregationBuilder;
 import org.elasticsearch.search.aggregations.bucket.histogram.DateHistogramInterval;
+import org.elasticsearch.search.aggregations.bucket.histogram.InternalDateHistogram;
+import org.elasticsearch.search.aggregations.bucket.terms.StringTerms;
+import org.elasticsearch.search.aggregations.bucket.terms.TermsAggregationBuilder;
+import org.elasticsearch.search.aggregations.metrics.InternalTopHits;
+import org.elasticsearch.search.aggregations.metrics.Max;
 import org.elasticsearch.search.aggregations.metrics.MaxAggregationBuilder;
 import org.elasticsearch.search.aggregations.metrics.MinAggregationBuilder;
 import org.elasticsearch.search.aggregations.metrics.SumAggregationBuilder;
+import org.elasticsearch.search.aggregations.metrics.TopHitsAggregationBuilder;
 import org.elasticsearch.search.aggregations.metrics.ValueCountAggregationBuilder;
+import org.elasticsearch.search.sort.SortBuilders;
+import org.elasticsearch.search.sort.SortOrder;
 import org.elasticsearch.test.ESSingleNodeTestCase;
 import org.elasticsearch.xcontent.XContentBuilder;
 import org.elasticsearch.xcontent.XContentFactory;
@@ -72,17 +83,22 @@ import org.junit.Before;
 import java.io.IOException;
 import java.time.Instant;
 import java.time.ZoneId;
+import java.time.ZoneOffset;
+import java.time.ZonedDateTime;
 import java.util.ArrayList;
 import java.util.Collection;
+import java.util.Date;
+import java.util.HashMap;
 import java.util.List;
 import java.util.Locale;
 import java.util.Map;
+import java.util.Optional;
 import java.util.concurrent.ExecutionException;
 import java.util.stream.Collectors;
 
+import static org.elasticsearch.index.mapper.TimeSeriesParams.TIME_SERIES_METRIC_PARAM;
 import static org.elasticsearch.test.hamcrest.ElasticsearchAssertions.assertHitCount;
 import static org.hamcrest.Matchers.containsString;
-import static org.hamcrest.Matchers.equalTo;
 
 public class RollupActionSingleNodeTests extends ESSingleNodeTestCase {
 
@@ -92,6 +108,18 @@ public class RollupActionSingleNodeTests extends ESSingleNodeTestCase {
     public static final String FIELD_DIMENSION_2 = "dimension_long";
     public static final String FIELD_NUMERIC_1 = "numeric_1";
     public static final String FIELD_NUMERIC_2 = "numeric_2";
+    public static final String FIELD_METRIC_LABEL_DOUBLE = "metric_label_double";
+    public static final String FIELD_LABEL_DOUBLE = "label_double";
+    public static final String FIELD_LABEL_INTEGER = "label_integer";
+    public static final String FIELD_LABEL_KEYWORD = "label_keyword";
+    public static final String FIELD_LABEL_TEXT = "label_text";
+    public static final String FIELD_LABEL_BOOLEAN = "label_boolean";
+    public static final String FIELD_LABEL_IPv4_ADDRESS = "label_ipv4_address";
+    public static final String FIELD_LABEL_IPv6_ADDRESS = "label_ipv6_address";
+    public static final String FIELD_LABEL_DATE = "label_date";
+    public static final String FIELD_LABEL_UNMAPPED = "label_unmapped";
+    public static final String FIELD_LABEL_KEYWORD_ARRAY = "label_keyword_array";
+    public static final String FIELD_LABEL_DOUBLE_ARRAY = "label_double_array";
 
     private static final int MAX_DIM_VALUES = 5;
     private static final long MAX_NUM_BUCKETS = 10;
@@ -128,6 +156,13 @@ public class RollupActionSingleNodeTests extends ESSingleNodeTestCase {
             dimensionValues.add(randomAlphaOfLength(6));
         }
 
+        /**
+         * NOTE: here we map each numeric label field also as a (counter) metric.
+         * This is done for testing purposes: there is no other easy way to verify
+         * that labels are rolled up using their last value. The idea is to
+         * check that the value stored for a label (its last value) matches the
+         * value of the corresponding counter metric, since counters are also
+         * rolled up by keeping their last value.
+         */
         client().admin()
             .indices()
             .prepareCreate(sourceIndex)
@@ -137,7 +172,10 @@ public class RollupActionSingleNodeTests extends ESSingleNodeTestCase {
                     .put("index.number_of_replicas", numOfReplicas)
                     .put(IndexSettings.MODE.getKey(), IndexMode.TIME_SERIES)
                     .putList(IndexMetadata.INDEX_ROUTING_PATH.getKey(), List.of(FIELD_DIMENSION_1))
-                    .put(IndexSettings.TIME_SERIES_START_TIME.getKey(), Instant.ofEpochMilli(startTime).toString())
+                    .put(
+                        IndexSettings.TIME_SERIES_START_TIME.getKey(),
+                        DateFieldMapper.DEFAULT_DATE_TIME_FORMATTER.formatMillis(Instant.ofEpochMilli(startTime).toEpochMilli())
+                    )
                     .put(IndexSettings.TIME_SERIES_END_TIME.getKey(), "2106-01-08T23:40:53.384Z")
                     .build()
             )
@@ -151,7 +189,29 @@ public class RollupActionSingleNodeTests extends ESSingleNodeTestCase {
                 FIELD_NUMERIC_1,
                 "type=long,time_series_metric=gauge",
                 FIELD_NUMERIC_2,
-                "type=double,time_series_metric=counter"
+                "type=double,time_series_metric=counter",
+                FIELD_LABEL_DOUBLE,
+                "type=double",
+                FIELD_LABEL_INTEGER,
+                "type=integer",
+                FIELD_LABEL_KEYWORD,
+                "type=keyword",
+                FIELD_LABEL_TEXT,
+                "type=text",
+                FIELD_LABEL_BOOLEAN,
+                "type=boolean",
+                FIELD_METRIC_LABEL_DOUBLE, /* numeric label indexed as a metric */
+                "type=double,time_series_metric=counter",
+                FIELD_LABEL_IPv4_ADDRESS,
+                "type=ip",
+                FIELD_LABEL_IPv6_ADDRESS,
+                "type=ip",
+                FIELD_LABEL_DATE,
+                "type=date,format=date_optional_time",
+                FIELD_LABEL_KEYWORD_ARRAY,
+                "type=keyword",
+                FIELD_LABEL_DOUBLE_ARRAY,
+                "type=double"
             )
             .get();
     }
@@ -160,6 +220,22 @@ public class RollupActionSingleNodeTests extends ESSingleNodeTestCase {
         RollupActionConfig config = new RollupActionConfig(randomInterval());
         SourceSupplier sourceSupplier = () -> {
             String ts = randomDateForInterval(config.getInterval());
+            double labelDoubleValue = DATE_FORMATTER.parseMillis(ts);
+            int labelIntegerValue = randomInt();
+            long labelLongValue = randomLong();
+            String labelIpv4Address = NetworkAddress.format(randomIp(true));
+            String labelIpv6Address = NetworkAddress.format(randomIp(false));
+            Date labelDateValue = randomDate();
+            int keywordArraySize = randomIntBetween(3, 10);
+            String[] keywordArray = new String[keywordArraySize];
+            for (int i = 0; i < keywordArraySize; ++i) {
+                keywordArray[i] = randomAlphaOfLength(10);
+            }
+            int doubleArraySize = randomIntBetween(3, 10);
+            double[] doubleArray = new double[doubleArraySize];
+            for (int i = 0; i < doubleArraySize; ++i) {
+                doubleArray[i] = randomDouble();
+            }
             return XContentFactory.jsonBuilder()
                 .startObject()
                 .field(FIELD_TIMESTAMP, ts)
@@ -167,6 +243,18 @@ public class RollupActionSingleNodeTests extends ESSingleNodeTestCase {
                 // .field(FIELD_DIMENSION_2, randomIntBetween(1, 10)) //TODO: Fix _tsid format issue and then enable this
                 .field(FIELD_NUMERIC_1, randomInt())
                 .field(FIELD_NUMERIC_2, DATE_FORMATTER.parseMillis(ts))
+                .field(FIELD_LABEL_DOUBLE, labelDoubleValue)
+                .field(FIELD_METRIC_LABEL_DOUBLE, labelDoubleValue)
+                .field(FIELD_LABEL_INTEGER, labelIntegerValue)
+                .field(FIELD_LABEL_KEYWORD, ts)
+                .field(FIELD_LABEL_UNMAPPED, randomBoolean() ? labelLongValue : labelDoubleValue)
+                .field(FIELD_LABEL_TEXT, ts)
+                .field(FIELD_LABEL_BOOLEAN, randomBoolean())
+                .field(FIELD_LABEL_IPv4_ADDRESS, labelIpv4Address)
+                .field(FIELD_LABEL_IPv6_ADDRESS, labelIpv6Address)
+                .field(FIELD_LABEL_DATE, labelDateValue)
+                .field(FIELD_LABEL_KEYWORD_ARRAY, keywordArray)
+                .field(FIELD_LABEL_DOUBLE_ARRAY, doubleArray)
                 .endObject();
         };
         bulkIndex(sourceSupplier);
@@ -175,6 +263,19 @@ public class RollupActionSingleNodeTests extends ESSingleNodeTestCase {
         assertRollupIndex(sourceIndex, rollupIndex, config);
     }
 
+    private Date randomDate() {
+        int randomYear = randomIntBetween(1970, 2020);
+        int randomMonth = randomIntBetween(1, 12);
+        int randomDayOfMonth = randomIntBetween(1, 28);
+        int randomHour = randomIntBetween(0, 23);
+        int randomMinute = randomIntBetween(0, 59);
+        int randomSecond = randomIntBetween(0, 59);
+        return Date.from(
+            ZonedDateTime.of(randomYear, randomMonth, randomDayOfMonth, randomHour, randomMinute, randomSecond, 0, ZoneOffset.UTC)
+                .toInstant()
+        );
+    }
+
     public void testCopyIndexSettings() throws IOException {
         Settings settings = Settings.builder()
             .put(LifecycleSettings.LIFECYCLE_NAME, randomAlphaOfLength(5))
@@ -233,7 +334,6 @@ public class RollupActionSingleNodeTests extends ESSingleNodeTestCase {
         assertThat(exception.getMessage(), containsString("rollup configuration is missing"));
     }
 
-    @LuceneTestCase.AwaitsFix(bugUrl = "TODO: Fix this")
     public void testRollupSparseMetrics() throws IOException {
         RollupActionConfig config = new RollupActionConfig(randomInterval());
         SourceSupplier sourceSupplier = () -> {
@@ -268,7 +368,7 @@ public class RollupActionSingleNodeTests extends ESSingleNodeTestCase {
         assertThat(exception.getMessage(), containsString(rollupIndex));
     }
 
-    public void testRollupEmptyIndex() {
+    public void testRollupEmptyIndex() throws IOException {
         RollupActionConfig config = new RollupActionConfig(randomInterval());
         // Source index has been created in the setup() method
         prepareSourceIndex(sourceIndex);
@@ -439,69 +539,180 @@ public class RollupActionSingleNodeTests extends ESSingleNodeTestCase {
         return response;
     }
 
+    private Aggregations aggregate(final String index, AggregationBuilder aggregationBuilder) {
+        return client().prepareSearch(index).addAggregation(aggregationBuilder).get().getAggregations();
+    }
+
     @SuppressWarnings("unchecked")
-    private void assertRollupIndex(String sourceIndex, String rollupIndex, RollupActionConfig config) {
+    private void assertRollupIndex(String sourceIndex, String rollupIndex, RollupActionConfig config) throws IOException {
         // Retrieve field information for the metric fields
-        FieldCapabilitiesResponse fieldCapsResponse = client().prepareFieldCaps(sourceIndex).setFields("*").get();
-        Map<String, TimeSeriesParams.MetricType> metricFields = fieldCapsResponse.get()
+        final GetMappingsResponse getMappingsResponse = client().admin().indices().prepareGetMappings(sourceIndex).get();
+        final Map<String, Object> sourceIndexMappings = getMappingsResponse.mappings()
             .entrySet()
             .stream()
-            .filter(e -> e.getValue().values().iterator().next().getMetricType() != null)
-            .collect(Collectors.toMap(e -> e.getKey(), e -> e.getValue().values().iterator().next().getMetricType()));
-
-        final CompositeAggregationBuilder aggregation = buildCompositeAggs("resp", config, metricFields);
-        long numBuckets = 0;
-        InternalComposite origResp = client().prepareSearch(sourceIndex).addAggregation(aggregation).get().getAggregations().get("resp");
-        InternalComposite rollupResp = client().prepareSearch(rollupIndex).addAggregation(aggregation).get().getAggregations().get("resp");
-        while (origResp.afterKey() != null) {
-            numBuckets += origResp.getBuckets().size();
-            assertEquals(origResp, rollupResp);
-            aggregation.aggregateAfter(origResp.afterKey());
-            origResp = client().prepareSearch(sourceIndex).addAggregation(aggregation).get().getAggregations().get("resp");
-            rollupResp = client().prepareSearch(rollupIndex).addAggregation(aggregation).get().getAggregations().get("resp");
-        }
-        assertEquals(origResp, rollupResp);
+            .filter(entry -> sourceIndex.equals(entry.getKey()))
+            .findFirst()
+            .map(mappingMetadata -> mappingMetadata.getValue().sourceAsMap())
+            .orElseThrow(() -> new IllegalArgumentException("No mapping found for rollup source index [" + sourceIndex + "]"));
+
+        IndexMetadata indexMetadata = client().admin().cluster().prepareState().get().getState().getMetadata().index(sourceIndex);
+        TimeseriesFieldTypeHelper helper = new TimeseriesFieldTypeHelper.Builder(
+            getInstanceFromNode(IndicesService.class),
+            sourceIndexMappings,
+            indexMetadata
+        ).build(config.getTimestampField());
+        Map<String, TimeSeriesParams.MetricType> metricFields = new HashMap<>();
+        Map<String, String> labelFields = new HashMap<>();
+        MappingVisitor.visitMapping(sourceIndexMappings, (field, fieldMapping) -> {
+            if (helper.isTimeSeriesMetric(field, fieldMapping)) {
+                metricFields.put(field, TimeSeriesParams.MetricType.valueOf(fieldMapping.get(TIME_SERIES_METRIC_PARAM).toString()));
+            } else if (helper.isTimeSeriesLabel(field, fieldMapping)) {
+                labelFields.put(field, fieldMapping.get("type").toString());
+            }
+        });
 
-        SearchResponse resp = client().prepareSearch(rollupIndex).setTrackTotalHits(true).get();
-        assertThat(resp.getHits().getTotalHits().value, equalTo(numBuckets));
+        assertRollupIndexAggregations(sourceIndex, rollupIndex, config, metricFields, labelFields);
 
         GetIndexResponse indexSettingsResp = client().admin().indices().prepareGetIndex().addIndices(sourceIndex, rollupIndex).get();
-        // Assert rollup metadata are set in index settings
-        assertEquals("success", indexSettingsResp.getSetting(rollupIndex, "index.rollup.status"));
-        assertEquals(
-            indexSettingsResp.getSetting(sourceIndex, "index.uuid"),
-            indexSettingsResp.getSetting(rollupIndex, "index.rollup.source.uuid")
-        );
-        assertEquals(sourceIndex, indexSettingsResp.getSetting(rollupIndex, "index.rollup.source.name"));
-        assertEquals(indexSettingsResp.getSetting(sourceIndex, "index.mode"), indexSettingsResp.getSetting(rollupIndex, "index.mode"));
-        assertEquals(
-            indexSettingsResp.getSetting(sourceIndex, "time_series.start_time"),
-            indexSettingsResp.getSetting(rollupIndex, "time_series.start_time")
-        );
-        assertEquals(
-            indexSettingsResp.getSetting(sourceIndex, "time_series.end_time"),
-            indexSettingsResp.getSetting(rollupIndex, "time_series.end_time")
-        );
-        assertEquals(
-            indexSettingsResp.getSetting(sourceIndex, "index.routing_path"),
-            indexSettingsResp.getSetting(rollupIndex, "index.routing_path")
-        );
-        assertEquals(
-            indexSettingsResp.getSetting(sourceIndex, "index.number_of_shards"),
-            indexSettingsResp.getSetting(rollupIndex, "index.number_of_shards")
-        );
-        assertEquals(
-            indexSettingsResp.getSetting(sourceIndex, "index.number_of_replicas"),
-            indexSettingsResp.getSetting(rollupIndex, "index.number_of_replicas")
-        );
-        assertEquals("true", indexSettingsResp.getSetting(rollupIndex, "index.blocks.write"));
+        assertRollupIndexSettings(sourceIndex, rollupIndex, indexSettingsResp);
 
-        // Assert field mappings
         Map<String, Map<String, Object>> mappings = (Map<String, Map<String, Object>>) indexSettingsResp.getMappings()
             .get(rollupIndex)
             .getSourceAsMap()
             .get("properties");
 
+        assertFieldMappings(config, metricFields, mappings);
+
+        GetMappingsResponse indexMappings = client().admin()
+            .indices()
+            .getMappings(new GetMappingsRequest().indices(rollupIndex, sourceIndex))
+            .actionGet();
+        Map<String, String> rollupIndexProperties = (Map<String, String>) indexMappings.mappings()
+            .get(rollupIndex)
+            .sourceAsMap()
+            .get("properties");
+        Map<String, String> sourceIndexCloneProperties = (Map<String, String>) indexMappings.mappings()
+            .get(sourceIndex)
+            .sourceAsMap()
+            .get("properties");
+        List<Map.Entry<String, String>> labelFieldRollupIndexCloneProperties = (rollupIndexProperties.entrySet()
+            .stream()
+            .filter(entry -> labelFields.containsKey(entry.getKey()))
+            .toList());
+        List<Map.Entry<String, String>> labelFieldSourceIndexProperties = (sourceIndexCloneProperties.entrySet()
+            .stream()
+            .filter(entry -> labelFields.containsKey(entry.getKey()))
+            .toList());
+        assertEquals(labelFieldRollupIndexCloneProperties, labelFieldSourceIndexProperties);
+    }
+
+    private void assertRollupIndexAggregations(
+        String sourceIndex,
+        String rollupIndex,
+        RollupActionConfig config,
+        Map<String, TimeSeriesParams.MetricType> metricFields,
+        Map<String, String> labelFields
+    ) {
+        final AggregationBuilder aggregations = buildAggregations(config, metricFields, labelFields, config.getTimestampField());
+        Aggregations origResp = aggregate(sourceIndex, aggregations);
+        Aggregations rollupResp = aggregate(rollupIndex, aggregations);
+        assertEquals(origResp.asMap().keySet(), rollupResp.asMap().keySet());
+
+        StringTerms originalTsIdTermsAggregation = (StringTerms) origResp.getAsMap().values().stream().toList().get(0);
+        StringTerms rollupTsIdTermsAggregation = (StringTerms) rollupResp.getAsMap().values().stream().toList().get(0);
+        originalTsIdTermsAggregation.getBuckets().forEach(originalBucket -> {
+
+            StringTerms.Bucket rollupBucket = rollupTsIdTermsAggregation.getBucketByKey(originalBucket.getKeyAsString());
+            assertEquals(originalBucket.getAggregations().asList().size(), rollupBucket.getAggregations().asList().size());
+
+            InternalDateHistogram originalDateHistogram = (InternalDateHistogram) originalBucket.getAggregations().asList().get(0);
+            InternalDateHistogram rollupDateHistogram = (InternalDateHistogram) rollupBucket.getAggregations().asList().get(0);
+            List<InternalDateHistogram.Bucket> originalDateHistogramBuckets = originalDateHistogram.getBuckets();
+            List<InternalDateHistogram.Bucket> rollupDateHistogramBuckets = rollupDateHistogram.getBuckets();
+            assertEquals(originalDateHistogramBuckets.size(), rollupDateHistogramBuckets.size());
+            assertEquals(
+                originalDateHistogramBuckets.stream().map(InternalDateHistogram.Bucket::getKeyAsString).collect(Collectors.toList()),
+                rollupDateHistogramBuckets.stream().map(InternalDateHistogram.Bucket::getKeyAsString).collect(Collectors.toList())
+            );
+
+            for (int i = 0; i < originalDateHistogramBuckets.size(); ++i) {
+                InternalDateHistogram.Bucket originalDateHistogramBucket = originalDateHistogramBuckets.get(i);
+                InternalDateHistogram.Bucket rollupDateHistogramBucket = rollupDateHistogramBuckets.get(i);
+                assertEquals(originalDateHistogramBucket.getKeyAsString(), rollupDateHistogramBucket.getKeyAsString());
+
+                Aggregations originalAggregations = originalDateHistogramBucket.getAggregations();
+                Aggregations rollupAggregations = rollupDateHistogramBucket.getAggregations();
+                assertEquals(originalAggregations.asList().size(), rollupAggregations.asList().size());
+
+                List<Aggregation> nonTopHitsOriginalAggregations = originalAggregations.asList()
+                    .stream()
+                    .filter(agg -> agg.getType().equals("top_hits") == false)
+                    .toList();
+                List<Aggregation> nonTopHitsRollupAggregations = rollupAggregations.asList()
+                    .stream()
+                    .filter(agg -> agg.getType().equals("top_hits") == false)
+                    .toList();
+                assertEquals(nonTopHitsOriginalAggregations, nonTopHitsRollupAggregations);
+
+                List<Aggregation> topHitsOriginalAggregations = originalAggregations.asList()
+                    .stream()
+                    .filter(agg -> agg.getType().equals("top_hits"))
+                    .toList();
+                List<Aggregation> topHitsRollupAggregations = rollupAggregations.asList()
+                    .stream()
+                    .filter(agg -> agg.getType().equals("top_hits"))
+                    .toList();
+                assertEquals(topHitsOriginalAggregations.size(), topHitsRollupAggregations.size());
+
+                for (int j = 0; j < topHitsRollupAggregations.size(); ++j) {
+                    InternalTopHits originalTopHits = (InternalTopHits) topHitsOriginalAggregations.get(j);
+                    InternalTopHits rollupTopHits = (InternalTopHits) topHitsRollupAggregations.get(j);
+                    SearchHit[] originalHits = originalTopHits.getHits().getHits();
+                    SearchHit[] rollupHits = rollupTopHits.getHits().getHits();
+                    assertEquals(originalHits.length, rollupHits.length);
+
+                    for (int k = 0; k < originalHits.length; ++k) {
+                        SearchHit originalHit = originalHits[k];
+                        SearchHit rollupHit = rollupHits[k];
+
+                        Map<String, DocumentField> originalHitDocumentFields = originalHit.getDocumentFields();
+                        Map<String, DocumentField> rollupHitDocumentFields = rollupHit.getDocumentFields();
+                        List<DocumentField> originalFields = originalHitDocumentFields.values().stream().toList();
+                        List<DocumentField> rollupFields = rollupHitDocumentFields.values().stream().toList();
+                        List<Object> originalFieldsList = originalFields.stream().flatMap(x -> x.getValues().stream()).toList();
+                        List<Object> rollupFieldsList = rollupFields.stream().flatMap(x -> x.getValues().stream()).toList();
+                        if (originalFieldsList.isEmpty() == false && rollupFieldsList.isEmpty() == false) {
+                            // NOTE: here we take advantage of the fact that a label field is indexed also as a metric of type
+                            // `counter`. This way we can actually check that the label value stored in the rollup index
+                            // is the last value (which is what we store for a metric of type counter) by comparing the metric
+                            // field value to the label field value.
+                            originalFieldsList.forEach(field -> assertTrue(rollupFieldsList.contains(field)));
+                            rollupFieldsList.forEach(field -> assertTrue(originalFieldsList.contains(field)));
+                            Object originalLabelValue = originalHit.getDocumentFields().values().stream().toList().get(0).getValue();
+                            Object rollupLabelValue = rollupHit.getDocumentFields().values().stream().toList().get(0).getValue();
+                            Optional<Aggregation> labelAsMetric = nonTopHitsOriginalAggregations.stream()
+                                .filter(agg -> agg.getName().equals("metric_" + rollupTopHits.getName()))
+                                .findFirst();
+                            // NOTE: this check is possible only if the label can be indexed as a metric (the label is a numeric field)
+                            if (labelAsMetric.isPresent()) {
+                                double metricValue = ((Max) labelAsMetric.get()).value();
+                                assertEquals(metricValue, rollupLabelValue);
+                                assertEquals(metricValue, originalLabelValue);
+                            }
+                        }
+                    }
+                }
+            }
+        });
+    }
+
+    @SuppressWarnings("unchecked")
+    private void assertFieldMappings(
+        RollupActionConfig config,
+        Map<String, TimeSeriesParams.MetricType> metricFields,
+        Map<String, Map<String, Object>> mappings
+    ) {
+        // Assert field mappings
         assertEquals(DateFieldMapper.CONTENT_TYPE, mappings.get(config.getTimestampField()).get("type"));
         Map<String, Object> dateTimeMeta = (Map<String, Object>) mappings.get(config.getTimestampField()).get("meta");
         assertEquals(config.getTimeZone(), dateTimeMeta.get("time_zone"));
@@ -517,38 +728,107 @@ public class RollupActionSingleNodeTests extends ESSingleNodeTestCase {
         });
     }
 
-    private CompositeAggregationBuilder buildCompositeAggs(
-        String name,
-        RollupActionConfig config,
-        Map<String, TimeSeriesParams.MetricType> metricFields
+    private void assertRollupIndexSettings(String sourceIndex, String rollupIndex, GetIndexResponse indexSettingsResp) {
+        // Assert rollup metadata are set in index settings
+        assertEquals("success", indexSettingsResp.getSetting(rollupIndex, IndexMetadata.INDEX_ROLLUP_STATUS_KEY));
+
+        assertNotNull(indexSettingsResp.getSetting(sourceIndex, IndexMetadata.SETTING_INDEX_UUID));
+        assertNotNull(indexSettingsResp.getSetting(rollupIndex, IndexMetadata.INDEX_ROLLUP_SOURCE_UUID_KEY));
+        assertEquals(
+            indexSettingsResp.getSetting(sourceIndex, IndexMetadata.SETTING_INDEX_UUID),
+            indexSettingsResp.getSetting(rollupIndex, IndexMetadata.INDEX_ROLLUP_SOURCE_UUID_KEY)
+        );
+
+        assertEquals(sourceIndex, indexSettingsResp.getSetting(rollupIndex, IndexMetadata.INDEX_ROLLUP_SOURCE_NAME_KEY));
+        assertEquals(indexSettingsResp.getSetting(sourceIndex, "index.mode"), indexSettingsResp.getSetting(rollupIndex, "index.mode"));
+
+        assertNotNull(indexSettingsResp.getSetting(sourceIndex, IndexSettings.TIME_SERIES_START_TIME.getKey()));
+        assertNotNull(indexSettingsResp.getSetting(rollupIndex, IndexSettings.TIME_SERIES_START_TIME.getKey()));
+        assertEquals(
+            indexSettingsResp.getSetting(sourceIndex, IndexSettings.TIME_SERIES_START_TIME.getKey()),
+            indexSettingsResp.getSetting(rollupIndex, IndexSettings.TIME_SERIES_START_TIME.getKey())
+        );
+
+        assertNotNull(indexSettingsResp.getSetting(sourceIndex, IndexSettings.TIME_SERIES_END_TIME.getKey()));
+        assertNotNull(indexSettingsResp.getSetting(rollupIndex, IndexSettings.TIME_SERIES_END_TIME.getKey()));
+        assertEquals(
+            indexSettingsResp.getSetting(sourceIndex, IndexSettings.TIME_SERIES_END_TIME.getKey()),
+            indexSettingsResp.getSetting(rollupIndex, IndexSettings.TIME_SERIES_END_TIME.getKey())
+        );
+        assertNotNull(indexSettingsResp.getSetting(sourceIndex, "index.routing_path"));
+        assertNotNull(indexSettingsResp.getSetting(rollupIndex, "index.routing_path"));
+        assertEquals(
+            indexSettingsResp.getSetting(sourceIndex, "index.routing_path"),
+            indexSettingsResp.getSetting(rollupIndex, "index.routing_path")
+        );
+
+        assertNotNull(indexSettingsResp.getSetting(sourceIndex, IndexMetadata.SETTING_NUMBER_OF_SHARDS));
+        assertNotNull(indexSettingsResp.getSetting(rollupIndex, IndexMetadata.SETTING_NUMBER_OF_SHARDS));
+        assertEquals(
+            indexSettingsResp.getSetting(sourceIndex, IndexMetadata.SETTING_NUMBER_OF_SHARDS),
+            indexSettingsResp.getSetting(rollupIndex, IndexMetadata.SETTING_NUMBER_OF_SHARDS)
+        );
+
+        assertNotNull(indexSettingsResp.getSetting(sourceIndex, IndexMetadata.SETTING_NUMBER_OF_REPLICAS));
+        assertNotNull(indexSettingsResp.getSetting(rollupIndex, IndexMetadata.SETTING_NUMBER_OF_REPLICAS));
+        assertEquals(
+            indexSettingsResp.getSetting(sourceIndex, IndexMetadata.SETTING_NUMBER_OF_REPLICAS),
+            indexSettingsResp.getSetting(rollupIndex, IndexMetadata.SETTING_NUMBER_OF_REPLICAS)
+        );
+        assertEquals("true", indexSettingsResp.getSetting(rollupIndex, "index.blocks.write"));
+    }
+
+    private AggregationBuilder buildAggregations(
+        final RollupActionConfig config,
+        final Map<String, TimeSeriesParams.MetricType> metrics,
+        final Map<String, String> labels,
+        final String timestampField
     ) {
-        List<CompositeValuesSourceBuilder<?>> sources = new ArrayList<>();
-        // For time series indices, we use the _tsid field for the terms aggregation
-        sources.add(new TermsValuesSourceBuilder("tsid").field(TimeSeriesIdFieldMapper.NAME));
 
-        DateHistogramValuesSourceBuilder dateHisto = new DateHistogramValuesSourceBuilder(config.getTimestampField());
-        dateHisto.field(config.getTimestampField());
+        final TermsAggregationBuilder tsidAggregation = new TermsAggregationBuilder("tsid").field(TimeSeriesIdFieldMapper.NAME)
+            .size(10_000);
+        final DateHistogramAggregationBuilder dateHistogramAggregation = new DateHistogramAggregationBuilder("timestamp").field(
+            config.getTimestampField()
+        ).fixedInterval(config.getInterval());
         if (config.getTimeZone() != null) {
-            dateHisto.timeZone(ZoneId.of(config.getTimeZone()));
+            dateHistogramAggregation.timeZone(ZoneId.of(config.getTimeZone()));
         }
-        dateHisto.fixedInterval(config.getInterval());
-        sources.add(dateHisto);
-
-        final CompositeAggregationBuilder composite = new CompositeAggregationBuilder(name, sources).size(10);
-        metricFields.forEach((fieldname, metricType) -> {
-            for (String agg : metricType.supportedAggs()) {
-                switch (agg) {
-                    case "min" -> composite.subAggregation(new MinAggregationBuilder(fieldname + "_" + agg).field(fieldname));
-                    case "max", "last_value" -> composite.subAggregation(new MaxAggregationBuilder(fieldname + "_" + agg).field(fieldname));
-                    case "sum" -> composite.subAggregation(new SumAggregationBuilder(fieldname + "_" + agg).field(fieldname));
-                    case "value_count" -> composite.subAggregation(
-                        new ValueCountAggregationBuilder(fieldname + "_" + agg).field(fieldname)
+        tsidAggregation.subAggregation(dateHistogramAggregation);
+
+        metrics.forEach((fieldName, metricType) -> {
+            for (final String supportedAggregation : metricType.supportedAggs()) {
+                switch (supportedAggregation) {
+                    case "min" -> dateHistogramAggregation.subAggregation(
+                        new MinAggregationBuilder(fieldName + "_" + supportedAggregation).field(fieldName)
+                    );
+                    case "max" -> dateHistogramAggregation.subAggregation(
+                        new MaxAggregationBuilder(fieldName + "_" + supportedAggregation).field(fieldName)
+                    );
+                    case "last_value" -> dateHistogramAggregation.subAggregation(
+                        new TopHitsAggregationBuilder(fieldName + "_" + supportedAggregation).sort(
+                            SortBuilders.fieldSort(timestampField).order(SortOrder.DESC)
+                        ).size(1).fetchField(fieldName)
                     );
-                    default -> throw new IllegalArgumentException("Unsupported metric type [" + agg + "]");
+                    case "sum" -> dateHistogramAggregation.subAggregation(
+                        new SumAggregationBuilder(fieldName + "_" + supportedAggregation).field(fieldName)
+                    );
+                    case "value_count" -> dateHistogramAggregation.subAggregation(
+                        new ValueCountAggregationBuilder(fieldName + "_" + supportedAggregation).field(fieldName)
+                    );
+                    default -> throw new IllegalArgumentException("Unsupported metric type [" + supportedAggregation + "]");
                 }
             }
         });
-        return composite;
+
+        labels.forEach((fieldName, type) -> {
+            dateHistogramAggregation.subAggregation(
+                new TopHitsAggregationBuilder(fieldName + "_last_value").sort(SortBuilders.fieldSort(timestampField).order(SortOrder.DESC))
+                    .size(1)
+                    .fetchField(fieldName)
+            );
+        });
+
+        return tsidAggregation;
     }
 
     @FunctionalInterface