Browse Source

OTLP: add support for counters and gauges (#133702)

Felix Barnsteiner 1 month ago
parent
commit
374b96b07b
13 changed files with 1217 additions and 1 deletion
  1. 13 1
      server/src/main/java/org/elasticsearch/cluster/routing/TsidBuilder.java
  2. 137 0
      x-pack/plugin/otel-data/src/main/java/org/elasticsearch/xpack/oteldata/otlp/datapoint/DataPoint.java
  3. 316 0
      x-pack/plugin/otel-data/src/main/java/org/elasticsearch/xpack/oteldata/otlp/datapoint/DataPointGroupingContext.java
  4. 67 0
      x-pack/plugin/otel-data/src/main/java/org/elasticsearch/xpack/oteldata/otlp/proto/BufferedByteStringAccessor.java
  5. 67 0
      x-pack/plugin/otel-data/src/main/java/org/elasticsearch/xpack/oteldata/otlp/tsid/AttributeListTsidFunnel.java
  6. 34 0
      x-pack/plugin/otel-data/src/main/java/org/elasticsearch/xpack/oteldata/otlp/tsid/DataPointTsidFunnel.java
  7. 39 0
      x-pack/plugin/otel-data/src/main/java/org/elasticsearch/xpack/oteldata/otlp/tsid/ResourceTsidFunnel.java
  8. 39 0
      x-pack/plugin/otel-data/src/main/java/org/elasticsearch/xpack/oteldata/otlp/tsid/ScopeTsidFunnel.java
  9. 140 0
      x-pack/plugin/otel-data/src/test/java/org/elasticsearch/xpack/oteldata/otlp/OtlpUtils.java
  10. 177 0
      x-pack/plugin/otel-data/src/test/java/org/elasticsearch/xpack/oteldata/otlp/datapoint/DataPointGroupingContextTests.java
  11. 75 0
      x-pack/plugin/otel-data/src/test/java/org/elasticsearch/xpack/oteldata/otlp/datapoint/DataPointNumberTests.java
  12. 54 0
      x-pack/plugin/otel-data/src/test/java/org/elasticsearch/xpack/oteldata/otlp/proto/BufferedByteStringAccessorTests.java
  13. 59 0
      x-pack/plugin/otel-data/src/test/java/org/elasticsearch/xpack/oteldata/otlp/tsid/AttributeListTsidFunnelTests.java

+ 13 - 1
server/src/main/java/org/elasticsearch/cluster/routing/TsidBuilder.java

@@ -34,7 +34,15 @@ public class TsidBuilder {
     private static final int MAX_TSID_VALUE_FIELDS = 16;
     private final BufferedMurmur3Hasher murmur3Hasher = new BufferedMurmur3Hasher(0L);
 
-    private final List<Dimension> dimensions = new ArrayList<>();
+    private final List<Dimension> dimensions;
+
+    /** Creates a builder with the default initial dimension capacity. */
+    public TsidBuilder() {
+        this.dimensions = new ArrayList<>();
+    }
+
+    /**
+     * Creates a builder pre-sized for the expected number of dimensions, avoiding
+     * intermediate growth of the backing list.
+     *
+     * @param size the expected number of dimensions
+     */
+    public TsidBuilder(int size) {
+        this.dimensions = new ArrayList<>(size);
+    }
 
     public static TsidBuilder newBuilder() {
         return new TsidBuilder();
@@ -281,6 +289,10 @@ public class TsidBuilder {
         return index;
     }
 
+    /**
+     * Returns the number of dimensions that have been added to this builder so far.
+     *
+     * @return the current dimension count
+     */
+    public int size() {
+        return dimensions.size();
+    }
+
     /**
      * A functional interface that describes how objects of a complex type are added to a TSID.
      *

+ 137 - 0
x-pack/plugin/otel-data/src/main/java/org/elasticsearch/xpack/oteldata/otlp/datapoint/DataPoint.java

@@ -0,0 +1,137 @@
+/*
+ * Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
+ * or more contributor license agreements. Licensed under the Elastic License
+ * 2.0; you may not use this file except in compliance with the Elastic License
+ * 2.0.
+ */
+
+package org.elasticsearch.xpack.oteldata.otlp.datapoint;
+
+import io.opentelemetry.proto.common.v1.KeyValue;
+import io.opentelemetry.proto.metrics.v1.Metric;
+import io.opentelemetry.proto.metrics.v1.NumberDataPoint;
+
+import java.util.List;
+import java.util.Set;
+
+import static io.opentelemetry.proto.metrics.v1.AggregationTemporality.AGGREGATION_TEMPORALITY_CUMULATIVE;
+
+/**
+ * Represents a metrics data point in the OpenTelemetry metrics data model.
+ * This interface defines methods to access various properties of a data point,
+ * such as its timestamp, attributes, unit, metric name, and methods to build
+ * the metric value in a specific format.
+ * The reason this class is needed is that the generated classes from the
+ * OpenTelemetry proto definitions don't implement a common interface,
+ * which makes it difficult to handle different types of data points uniformly.
+ */
+public interface DataPoint {
+
+    /**
+     * Returns the timestamp of the data point in Unix nanoseconds.
+     *
+     * @return the timestamp in nanoseconds
+     */
+    long getTimestampUnixNano();
+
+    /**
+     * Returns the start timestamp of the data point in Unix nanoseconds.
+     * This allows detecting when a sequence of observations is unbroken.
+     * This field indicates to consumers the start time for points with cumulative and delta temporality,
+     * and can support correct rate calculation.
+     *
+     * @return the start timestamp in nanoseconds
+     */
+    long getStartTimestampUnixNano();
+
+    /**
+     * Returns the attributes associated with the data point.
+     *
+     * @return a list of key-value pairs representing the attributes
+     */
+    List<KeyValue> getAttributes();
+
+    /**
+     * Returns the unit of measurement for the data point.
+     *
+     * @return the unit as a string
+     */
+    String getUnit();
+
+    /**
+     * Returns the name of the metric associated with the data point.
+     *
+     * @return the metric name as a string
+     */
+    String getMetricName();
+
+    /**
+     * Returns the dynamic template name for the data point based on its type and value.
+     * This is used to dynamically map the appropriate field type according to the data point's characteristics.
+     *
+     * @return the dynamic template name as a string, or {@code null} if the data point's
+     *         value type is unsupported (neither an integer nor a double)
+     */
+    String getDynamicTemplate();
+
+    /**
+     * Validates whether the data point can be indexed into Elasticsearch.
+     *
+     * @param errors a set to collect validation error messages
+     * @return true if the data point is valid, false otherwise
+     */
+    boolean isValid(Set<String> errors);
+
+    /**
+     * A {@link DataPoint} backed by a {@link NumberDataPoint}, used for both sums and gauges
+     * in the OTLP data model.
+     */
+    record Number(NumberDataPoint dataPoint, Metric metric) implements DataPoint {
+
+        @Override
+        public long getTimestampUnixNano() {
+            return dataPoint.getTimeUnixNano();
+        }
+
+        @Override
+        public List<KeyValue> getAttributes() {
+            return dataPoint.getAttributesList();
+        }
+
+        @Override
+        public long getStartTimestampUnixNano() {
+            return dataPoint.getStartTimeUnixNano();
+        }
+
+        @Override
+        public String getUnit() {
+            return metric.getUnit();
+        }
+
+        @Override
+        public String getMetricName() {
+            return metric.getName();
+        }
+
+        @Override
+        public String getDynamicTemplate() {
+            // Only cumulative monotonic sums are mapped as counters; everything else
+            // (gauges, delta sums, up/down counters) is mapped as a gauge for now.
+            String type;
+            if (metric.hasSum()
+                // TODO add support for delta counters - for now we represent them as gauges
+                && metric.getSum().getAggregationTemporality() == AGGREGATION_TEMPORALITY_CUMULATIVE
+                // TODO add support for up/down counters - for now we represent them as gauges
+                && metric.getSum().getIsMonotonic()) {
+                type = "counter_";
+            } else {
+                type = "gauge_";
+            }
+            if (dataPoint.getValueCase() == NumberDataPoint.ValueCase.AS_INT) {
+                return type + "long";
+            } else if (dataPoint.getValueCase() == NumberDataPoint.ValueCase.AS_DOUBLE) {
+                return type + "double";
+            } else {
+                // value not set or unsupported: no dynamic template can be determined
+                return null;
+            }
+        }
+
+        @Override
+        public boolean isValid(Set<String> errors) {
+            // Number data points are always considered indexable; no errors are reported.
+            return true;
+        }
+    }
+}

+ 316 - 0
x-pack/plugin/otel-data/src/main/java/org/elasticsearch/xpack/oteldata/otlp/datapoint/DataPointGroupingContext.java

@@ -0,0 +1,316 @@
+/*
+ * Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
+ * or more contributor license agreements. Licensed under the Elastic License
+ * 2.0; you may not use this file except in compliance with the Elastic License
+ * 2.0.
+ */
+
+package org.elasticsearch.xpack.oteldata.otlp.datapoint;
+
+import io.opentelemetry.proto.collector.metrics.v1.ExportMetricsServiceRequest;
+import io.opentelemetry.proto.common.v1.InstrumentationScope;
+import io.opentelemetry.proto.common.v1.KeyValue;
+import io.opentelemetry.proto.metrics.v1.Metric;
+import io.opentelemetry.proto.metrics.v1.ResourceMetrics;
+import io.opentelemetry.proto.metrics.v1.ScopeMetrics;
+import io.opentelemetry.proto.resource.v1.Resource;
+
+import com.google.protobuf.ByteString;
+
+import org.elasticsearch.cluster.routing.TsidBuilder;
+import org.elasticsearch.common.hash.BufferedMurmur3Hasher;
+import org.elasticsearch.common.hash.MurmurHash3.Hash128;
+import org.elasticsearch.core.CheckedConsumer;
+import org.elasticsearch.xpack.oteldata.otlp.proto.BufferedByteStringAccessor;
+import org.elasticsearch.xpack.oteldata.otlp.tsid.DataPointTsidFunnel;
+import org.elasticsearch.xpack.oteldata.otlp.tsid.ResourceTsidFunnel;
+import org.elasticsearch.xpack.oteldata.otlp.tsid.ScopeTsidFunnel;
+
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.Iterator;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+import java.util.function.BiFunction;
+
+public class DataPointGroupingContext {
+
+    private final BufferedByteStringAccessor byteStringAccessor;
+    private final Map<Hash128, ResourceGroup> resourceGroups = new HashMap<>();
+    private final Set<String> ignoredDataPointMessages = new HashSet<>();
+
+    private int totalDataPoints = 0;
+    private int ignoredDataPoints = 0;
+
+    public DataPointGroupingContext(BufferedByteStringAccessor byteStringAccessor) {
+        this.byteStringAccessor = byteStringAccessor;
+    }
+
+    public void groupDataPoints(ExportMetricsServiceRequest exportMetricsServiceRequest) {
+        List<ResourceMetrics> resourceMetricsList = exportMetricsServiceRequest.getResourceMetricsList();
+        for (int i = 0; i < resourceMetricsList.size(); i++) {
+            ResourceMetrics resourceMetrics = resourceMetricsList.get(i);
+            ResourceGroup resourceGroup = getOrCreateResourceGroup(resourceMetrics);
+            List<ScopeMetrics> scopeMetricsList = resourceMetrics.getScopeMetricsList();
+            for (int j = 0; j < scopeMetricsList.size(); j++) {
+                ScopeMetrics scopeMetrics = scopeMetricsList.get(j);
+                ScopeGroup scopeGroup = resourceGroup.getOrCreateScope(scopeMetrics);
+                List<Metric> metricsList = scopeMetrics.getMetricsList();
+                for (int k = 0; k < metricsList.size(); k++) {
+                    var metric = metricsList.get(k);
+                    switch (metric.getDataCase()) {
+                        case SUM:
+                            scopeGroup.addDataPoints(metric, metric.getSum().getDataPointsList(), DataPoint.Number::new);
+                            break;
+                        case GAUGE:
+                            scopeGroup.addDataPoints(metric, metric.getGauge().getDataPointsList(), DataPoint.Number::new);
+                            break;
+                        case EXPONENTIAL_HISTOGRAM:
+                            ignoredDataPoints += metric.getExponentialHistogram().getDataPointsCount();
+                            ignoredDataPointMessages.add("Exponential histogram is not supported yet. Dropping " + metric.getName());
+                            break;
+                        case HISTOGRAM:
+                            ignoredDataPoints += metric.getHistogram().getDataPointsCount();
+                            ignoredDataPointMessages.add("Histogram is not supported yet. Dropping " + metric.getName());
+                            break;
+                        case SUMMARY:
+                            ignoredDataPoints += metric.getSummary().getDataPointsList().size();
+                            ignoredDataPointMessages.add("Summary is not supported yet. Dropping " + metric.getName());
+                            break;
+                        default:
+                            ignoredDataPoints++;
+                            ignoredDataPointMessages.add("unsupported metric type " + metric.getDataCase());
+                            break;
+                    }
+                }
+            }
+        }
+    }
+
+    /**
+     * Consumes all data point groups in the context, removing them from the context.
+     *
+     * @param consumer the consumer to process each {@link DataPointGroup}
+     * @param <E>      the type of exception that can be thrown by the consumer
+     * @throws E if the consumer throws an exception
+     */
+    public <E extends Exception> void consume(CheckedConsumer<DataPointGroup, E> consumer) throws E {
+        for (Iterator<ResourceGroup> iterator = resourceGroups.values().iterator(); iterator.hasNext();) {
+            ResourceGroup resourceGroup = iterator.next();
+            // Remove the resource group from the map can help to significantly reduce GC overhead.
+            // This avoids that the resource groups are promoted to survivor space when the context is kept alive for a while,
+            // for example, when referenced in the bulk response listener.
+            iterator.remove();
+            resourceGroup.forEach(consumer);
+        }
+    }
+
+    public int totalDataPoints() {
+        return totalDataPoints;
+    }
+
+    public int getIgnoredDataPoints() {
+        return ignoredDataPoints;
+    }
+
+    public String getIgnoredDataPointsMessage() {
+        return ignoredDataPointMessages.isEmpty() ? "" : String.join("\n", ignoredDataPointMessages);
+    }
+
+    private ResourceGroup getOrCreateResourceGroup(ResourceMetrics resourceMetrics) {
+        TsidBuilder resourceTsidBuilder = ResourceTsidFunnel.forResource(byteStringAccessor, resourceMetrics);
+        Hash128 resourceHash = resourceTsidBuilder.hash();
+        ResourceGroup resourceGroup = resourceGroups.get(resourceHash);
+        if (resourceGroup == null) {
+            resourceGroup = new ResourceGroup(resourceMetrics.getResource(), resourceMetrics.getSchemaUrlBytes());
+            resourceGroups.put(resourceHash, resourceGroup);
+        }
+        return resourceGroup;
+    }
+
+    class ResourceGroup {
+        private final Resource resource;
+        private final ByteString resourceSchemaUrl;
+        private final Map<Hash128, ScopeGroup> scopes;
+
+        ResourceGroup(Resource resource, ByteString resourceSchemaUrl) {
+            this.resource = resource;
+            this.resourceSchemaUrl = resourceSchemaUrl;
+            this.scopes = new HashMap<>();
+        }
+
+        public ScopeGroup getOrCreateScope(ScopeMetrics scopeMetrics) {
+            TsidBuilder scopeTsidBuilder = ScopeTsidFunnel.forScope(byteStringAccessor, scopeMetrics);
+            Hash128 scopeHash = scopeTsidBuilder.hash();
+            ScopeGroup scopeGroup = scopes.get(scopeHash);
+            if (scopeGroup == null) {
+                scopeGroup = new ScopeGroup(this, scopeMetrics.getScope(), scopeMetrics.getSchemaUrlBytes());
+                scopes.put(scopeHash, scopeGroup);
+            }
+            return scopeGroup;
+        }
+
+        public <E extends Exception> void forEach(CheckedConsumer<DataPointGroup, E> consumer) throws E {
+            for (ScopeGroup scopeGroup : scopes.values()) {
+                scopeGroup.forEach(consumer);
+            }
+        }
+    }
+
+    class ScopeGroup {
+        private final ResourceGroup resourceGroup;
+        private final InstrumentationScope scope;
+        private final ByteString scopeSchemaUrl;
+        // index -> timestamp -> dataPointGroupHash -> DataPointGroup
+        private final Map<String, Map<Hash128, Map<Hash128, DataPointGroup>>> dataPointGroupsByIndexAndTimestamp;
+
+        ScopeGroup(ResourceGroup resourceGroup, InstrumentationScope scope, ByteString scopeSchemaUrl) {
+            this.resourceGroup = resourceGroup;
+            this.scope = scope;
+            this.scopeSchemaUrl = scopeSchemaUrl;
+            this.dataPointGroupsByIndexAndTimestamp = new HashMap<>();
+        }
+
+        public <T> void addDataPoints(Metric metric, List<T> dataPoints, BiFunction<T, Metric, DataPoint> createDataPoint) {
+            for (int i = 0; i < dataPoints.size(); i++) {
+                T dataPoint = dataPoints.get(i);
+                addDataPoint(createDataPoint.apply(dataPoint, metric));
+            }
+        }
+
+        public void addDataPoint(DataPoint dataPoint) {
+            totalDataPoints++;
+            if (dataPoint.isValid(ignoredDataPointMessages) == false) {
+                ignoredDataPoints++;
+                return;
+            }
+            getOrCreateDataPointGroup(dataPoint).addDataPoint(dataPoint);
+        }
+
+        private DataPointGroup getOrCreateDataPointGroup(DataPoint dataPoint) {
+            TsidBuilder dataPointGroupTsidBuilder = DataPointTsidFunnel.forDataPoint(byteStringAccessor, dataPoint);
+            Hash128 dataPointGroupHash = dataPointGroupTsidBuilder.hash();
+            // in addition to the fields that go into the _tsid, we also need to group by timestamp and start timestamp
+            Hash128 timestamp = new Hash128(dataPoint.getTimestampUnixNano(), dataPoint.getStartTimestampUnixNano());
+            // TODO determine based on attributes and scope name
+            String targetIndex = "metrics-generic.otel-default";
+            var dataPointGroupsByTimestamp = dataPointGroupsByIndexAndTimestamp.computeIfAbsent(targetIndex, k -> new HashMap<>());
+            var dataPointGroups = dataPointGroupsByTimestamp.computeIfAbsent(timestamp, k -> new HashMap<>());
+            DataPointGroup dataPointGroup = dataPointGroups.get(dataPointGroupHash);
+            if (dataPointGroup == null) {
+                dataPointGroup = new DataPointGroup(
+                    resourceGroup.resource,
+                    resourceGroup.resourceSchemaUrl,
+                    scope,
+                    scopeSchemaUrl,
+                    dataPoint.getAttributes(),
+                    dataPoint.getUnit(),
+                    new ArrayList<>(),
+                    targetIndex
+                );
+                dataPointGroups.put(dataPointGroupHash, dataPointGroup);
+            }
+            return dataPointGroup;
+        }
+
+        public <E extends Exception> void forEach(CheckedConsumer<DataPointGroup, E> consumer) throws E {
+            for (var dataPointGroupsByTime : dataPointGroupsByIndexAndTimestamp.values()) {
+                for (var dataPointGroups : dataPointGroupsByTime.values()) {
+                    for (DataPointGroup dataPointGroup : dataPointGroups.values()) {
+                        consumer.accept(dataPointGroup);
+                    }
+                }
+            }
+        }
+    }
+
+    public static final class DataPointGroup {
+        private final Resource resource;
+        private final ByteString resourceSchemaUrl;
+        private final InstrumentationScope scope;
+        private final ByteString scopeSchemaUrl;
+        private final List<KeyValue> dataPointAttributes;
+        private final String unit;
+        private final List<DataPoint> dataPoints;
+        private final String targetIndex;
+        private String metricNamesHash;
+
+        public DataPointGroup(
+            Resource resource,
+            ByteString resourceSchemaUrl,
+            InstrumentationScope scope,
+            ByteString scopeSchemaUrl,
+            List<KeyValue> dataPointAttributes,
+            String unit,
+            List<DataPoint> dataPoints,
+            String targetIndex
+        ) {
+            this.resource = resource;
+            this.resourceSchemaUrl = resourceSchemaUrl;
+            this.scope = scope;
+            this.scopeSchemaUrl = scopeSchemaUrl;
+            this.dataPointAttributes = dataPointAttributes;
+            this.unit = unit;
+            this.dataPoints = dataPoints;
+            this.targetIndex = targetIndex;
+        }
+
+        public long getTimestampUnixNano() {
+            return dataPoints.getFirst().getTimestampUnixNano();
+        }
+
+        public long getStartTimestampUnixNano() {
+            return dataPoints.getFirst().getStartTimestampUnixNano();
+        }
+
+        public String getMetricNamesHash() {
+            if (metricNamesHash == null) {
+                BufferedMurmur3Hasher hasher = new BufferedMurmur3Hasher(0);
+                for (int i = 0; i < dataPoints.size(); i++) {
+                    hasher.addString(dataPoints.get(i).getMetricName());
+                }
+                metricNamesHash = Integer.toHexString(hasher.digestHash().hashCode());
+            }
+            return metricNamesHash;
+        }
+
+        public void addDataPoint(DataPoint dataPoint) {
+            metricNamesHash = null; // reset the hash when adding a new data point
+            dataPoints.add(dataPoint);
+        }
+
+        public Resource resource() {
+            return resource;
+        }
+
+        public ByteString resourceSchemaUrl() {
+            return resourceSchemaUrl;
+        }
+
+        public InstrumentationScope scope() {
+            return scope;
+        }
+
+        public ByteString scopeSchemaUrl() {
+            return scopeSchemaUrl;
+        }
+
+        public List<KeyValue> dataPointAttributes() {
+            return dataPointAttributes;
+        }
+
+        public String unit() {
+            return unit;
+        }
+
+        public List<DataPoint> dataPoints() {
+            return dataPoints;
+        }
+
+        public String targetIndex() {
+            return targetIndex;
+        }
+    }
+}

+ 67 - 0
x-pack/plugin/otel-data/src/main/java/org/elasticsearch/xpack/oteldata/otlp/proto/BufferedByteStringAccessor.java

@@ -0,0 +1,67 @@
+/*
+ * Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
+ * or more contributor license agreements. Licensed under the Elastic License
+ * 2.0; you may not use this file except in compliance with the Elastic License
+ * 2.0.
+ */
+
+package org.elasticsearch.xpack.oteldata.otlp.proto;
+
+import com.google.protobuf.ByteString;
+
+import org.elasticsearch.cluster.routing.TsidBuilder;
+import org.elasticsearch.xcontent.XContentBuilder;
+
+import java.io.IOException;
+
+/**
+ * A utility class that uses a shared {@code byte[]} buffer to convert {@link ByteString} values to byte arrays.
+ * This avoids frequent allocations of byte arrays in {@link ByteString#toByteArray()}.
+ * Note that due to the use of a shared buffer, this class is not thread-safe.
+ */
+public class BufferedByteStringAccessor {
+
+    private static final int DEFAULT_INITIAL_SIZE = 128;
+
+    // Shared scratch buffer, grown on demand; its contents are only valid until the next call.
+    private byte[] bytes = new byte[DEFAULT_INITIAL_SIZE];
+
+    /**
+     * Adds a string dimension to the given {@link TsidBuilder} using the provided dimension name and {@link ByteString} value.
+     * The value is converted to a byte array using a shared buffer and added to the builder.
+     * Empty values are silently skipped.
+     *
+     * @param tsidBuilder the builder to which the dimension will be added
+     * @param dimension   the name of the dimension to add
+     * @param value       the value of the dimension as a {@link ByteString}
+     */
+    public void addStringDimension(TsidBuilder tsidBuilder, String dimension, ByteString value) {
+        if (value.isEmpty()) {
+            // Ignoring invalid values
+            // According to the spec https://opentelemetry.io/docs/specs/otel/common/#attribute:
+            // The attribute key MUST be a non-null and non-empty string.
+            // NOTE(review): the quoted spec text is about attribute *keys*, but this check skips
+            // empty *values* — confirm this is the intended behavior.
+            return;
+        }
+        tsidBuilder.addStringDimension(dimension, toBytes(value), 0, value.size());
+    }
+
+    /**
+     * Writes a UTF-8 encoded value to the provided {@link XContentBuilder} using {@link XContentBuilder#utf8Value}.
+     * This uses a shared byte array to avoid allocations.
+     *
+     * @param builder the builder to write the value to
+     * @param value   the value to write
+     * @throws IOException if the underlying builder fails to write the value
+     */
+    public void utf8Value(XContentBuilder builder, ByteString value) throws IOException {
+        builder.utf8Value(toBytes(value), 0, value.size());
+    }
+
+    /*
+     * Not exposed as a public method to avoid risks of leaking a reference to the reused byte array.
+     * Only the first byteString.size() bytes of the returned array are meaningful.
+     */
+    private byte[] toBytes(ByteString byteString) {
+        int size = byteString.size();
+        if (bytes.length < size) {
+            bytes = new byte[size];
+        }
+        byteString.copyTo(bytes, 0);
+        return bytes;
+    }
+}

+ 67 - 0
x-pack/plugin/otel-data/src/main/java/org/elasticsearch/xpack/oteldata/otlp/tsid/AttributeListTsidFunnel.java

@@ -0,0 +1,67 @@
+/*
+ * Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
+ * or more contributor license agreements. Licensed under the Elastic License
+ * 2.0; you may not use this file except in compliance with the Elastic License
+ * 2.0.
+ */
+
+package org.elasticsearch.xpack.oteldata.otlp.tsid;
+
+import io.opentelemetry.proto.common.v1.AnyValue;
+import io.opentelemetry.proto.common.v1.KeyValue;
+
+import org.elasticsearch.cluster.routing.TsidBuilder;
+import org.elasticsearch.cluster.routing.TsidBuilder.TsidFunnel;
+import org.elasticsearch.xpack.oteldata.otlp.proto.BufferedByteStringAccessor;
+
+import java.util.List;
+
+/**
+ * Hashes a list of OTLP attributes into a {@link TsidBuilder}, prepending a fixed prefix
+ * (e.g. "attributes." or "resource.attributes.") to every attribute key. Nested key-value
+ * lists are flattened with dot-joined keys; array values are added element by element under
+ * the same key.
+ */
+class AttributeListTsidFunnel implements TsidFunnel<List<KeyValue>> {
+
+    private final String prefix;
+    private final BufferedByteStringAccessor byteStringAccessor;
+
+    private AttributeListTsidFunnel(BufferedByteStringAccessor byteStringAccessor, String prefix) {
+        this.prefix = prefix;
+        this.byteStringAccessor = byteStringAccessor;
+    }
+
+    /** Returns a funnel that prepends the given prefix to every attribute key. */
+    static AttributeListTsidFunnel get(BufferedByteStringAccessor byteStringAccessor, String prefix) {
+        return new AttributeListTsidFunnel(byteStringAccessor, prefix);
+    }
+
+    @Override
+    public void add(List<KeyValue> attributesList, TsidBuilder tsidBuilder) {
+        // indexed loop avoids iterator allocation on this hot path
+        for (int i = 0; i < attributesList.size(); i++) {
+            KeyValue keyValue = attributesList.get(i);
+            String attributeKey = keyValue.getKey();
+            hashValue(tsidBuilder, prefix + attributeKey, keyValue.getValue());
+        }
+    }
+
+    // Adds a single attribute value as a dimension, recursing into nested kvlists and arrays.
+    private void hashValue(TsidBuilder tsidBuilder, String key, AnyValue value) {
+        switch (value.getValueCase()) {
+            case STRING_VALUE:
+                byteStringAccessor.addStringDimension(tsidBuilder, key, value.getStringValueBytes());
+                break;
+            case BOOL_VALUE:
+                tsidBuilder.addBooleanDimension(key, value.getBoolValue());
+                break;
+            case DOUBLE_VALUE:
+                tsidBuilder.addDoubleDimension(key, value.getDoubleValue());
+                break;
+            case INT_VALUE:
+                tsidBuilder.addLongDimension(key, value.getIntValue());
+                break;
+            case KVLIST_VALUE:
+                tsidBuilder.add(value.getKvlistValue().getValuesList(), AttributeListTsidFunnel.get(byteStringAccessor, key + "."));
+                break;
+            case ARRAY_VALUE:
+                List<AnyValue> valuesList = value.getArrayValue().getValuesList();
+                for (int i = 0; i < valuesList.size(); i++) {
+                    hashValue(tsidBuilder, key, valuesList.get(i));
+                }
+                break;
+            // NOTE(review): BYTES_VALUE and VALUE_NOT_SET are silently ignored (no default branch) —
+            // confirm that dropping byte-array attributes from the tsid is intentional.
+        }
+    }
+}

+ 34 - 0
x-pack/plugin/otel-data/src/main/java/org/elasticsearch/xpack/oteldata/otlp/tsid/DataPointTsidFunnel.java

@@ -0,0 +1,34 @@
+/*
+ * Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
+ * or more contributor license agreements. Licensed under the Elastic License
+ * 2.0; you may not use this file except in compliance with the Elastic License
+ * 2.0.
+ */
+
+package org.elasticsearch.xpack.oteldata.otlp.tsid;
+
+import org.elasticsearch.cluster.routing.TsidBuilder;
+import org.elasticsearch.cluster.routing.TsidBuilder.TsidFunnel;
+import org.elasticsearch.xpack.oteldata.otlp.datapoint.DataPoint;
+import org.elasticsearch.xpack.oteldata.otlp.proto.BufferedByteStringAccessor;
+
+/**
+ * Hashes the identifying dimensions of a single {@link DataPoint} — its attributes and unit —
+ * into a {@link TsidBuilder}, so that data points of the same time series hash identically.
+ */
+public class DataPointTsidFunnel implements TsidFunnel<DataPoint> {
+
+    private final BufferedByteStringAccessor byteStringAccessor;
+
+    private DataPointTsidFunnel(BufferedByteStringAccessor byteStringAccessor) {
+        this.byteStringAccessor = byteStringAccessor;
+    }
+
+    /**
+     * Creates a {@link TsidBuilder} pre-sized for the data point's attribute count and hashes
+     * the data point into it.
+     *
+     * @param byteStringAccessor accessor used to convert protobuf strings without extra allocations
+     * @param dataPoint          the data point to hash
+     * @return a builder populated with the data point's identifying dimensions
+     */
+    public static TsidBuilder forDataPoint(BufferedByteStringAccessor byteStringAccessor, DataPoint dataPoint) {
+        TsidBuilder tsidBuilder = new TsidBuilder(dataPoint.getAttributes().size());
+        new DataPointTsidFunnel(byteStringAccessor).add(dataPoint, tsidBuilder);
+        return tsidBuilder;
+    }
+
+    @Override
+    public void add(DataPoint dataPoint, TsidBuilder tsidBuilder) {
+        tsidBuilder.add(dataPoint.getAttributes(), AttributeListTsidFunnel.get(byteStringAccessor, "attributes."));
+        tsidBuilder.addStringDimension("unit", dataPoint.getUnit());
+    }
+}

+ 39 - 0
x-pack/plugin/otel-data/src/main/java/org/elasticsearch/xpack/oteldata/otlp/tsid/ResourceTsidFunnel.java

@@ -0,0 +1,39 @@
+/*
+ * Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
+ * or more contributor license agreements. Licensed under the Elastic License
+ * 2.0; you may not use this file except in compliance with the Elastic License
+ * 2.0.
+ */
+
+package org.elasticsearch.xpack.oteldata.otlp.tsid;
+
+import io.opentelemetry.proto.common.v1.KeyValue;
+import io.opentelemetry.proto.metrics.v1.ResourceMetrics;
+
+import org.elasticsearch.cluster.routing.TsidBuilder;
+import org.elasticsearch.cluster.routing.TsidBuilder.TsidFunnel;
+import org.elasticsearch.xpack.oteldata.otlp.proto.BufferedByteStringAccessor;
+
+import java.util.List;
+
+/**
+ * Hashes the identifying properties of a resource — its attributes and schema URL —
+ * into a {@link TsidBuilder}, so that metrics can be grouped by resource.
+ */
+public class ResourceTsidFunnel implements TsidFunnel<ResourceMetrics> {
+
+    private final BufferedByteStringAccessor byteStringAccessor;
+
+    public ResourceTsidFunnel(BufferedByteStringAccessor byteStringAccessor) {
+        this.byteStringAccessor = byteStringAccessor;
+    }
+
+    /**
+     * Creates a {@link TsidBuilder} pre-sized for the resource's attributes (the +1 accounts
+     * for the schema_url dimension) and hashes the resource into it.
+     *
+     * @param byteStringAccessor accessor used to convert protobuf strings without extra allocations
+     * @param resourceMetrics    the resource metrics whose resource is hashed
+     * @return a builder populated with the resource's identifying dimensions
+     */
+    public static TsidBuilder forResource(BufferedByteStringAccessor byteStringAccessor, ResourceMetrics resourceMetrics) {
+        TsidBuilder tsidBuilder = new TsidBuilder(resourceMetrics.getResource().getAttributesCount() + 1);
+        new ResourceTsidFunnel(byteStringAccessor).add(resourceMetrics, tsidBuilder);
+        return tsidBuilder;
+    }
+
+    @Override
+    public void add(ResourceMetrics resourceMetrics, TsidBuilder tsidBuilder) {
+        List<KeyValue> resourceAttributes = resourceMetrics.getResource().getAttributesList();
+        tsidBuilder.add(resourceAttributes, AttributeListTsidFunnel.get(byteStringAccessor, "resource.attributes."));
+        byteStringAccessor.addStringDimension(tsidBuilder, "schema_url", resourceMetrics.getSchemaUrlBytes());
+    }
+}

+ 39 - 0
x-pack/plugin/otel-data/src/main/java/org/elasticsearch/xpack/oteldata/otlp/tsid/ScopeTsidFunnel.java

@@ -0,0 +1,39 @@
+/*
+ * Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
+ * or more contributor license agreements. Licensed under the Elastic License
+ * 2.0; you may not use this file except in compliance with the Elastic License
+ * 2.0.
+ */
+
+package org.elasticsearch.xpack.oteldata.otlp.tsid;
+
+import io.opentelemetry.proto.common.v1.KeyValue;
+import io.opentelemetry.proto.metrics.v1.ScopeMetrics;
+
+import org.elasticsearch.cluster.routing.TsidBuilder;
+import org.elasticsearch.cluster.routing.TsidBuilder.TsidFunnel;
+import org.elasticsearch.xpack.oteldata.otlp.proto.BufferedByteStringAccessor;
+
+import java.util.List;
+
+/**
+ * Hashes the identifying properties of an instrumentation scope — its name and attributes —
+ * into a {@link TsidBuilder}, so that metrics can be grouped by scope.
+ */
+public class ScopeTsidFunnel implements TsidFunnel<ScopeMetrics> {
+
+    private final BufferedByteStringAccessor byteStringAccessor;
+
+    public ScopeTsidFunnel(BufferedByteStringAccessor byteStringAccessor) {
+        this.byteStringAccessor = byteStringAccessor;
+    }
+
+    /**
+     * Creates a {@link TsidBuilder} pre-sized for the scope's attributes and hashes the scope into it.
+     *
+     * @param byteStringAccessor accessor used to convert protobuf strings without extra allocations
+     * @param scopeMetrics       the scope metrics whose scope is hashed
+     * @return a builder populated with the scope's identifying dimensions
+     */
+    public static TsidBuilder forScope(BufferedByteStringAccessor byteStringAccessor, ScopeMetrics scopeMetrics) {
+        TsidBuilder tsidBuilder = new TsidBuilder(scopeMetrics.getScope().getAttributesCount() + 3);
+        new ScopeTsidFunnel(byteStringAccessor).add(scopeMetrics, tsidBuilder);
+        return tsidBuilder;
+    }
+
+    @Override
+    public void add(ScopeMetrics scopeMetrics, TsidBuilder tsidBuilder) {
+        // renamed from "resourceAttributes" (copy-paste from ResourceTsidFunnel): these are scope attributes
+        List<KeyValue> scopeAttributes = scopeMetrics.getScope().getAttributesList();
+        byteStringAccessor.addStringDimension(tsidBuilder, "scope.name", scopeMetrics.getScope().getNameBytes());
+        tsidBuilder.add(scopeAttributes, AttributeListTsidFunnel.get(byteStringAccessor, "scope.attributes."));
+    }
+}

+ 140 - 0
x-pack/plugin/otel-data/src/test/java/org/elasticsearch/xpack/oteldata/otlp/OtlpUtils.java

@@ -0,0 +1,140 @@
/*
 * Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
 * or more contributor license agreements. Licensed under the Elastic License
 * 2.0; you may not use this file except in compliance with the Elastic License
 * 2.0.
 */
package org.elasticsearch.xpack.oteldata.otlp;

import io.opentelemetry.proto.collector.metrics.v1.ExportMetricsServiceRequest;
import io.opentelemetry.proto.common.v1.AnyValue;
import io.opentelemetry.proto.common.v1.ArrayValue;
import io.opentelemetry.proto.common.v1.InstrumentationScope;
import io.opentelemetry.proto.common.v1.KeyValue;
import io.opentelemetry.proto.common.v1.KeyValueList;
import io.opentelemetry.proto.metrics.v1.AggregationTemporality;
import io.opentelemetry.proto.metrics.v1.Gauge;
import io.opentelemetry.proto.metrics.v1.Metric;
import io.opentelemetry.proto.metrics.v1.NumberDataPoint;
import io.opentelemetry.proto.metrics.v1.ResourceMetrics;
import io.opentelemetry.proto.metrics.v1.ScopeMetrics;
import io.opentelemetry.proto.metrics.v1.Sum;
import io.opentelemetry.proto.resource.v1.Resource;

import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

import static org.elasticsearch.test.ESTestCase.randomDouble;
import static org.elasticsearch.test.ESTestCase.randomLong;

/**
 * Static factory methods for building OTLP protobuf messages in tests.
 */
public class OtlpUtils {

    // Utility class: static factory methods only, never instantiated.
    private OtlpUtils() {}

    public static KeyValueList keyValueList(KeyValue... values) {
        return KeyValueList.newBuilder().addAllValues(List.of(values)).build();
    }

    /** Creates a string-valued attribute. */
    public static KeyValue keyValue(String key, String value) {
        return KeyValue.newBuilder().setKey(key).setValue(AnyValue.newBuilder().setStringValue(value).build()).build();
    }

    /** Creates a string-array-valued attribute. */
    public static KeyValue keyValue(String key, String... values) {
        return KeyValue.newBuilder()
            .setKey(key)
            .setValue(
                AnyValue.newBuilder()
                    .setArrayValue(
                        ArrayValue.newBuilder()
                            .addAllValues(Arrays.stream(values).map(v -> AnyValue.newBuilder().setStringValue(v).build()).toList())
                            .build()
                    )
                    .build()
            )
            .build();
    }

    /** Creates an int-valued attribute. */
    public static KeyValue keyValue(String key, long value) {
        return KeyValue.newBuilder().setKey(key).setValue(AnyValue.newBuilder().setIntValue(value).build()).build();
    }

    /** Creates a double-valued attribute. */
    public static KeyValue keyValue(String key, double value) {
        return KeyValue.newBuilder().setKey(key).setValue(AnyValue.newBuilder().setDoubleValue(value).build()).build();
    }

    /** Creates a boolean-valued attribute. */
    public static KeyValue keyValue(String key, boolean value) {
        return KeyValue.newBuilder().setKey(key).setValue(AnyValue.newBuilder().setBoolValue(value).build()).build();
    }

    /** Creates a nested (kvlist-valued) attribute. */
    public static KeyValue keyValue(String key, KeyValueList keyValueList) {
        return KeyValue.newBuilder().setKey(key).setValue(AnyValue.newBuilder().setKvlistValue(keyValueList).build()).build();
    }

    private static Resource createResource(List<KeyValue> attributes) {
        return Resource.newBuilder().addAllAttributes(attributes).build();
    }

    public static ResourceMetrics createResourceMetrics(List<KeyValue> attributes, List<ScopeMetrics> scopeMetrics) {
        return ResourceMetrics.newBuilder().setResource(createResource(attributes)).addAllScopeMetrics(scopeMetrics).build();
    }

    private static InstrumentationScope createScope(String name, String version) {
        return InstrumentationScope.newBuilder().setName(name).setVersion(version).build();
    }

    public static ScopeMetrics createScopeMetrics(String name, String version, Iterable<Metric> metrics) {
        return ScopeMetrics.newBuilder().setScope(createScope(name, version)).addAllMetrics(metrics).build();
    }

    /** Creates a gauge metric with the given data points. */
    public static Metric createGaugeMetric(String name, String unit, List<NumberDataPoint> dataPoints) {
        return Metric.newBuilder().setName(name).setUnit(unit).setGauge(Gauge.newBuilder().addAllDataPoints(dataPoints).build()).build();
    }

    /** Creates a sum metric with the given monotonicity and aggregation temporality. */
    public static Metric createSumMetric(
        String name,
        String unit,
        List<NumberDataPoint> dataPoints,
        boolean isMonotonic,
        AggregationTemporality temporality
    ) {
        return Metric.newBuilder()
            .setName(name)
            .setUnit(unit)
            .setSum(
                Sum.newBuilder().addAllDataPoints(dataPoints).setIsMonotonic(isMonotonic).setAggregationTemporality(temporality).build()
            )
            .build();
    }

    /** Creates a data point with a random double value; start time equals the observation time. */
    public static NumberDataPoint createDoubleDataPoint(long timestamp, List<KeyValue> attributes) {
        return NumberDataPoint.newBuilder()
            .setTimeUnixNano(timestamp)
            .setStartTimeUnixNano(timestamp)
            .addAllAttributes(attributes)
            .setAsDouble(randomDouble())
            .build();
    }

    /** Creates a data point with a random long value; start time equals the observation time. */
    public static NumberDataPoint createLongDataPoint(long timestamp, List<KeyValue> attributes) {
        return NumberDataPoint.newBuilder()
            .setTimeUnixNano(timestamp)
            .setStartTimeUnixNano(timestamp)
            .addAllAttributes(attributes)
            .setAsInt(randomLong())
            .build();
    }

    /**
     * Wraps each metric in its own {@link ResourceMetrics}, all sharing an identical resource
     * ({@code service.name=test-service}) and scope, so grouping logic can merge them.
     */
    public static ExportMetricsServiceRequest createMetricsRequest(List<Metric> metrics) {
        List<ResourceMetrics> resourceMetrics = new ArrayList<>();
        for (Metric metric : metrics) {
            resourceMetrics.add(
                createResourceMetrics(
                    List.of(keyValue("service.name", "test-service")),
                    List.of(createScopeMetrics("test", "1.0.0", List.of(metric)))
                )
            );
        }
        return ExportMetricsServiceRequest.newBuilder().addAllResourceMetrics(resourceMetrics).build();
    }
}

+ 177 - 0
x-pack/plugin/otel-data/src/test/java/org/elasticsearch/xpack/oteldata/otlp/datapoint/DataPointGroupingContextTests.java

@@ -0,0 +1,177 @@
/*
 * Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
 * or more contributor license agreements. Licensed under the Elastic License
 * 2.0; you may not use this file except in compliance with the Elastic License
 * 2.0.
 */

package org.elasticsearch.xpack.oteldata.otlp.datapoint;

import io.opentelemetry.proto.collector.metrics.v1.ExportMetricsServiceRequest;
import io.opentelemetry.proto.metrics.v1.ResourceMetrics;

import org.elasticsearch.test.ESTestCase;
import org.elasticsearch.xpack.oteldata.otlp.proto.BufferedByteStringAccessor;

import java.util.List;
import java.util.concurrent.atomic.AtomicInteger;

import static io.opentelemetry.proto.metrics.v1.AggregationTemporality.AGGREGATION_TEMPORALITY_CUMULATIVE;
import static org.elasticsearch.xpack.oteldata.otlp.OtlpUtils.createDoubleDataPoint;
import static org.elasticsearch.xpack.oteldata.otlp.OtlpUtils.createGaugeMetric;
import static org.elasticsearch.xpack.oteldata.otlp.OtlpUtils.createLongDataPoint;
import static org.elasticsearch.xpack.oteldata.otlp.OtlpUtils.createMetricsRequest;
import static org.elasticsearch.xpack.oteldata.otlp.OtlpUtils.createResourceMetrics;
import static org.elasticsearch.xpack.oteldata.otlp.OtlpUtils.createScopeMetrics;
import static org.elasticsearch.xpack.oteldata.otlp.OtlpUtils.keyValue;

/**
 * Tests that {@link DataPointGroupingContext} merges data points sharing the same
 * resource, scope, unit, timestamp, and attributes into one group, and splits them
 * into separate groups when any of those differ.
 */
public class DataPointGroupingContextTests extends ESTestCase {

    private final DataPointGroupingContext context = new DataPointGroupingContext(new BufferedByteStringAccessor());
    private final long nowUnixNanos = System.currentTimeMillis() * 1_000_000L;

    public void testGroupingSameGroup() throws Exception {
        // Three metrics with identical resource, scope, unit, timestamp, and attributes -> one group.
        ExportMetricsServiceRequest metricsRequest = createMetricsRequest(
            List.of(
                createGaugeMetric("system.cpu.usage", "", List.of(createDoubleDataPoint(nowUnixNanos, List.of()))),
                createGaugeMetric("system.memory.usage", "", List.of(createDoubleDataPoint(nowUnixNanos, List.of()))),
                createSumMetric(
                    "http.requests.count",
                    "",
                    List.of(createLongDataPoint(nowUnixNanos, List.of())),
                    true,
                    AGGREGATION_TEMPORALITY_CUMULATIVE
                )
            )
        );
        groupAndAssertNoneIgnored(metricsRequest, 3);
        assertGroupCount(1);
    }

    public void testGroupingDifferentGroupUnit() throws Exception {
        // Same resource/scope/timestamp, but different units -> two groups.
        ExportMetricsServiceRequest metricsRequest = createMetricsRequest(
            List.of(
                createGaugeMetric("system.cpu.usage", "{percent}", List.of(createDoubleDataPoint(nowUnixNanos, List.of()))),
                createGaugeMetric("system.memory.usage", "By", List.of(createLongDataPoint(nowUnixNanos, List.of())))
            )
        );
        groupAndAssertNoneIgnored(metricsRequest, 2);
        assertGroupCount(2);
    }

    public void testGroupingDifferentResource() throws Exception {
        // Same scope, but different resource attributes (service.name) -> two groups.
        ResourceMetrics resource1 = createResourceMetrics(
            List.of(keyValue("service.name", "test-service_1")),
            List.of(
                createScopeMetrics(
                    "test",
                    "1.0.0",
                    List.of(createGaugeMetric("system.cpu.usage", "", List.of(createDoubleDataPoint(nowUnixNanos, List.of()))))
                )
            )
        );
        ResourceMetrics resource2 = createResourceMetrics(
            List.of(keyValue("service.name", "test-service_2")),
            List.of(
                createScopeMetrics(
                    "test",
                    "1.0.0",
                    List.of(createGaugeMetric("system.memory.usage", "", List.of(createLongDataPoint(nowUnixNanos, List.of()))))
                )
            )
        );

        groupAndAssertNoneIgnored(
            ExportMetricsServiceRequest.newBuilder().addAllResourceMetrics(List.of(resource1, resource2)).build(),
            2
        );
        assertGroupCount(2);
    }

    public void testGroupingDifferentScope() throws Exception {
        // Same resource, but different scope names -> two groups.
        ResourceMetrics resource1 = createResourceMetrics(
            List.of(keyValue("service.name", "test-service")),
            List.of(
                createScopeMetrics(
                    "test_scope_1",
                    "1.0.0",
                    List.of(createGaugeMetric("system.cpu.usage", "", List.of(createDoubleDataPoint(nowUnixNanos, List.of()))))
                )
            )
        );
        ResourceMetrics resource2 = createResourceMetrics(
            List.of(keyValue("service.name", "test-service")),
            List.of(
                createScopeMetrics(
                    "test_scope_2",
                    "1.0.0",
                    List.of(createGaugeMetric("system.memory.usage", "", List.of(createLongDataPoint(nowUnixNanos, List.of()))))
                )
            )
        );

        groupAndAssertNoneIgnored(
            ExportMetricsServiceRequest.newBuilder().addAllResourceMetrics(List.of(resource1, resource2)).build(),
            2
        );
        assertGroupCount(2);
    }

    public void testGroupingDifferentGroupTimestamp() throws Exception {
        // Timestamps differing by a single nanosecond must not be merged -> two groups.
        ExportMetricsServiceRequest metricsRequest = createMetricsRequest(
            List.of(
                createGaugeMetric("system.cpu.usage", "", List.of(createDoubleDataPoint(nowUnixNanos + 1, List.of()))),
                createGaugeMetric("system.memory.usage", "", List.of(createLongDataPoint(nowUnixNanos, List.of())))
            )
        );
        groupAndAssertNoneIgnored(metricsRequest, 2);
        assertGroupCount(2);
    }

    public void testGroupingDifferentGroupAttributes() throws Exception {
        // Differing data point attributes -> two groups.
        ExportMetricsServiceRequest metricsRequest = createMetricsRequest(
            List.of(
                createGaugeMetric("system.cpu.usage", "", List.of(createDoubleDataPoint(nowUnixNanos, List.of(keyValue("core", "cpu0"))))),
                createGaugeMetric("system.memory.usage", "", List.of(createLongDataPoint(nowUnixNanos, List.of())))
            )
        );
        groupAndAssertNoneIgnored(metricsRequest, 2);
        assertGroupCount(2);
    }

    /** Groups the request and asserts that every data point was accepted (none ignored). */
    private void groupAndAssertNoneIgnored(ExportMetricsServiceRequest metricsRequest, int expectedDataPoints) throws Exception {
        context.groupDataPoints(metricsRequest);
        assertEquals(expectedDataPoints, context.totalDataPoints());
        assertEquals(0, context.getIgnoredDataPoints());
        assertEquals("", context.getIgnoredDataPointsMessage());
    }

    /** Consumes the grouped data points and asserts how many distinct groups were produced. */
    private void assertGroupCount(int expectedGroups) throws Exception {
        AtomicInteger groupCount = new AtomicInteger(0);
        context.consume(dataPointGroup -> groupCount.incrementAndGet());
        assertEquals(expectedGroups, groupCount.get());
    }

}

+ 75 - 0
x-pack/plugin/otel-data/src/test/java/org/elasticsearch/xpack/oteldata/otlp/datapoint/DataPointNumberTests.java

@@ -0,0 +1,75 @@
/*
 * Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
 * or more contributor license agreements. Licensed under the Elastic License
 * 2.0; you may not use this file except in compliance with the Elastic License
 * 2.0.
 */

package org.elasticsearch.xpack.oteldata.otlp.datapoint;

import org.elasticsearch.test.ESTestCase;

import java.util.List;

import static io.opentelemetry.proto.metrics.v1.AggregationTemporality.AGGREGATION_TEMPORALITY_CUMULATIVE;
import static io.opentelemetry.proto.metrics.v1.AggregationTemporality.AGGREGATION_TEMPORALITY_DELTA;
import static org.elasticsearch.xpack.oteldata.otlp.OtlpUtils.createDoubleDataPoint;
import static org.elasticsearch.xpack.oteldata.otlp.OtlpUtils.createGaugeMetric;
import static org.elasticsearch.xpack.oteldata.otlp.OtlpUtils.createLongDataPoint;
import static org.elasticsearch.xpack.oteldata.otlp.OtlpUtils.createSumMetric;
import static org.hamcrest.Matchers.equalTo;

/**
 * Verifies the dynamic template selected by {@link DataPoint.Number} for gauges and sums:
 * only monotonic cumulative sums map to counter templates, everything else maps to gauges.
 */
public class DataPointNumberTests extends ESTestCase {

    private final long nowUnixNanos = System.currentTimeMillis() * 1_000_000L;

    public void testGauge() {
        // Gauges always map to gauge templates, typed by the data point's value kind.
        assertDynamicTemplate(
            new DataPoint.Number(createDoubleDataPoint(nowUnixNanos, List.of()), createGaugeMetric("system.cpu.usage", "", List.of())),
            "gauge_double"
        );
        assertDynamicTemplate(
            new DataPoint.Number(createLongDataPoint(nowUnixNanos, List.of()), createGaugeMetric("system.cpu.usage", "", List.of())),
            "gauge_long"
        );
    }

    public void testCounterTemporality() {
        // Monotonic cumulative sums are counters; monotonic delta sums fall back to gauges.
        assertDynamicTemplate(
            new DataPoint.Number(
                createDoubleDataPoint(nowUnixNanos, List.of()),
                createSumMetric("http.requests.count", "", List.of(), true, AGGREGATION_TEMPORALITY_CUMULATIVE)
            ),
            "counter_double"
        );
        assertDynamicTemplate(
            new DataPoint.Number(
                createLongDataPoint(nowUnixNanos, List.of()),
                createSumMetric("http.requests.count", "", List.of(), true, AGGREGATION_TEMPORALITY_CUMULATIVE)
            ),
            "counter_long"
        );
        assertDynamicTemplate(
            new DataPoint.Number(
                createDoubleDataPoint(nowUnixNanos, List.of()),
                createSumMetric("http.requests.count", "", List.of(), true, AGGREGATION_TEMPORALITY_DELTA)
            ),
            "gauge_double"
        );
        assertDynamicTemplate(
            new DataPoint.Number(
                createLongDataPoint(nowUnixNanos, List.of()),
                createSumMetric("http.requests.count", "", List.of(), true, AGGREGATION_TEMPORALITY_DELTA)
            ),
            "gauge_long"
        );
    }

    public void testCounterNonMonotonic() {
        // Non-monotonic sums are never counters, regardless of temporality.
        assertDynamicTemplate(
            new DataPoint.Number(
                createDoubleDataPoint(nowUnixNanos, List.of()),
                createSumMetric("http.requests.count", "", List.of(), false, AGGREGATION_TEMPORALITY_CUMULATIVE)
            ),
            "gauge_double"
        );
        assertDynamicTemplate(
            new DataPoint.Number(
                createLongDataPoint(nowUnixNanos, List.of()),
                createSumMetric("http.requests.count", "", List.of(), false, AGGREGATION_TEMPORALITY_DELTA)
            ),
            "gauge_long"
        );
    }

    /** Asserts which dynamic mapping template the data point resolves to. */
    private static void assertDynamicTemplate(DataPoint.Number dataPoint, String expectedTemplate) {
        assertThat(dataPoint.getDynamicTemplate(), equalTo(expectedTemplate));
    }

}

+ 54 - 0
x-pack/plugin/otel-data/src/test/java/org/elasticsearch/xpack/oteldata/otlp/proto/BufferedByteStringAccessorTests.java

@@ -0,0 +1,54 @@
+/*
+ * Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
+ * or more contributor license agreements. Licensed under the Elastic License
+ * 2.0; you may not use this file except in compliance with the Elastic License
+ * 2.0.
+ */
+
+package org.elasticsearch.xpack.oteldata.otlp.proto;
+
+import com.google.protobuf.ByteString;
+
+import org.elasticsearch.cluster.routing.TsidBuilder;
+import org.elasticsearch.common.Strings;
+import org.elasticsearch.common.xcontent.XContentHelper;
+import org.elasticsearch.test.ESTestCase;
+import org.elasticsearch.xcontent.XContentBuilder;
+import org.elasticsearch.xcontent.json.JsonXContent;
+
+import static org.hamcrest.Matchers.equalTo;
+
+public class BufferedByteStringAccessorTests extends ESTestCase {
+
+    private final BufferedByteStringAccessor accessor = new BufferedByteStringAccessor();
+
+    public void testAddStringDimension() {
+        String value = randomUnicodeOfLengthBetween(1, 1000);
+        TsidBuilder byteStringTsidBuilder = new TsidBuilder();
+        accessor.addStringDimension(byteStringTsidBuilder, "test", ByteString.copyFromUtf8(value));
+        TsidBuilder basicTsidBuilder = new TsidBuilder();
+        basicTsidBuilder.addStringDimension("test", value);
+        assertThat(byteStringTsidBuilder.hash(), equalTo(basicTsidBuilder.hash()));
+    }
+
+    public void testAddEmptyStringDimension() {
+        TsidBuilder byteStringTsidBuilder = new TsidBuilder();
+        accessor.addStringDimension(byteStringTsidBuilder, "test", ByteString.copyFromUtf8(""));
+        assertThat(byteStringTsidBuilder.size(), equalTo(0));
+    }
+
+    public void testUtf8Value() throws Exception {
+        String value = randomUnicodeOfLengthBetween(0, 1000);
+        String json;
+        try (XContentBuilder builder = JsonXContent.contentBuilder()) {
+            builder.startObject();
+            builder.field("value");
+            accessor.utf8Value(builder, ByteString.copyFromUtf8(value));
+            builder.endObject();
+            json = Strings.toString(builder);
+        }
+
+        assertThat(XContentHelper.convertToMap(JsonXContent.jsonXContent, json, false).get("value"), equalTo(value));
+    }
+
+}

+ 59 - 0
x-pack/plugin/otel-data/src/test/java/org/elasticsearch/xpack/oteldata/otlp/tsid/AttributeListTsidFunnelTests.java

@@ -0,0 +1,59 @@
/*
 * Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
 * or more contributor license agreements. Licensed under the Elastic License
 * 2.0; you may not use this file except in compliance with the Elastic License
 * 2.0.
 */

package org.elasticsearch.xpack.oteldata.otlp.tsid;

import org.elasticsearch.cluster.routing.TsidBuilder;
import org.elasticsearch.test.ESTestCase;
import org.elasticsearch.xpack.oteldata.otlp.proto.BufferedByteStringAccessor;

import java.util.List;

import static org.elasticsearch.xpack.oteldata.otlp.OtlpUtils.keyValue;
import static org.elasticsearch.xpack.oteldata.otlp.OtlpUtils.keyValueList;

/**
 * Verifies that {@link AttributeListTsidFunnel} hashes OTLP attribute lists the same way
 * as adding the equivalent prefixed dimensions directly to a {@link TsidBuilder}.
 */
public class AttributeListTsidFunnelTests extends ESTestCase {

    private final TsidBuilder expectedBuilder = new TsidBuilder();
    private final TsidBuilder actualBuilder = new TsidBuilder();
    private final BufferedByteStringAccessor byteStringAccessor = new BufferedByteStringAccessor();
    private final AttributeListTsidFunnel funnel = AttributeListTsidFunnel.get(byteStringAccessor, "attributes.");

    public void testSimpleAttributes() {
        // Build the expected hash from directly-added dimensions (array values expand to one entry each).
        expectedBuilder.addStringDimension("attributes.string", "value");
        expectedBuilder.addStringDimension("attributes.array", "value1");
        expectedBuilder.addStringDimension("attributes.array", "value2");
        expectedBuilder.addBooleanDimension("attributes.bool", true);
        expectedBuilder.addDoubleDimension("attributes.double", 1.5);
        expectedBuilder.addLongDimension("attributes.long", 42);
        expectedBuilder.addIntDimension("attributes.int", 42);
        funnel.add(
            List.of(
                keyValue("string", "value"),
                keyValue("array", "value1", "value2"),
                keyValue("bool", true),
                keyValue("double", 1.5),
                keyValue("long", 42L),
                keyValue("int", 42)
            ),
            actualBuilder
        );
        assertEquals(expectedBuilder.hash(), actualBuilder.hash());
    }

    public void testNestedAttributes() {
        // Nested kvlist values are flattened with dot-joined keys under the prefix.
        expectedBuilder.addStringDimension("attributes.nested.string", "value");
        expectedBuilder.addLongDimension("attributes.nested.int", 42);
        expectedBuilder.addStringDimension("attributes.foo", "bar");
        funnel.add(
            List.of(keyValue("foo", "bar"), keyValue("nested", keyValueList(keyValue("string", "value"), keyValue("int", 42)))),
            actualBuilder
        );
        assertEquals(expectedBuilder.hash(), actualBuilder.hash());
    }

}