Adding `NormalizeForStreamProcessor` (#125699)

eyalkoren committed 4 months ago · commit d3d2d9b996

+ 5 - 0
docs/changelog/125699.yaml

@@ -0,0 +1,5 @@
+pr: 125699
+summary: Adding `NormalizeForStreamProcessor`
+area: Ingest Node
+type: feature
+issues: []

+ 3 - 0
docs/reference/enrich-processor/index.md

@@ -84,6 +84,9 @@ Refer to [Enrich your data](docs-content://manage-data/ingest/transform-enrich/d
 [`network_direction` processor](/reference/enrich-processor/network-direction-processor.md)
 :   Calculates the network direction given a source IP address, destination IP address, and a list of internal networks.
 
+[`normalize_for_stream` processor](/reference/enrich-processor/normalize-for-stream.md)
+:   Normalizes non-OpenTelemetry documents to be OpenTelemetry-compliant.
+
 [`registered_domain` processor](/reference/enrich-processor/registered-domain-processor.md)
 :   Extracts the registered domain (also known as the effective top-level domain or eTLD), sub-domain, and top-level domain from a fully qualified domain name (FQDN).
 

+ 152 - 0
docs/reference/enrich-processor/normalize-for-stream.md

@@ -0,0 +1,152 @@
+---
+navigation_title: "Normalize for Stream"
+mapped_pages:
+  - https://www.elastic.co/guide/en/elasticsearch/reference/current/normalize-for-stream-processor.html
+---
+
+# Normalize-for-Stream processor [normalize-for-stream-processor]
+
+Detects whether a document is OpenTelemetry-compliant and, if it is not,
+normalizes it as described below. If used in combination with OTel-related
+mappings such as the ones defined in `logs-otel@template`, the resulting
+document can be queried seamlessly by clients that expect either the [ECS](https://www.elastic.co/guide/en/ecs/current/index.html) or the OpenTelemetry [Semantic Conventions](https://github.com/open-telemetry/semantic-conventions) format.
+
+::::{note}
+This processor is in tech preview and is not available in our serverless offering.
+::::
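+
+The processor takes no type-specific configuration options; only the common
+processor options (such as `description`, `tag`, `if`, `ignore_failure`, and
+`on_failure`) apply. A minimal pipeline definition using the processor might
+look like the following sketch:
+
+```json
+{
+  "processors": [
+    {
+      "normalize_for_stream": {}
+    }
+  ]
+}
+```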
+
+## Detecting OpenTelemetry compliance
+
+The processor detects OpenTelemetry compliance by checking the following fields:
+* `resource` exists as a key and the value is a map
+* `resource` either doesn't contain an `attributes` field, or contains an `attributes` field of type map
+* `scope` is either missing or a map
+* `attributes` is either missing or a map
+* `body` is either missing or a map
+* `body` either doesn't contain a `text` field, or contains a `text` field of type `String`
+* `body` either doesn't contain a `structured` field, or contains a `structured` field that is not of type `String`
+
+If all of these conditions are met, the document is considered OpenTelemetry-compliant and is not modified by the processor.
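+
+Per these conditions, the smallest possible OpenTelemetry-compliant document is
+one that contains nothing but an empty `resource` map:
+
+```json
+{
+  "resource": {}
+}
+```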
+
+## Normalization
+
+If the document is not OpenTelemetry-compliant, the processor normalizes it as follows:
+* Specific ECS fields are renamed to have their corresponding OpenTelemetry Semantic Conventions attribute names. These include the following:
+
+  | ECS Field   | Semantic Conventions Attribute |
+  |-------------|--------------------------------|
+  | `span.id`   | `span_id`                      |
+  | `trace.id`  | `trace_id`                     |
+  | `message`   | `body.text`                    |
+  | `log.level` | `severity_text`                |
+  The processor first looks for the nested form of the ECS field; if it does not exist, it falls back to a top-level field with the dotted field name.
+* Other specific ECS fields that describe resources and have corresponding counterparts in the OpenTelemetry Semantic Conventions are moved to the `resource.attributes` map. Fields are considered resource attributes when they conform to the following conditions:
+    * They are ECS fields that have corresponding counterparts (either with
+      the same name or with a different name) in OpenTelemetry Semantic Conventions.
+    * The corresponding OpenTelemetry attribute is defined in
+      [Semantic Conventions](https://github.com/open-telemetry/semantic-conventions/tree/main/model)
+      within a group that is defined as `type: entity`.
+* All other fields, except for `@timestamp`, are moved to the `attributes` map.
+* All non-array entries of the `attributes` and `resource.attributes` maps are flattened. Flattening means that nested objects are merged into their parent object, and the keys are concatenated with a dot. See examples below.
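+
+For instance, under this flattening scheme a nested object such as:
+
+```json
+{
+  "http": {
+    "method": "GET",
+    "url": {
+      "path": "/api/v1/resource"
+    }
+  }
+}
+```
+
+ends up in the `attributes` map as dotted keys:
+
+```json
+{
+  "attributes": {
+    "http.method": "GET",
+    "http.url.path": "/api/v1/resource"
+  }
+}
+```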
+
+## Examples
+
+If an OpenTelemetry-compliant document is detected, the processor does nothing. For example, the following document will stay unchanged:
+
+```json
+{
+  "resource": {
+    "attributes": {
+      "service.name": "my-service"
+    }
+  },
+  "scope": {
+    "name": "my-library",
+    "version": "1.0.0"
+  },
+  "attributes": {
+    "http.method": "GET"
+  },
+  "body": {
+    "text": "Hello, world!"
+  }
+}
+```
+
+If a non-OpenTelemetry-compliant document is detected, the processor normalizes it. For example, the following document:
+
+```json
+{
+  "@timestamp": "2023-10-01T12:00:00Z",
+  "service": {
+    "name": "my-service",
+    "version": "1.0.0",
+    "environment": "production",
+    "language": {
+      "name": "python",
+      "version": "3.8"
+    }
+  },
+  "log": {
+    "level": "INFO"
+  },
+  "message": "Hello, world!",
+  "http": {
+    "method": "GET",
+    "url": {
+      "path": "/api/v1/resource"
+    },
+    "headers": [
+      {
+        "name": "Authorization",
+        "value": "Bearer token"
+      },
+      {
+        "name": "User-Agent",
+        "value": "my-client/1.0"
+      }
+    ]
+  },
+  "span" : {
+    "id": "1234567890abcdef"
+  },
+  "span.id": "abcdef1234567890",
+  "trace.id": "abcdef1234567890abcdef1234567890"
+}
+```
+will be normalized into the following form:
+
+```json
+{
+  "@timestamp": "2023-10-01T12:00:00Z",
+  "resource": {
+    "attributes": {
+      "service.name": "my-service",
+      "service.version": "1.0.0",
+      "service.environment": "production"
+    }
+  },
+  "attributes": {
+    "service.language.name": "python",
+    "service.language.version": "3.8",
+    "http.method": "GET",
+    "http.url.path": "/api/v1/resource",
+    "http.headers": [
+      {
+        "name": "Authorization",
+        "value": "Bearer token"
+      },
+      {
+        "name": "User-Agent",
+        "value": "my-client/1.0"
+      }
+    ]
+  },
+  "body": {
+    "text": "Hello, world!"
+  },
+  "span_id": "1234567890abcdef",
+  "trace_id": "abcdef1234567890abcdef1234567890"
+}
+```

+ 2 - 1
docs/reference/enrich-processor/toc.yml

@@ -28,6 +28,7 @@ toc:
   - file: kv-processor.md
   - file: lowercase-processor.md
   - file: network-direction-processor.md
+  - file: normalize-for-stream.md
   - file: pipeline-processor.md
   - file: redact-processor.md
   - file: registered-domain-processor.md
@@ -44,4 +45,4 @@ toc:
   - file: uppercase-processor.md
   - file: urldecode-processor.md
   - file: uri-parts-processor.md
-  - file: user-agent-processor.md
+  - file: user-agent-processor.md

+ 21 - 0
modules/ingest-otel/build.gradle

@@ -0,0 +1,21 @@
+/*
+ * Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
+ * or more contributor license agreements. Licensed under the "Elastic License
+ * 2.0", the "GNU Affero General Public License v3.0 only", and the "Server Side
+ * Public License v 1"; you may not use this file except in compliance with, at
+ * your election, the "Elastic License 2.0", the "GNU Affero General Public
+ * License v3.0 only", or the "Server Side Public License, v 1".
+ */
+
+apply plugin: 'elasticsearch.internal-yaml-rest-test'
+
+esplugin {
+  description = 'Ingest processor that normalizes ECS documents to OpenTelemetry-compatible namespaces'
+  classname = 'org.elasticsearch.ingest.otel.NormalizeForStreamPlugin'
+}
+
+restResources {
+  restApi {
+    include '_common', 'indices', 'index', 'cluster', 'nodes', 'get', 'ingest'
+  }
+}

+ 13 - 0
modules/ingest-otel/src/main/java/module-info.java

@@ -0,0 +1,13 @@
+/*
+ * Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
+ * or more contributor license agreements. Licensed under the "Elastic License
+ * 2.0", the "GNU Affero General Public License v3.0 only", and the "Server Side
+ * Public License v 1"; you may not use this file except in compliance with, at
+ * your election, the "Elastic License 2.0", the "GNU Affero General Public
+ * License v3.0 only", or the "Server Side Public License, v 1".
+ */
+
+module org.elasticsearch.ingest.otel {
+    requires org.elasticsearch.base;
+    requires org.elasticsearch.server;
+}

+ 65 - 0
modules/ingest-otel/src/main/java/org/elasticsearch/ingest/otel/EcsOTelResourceAttributes.java

@@ -0,0 +1,65 @@
+/*
+ * Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
+ * or more contributor license agreements. Licensed under the "Elastic License
+ * 2.0", the "GNU Affero General Public License v3.0 only", and the "Server Side
+ * Public License v 1"; you may not use this file except in compliance with, at
+ * your election, the "Elastic License 2.0", the "GNU Affero General Public
+ * License v3.0 only", or the "Server Side Public License, v 1".
+ */
+
+package org.elasticsearch.ingest.otel;
+
+import java.util.Set;
+
+final class EcsOTelResourceAttributes {
+
+    /**
+     * The set of ECS (Elastic Common Schema) field names that are mapped to OpenTelemetry resource attributes,
+     * as defined by the OpenTelemetry Semantic Conventions.
+     * The list is produced by the {@code ResourceAttributesTests#testAttributesSetUpToDate} test.
+     *
+     * @see <a href="https://github.com/open-telemetry/semantic-conventions">OpenTelemetry Semantic Conventions</a>
+     */
+    static final Set<String> LATEST = Set.of(
+        "agent.type",
+        "agent.build.original",
+        "agent.name",
+        "agent.id",
+        "agent.ephemeral_id",
+        "agent.version",
+        "container.image.tag",
+        "device.model.identifier",
+        "container.image.hash.all",
+        "service.node.name",
+        "process.pid",
+        "device.id",
+        "host.mac",
+        "host.type",
+        "container.id",
+        "cloud.availability_zone",
+        "host.ip",
+        "container.name",
+        "container.image.name",
+        "device.model.name",
+        "host.name",
+        "host.id",
+        "process.executable",
+        "user_agent.original",
+        "service.environment",
+        "cloud.region",
+        "service.name",
+        "faas.name",
+        "device.manufacturer",
+        "process.args",
+        "host.architecture",
+        "cloud.provider",
+        "container.runtime",
+        "service.version",
+        "cloud.service.name",
+        "cloud.account.id",
+        "process.command_line",
+        "faas.version"
+    );
+
+    private EcsOTelResourceAttributes() {}
+}

+ 24 - 0
modules/ingest-otel/src/main/java/org/elasticsearch/ingest/otel/NormalizeForStreamPlugin.java

@@ -0,0 +1,24 @@
+/*
+ * Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
+ * or more contributor license agreements. Licensed under the "Elastic License
+ * 2.0", the "GNU Affero General Public License v3.0 only", and the "Server Side
+ * Public License v 1"; you may not use this file except in compliance with, at
+ * your election, the "Elastic License 2.0", the "GNU Affero General Public
+ * License v3.0 only", or the "Server Side Public License, v 1".
+ */
+
+package org.elasticsearch.ingest.otel;
+
+import org.elasticsearch.ingest.Processor;
+import org.elasticsearch.plugins.IngestPlugin;
+import org.elasticsearch.plugins.Plugin;
+
+import java.util.Map;
+
+public class NormalizeForStreamPlugin extends Plugin implements IngestPlugin {
+
+    @Override
+    public Map<String, Processor.Factory> getProcessors(Processor.Parameters parameters) {
+        return Map.of(NormalizeForStreamProcessor.TYPE, new NormalizeForStreamProcessor.Factory());
+    }
+}

+ 276 - 0
modules/ingest-otel/src/main/java/org/elasticsearch/ingest/otel/NormalizeForStreamProcessor.java

@@ -0,0 +1,276 @@
+/*
+ * Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
+ * or more contributor license agreements. Licensed under the "Elastic License
+ * 2.0", the "GNU Affero General Public License v3.0 only", and the "Server Side
+ * Public License v 1"; you may not use this file except in compliance with, at
+ * your election, the "Elastic License 2.0", the "GNU Affero General Public
+ * License v3.0 only", or the "Server Side Public License, v 1".
+ */
+
+package org.elasticsearch.ingest.otel;
+
+import org.elasticsearch.cluster.metadata.ProjectId;
+import org.elasticsearch.common.util.Maps;
+import org.elasticsearch.ingest.AbstractProcessor;
+import org.elasticsearch.ingest.IngestDocument;
+import org.elasticsearch.ingest.Processor;
+
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.Iterator;
+import java.util.Map;
+import java.util.Set;
+
+import static java.util.Map.entry;
+
+/**
+ * This processor is responsible for transforming non-OpenTelemetry-compliant documents into a namespaced flavor of ECS
+ * that makes them compatible with OpenTelemetry.
+ * It DOES NOT translate the entire ECS schema into OpenTelemetry semantic conventions.
+ *
+ * <p>More specifically, this processor performs the following operations:
+ * <ul>
+ *   <li>Renames specific ECS fields to their corresponding OpenTelemetry-compatible counterparts.</li>
+ *   <li>Moves all other fields to the "attributes" namespace.</li>
+ *   <li>Flattens all attributes in the "attributes" namespace.</li>
+ *   <li>Moves resource fields from the "attributes" namespace to the "resource.attributes" namespace.</li>
+ * </ul>
+ *
+ * <p>If a document is identified as OpenTelemetry-compatible, no transformation is performed.
+ * @see org.elasticsearch.ingest.AbstractProcessor
+ */
+public class NormalizeForStreamProcessor extends AbstractProcessor {
+
+    public static final String TYPE = "normalize_for_stream";
+
+    /**
+     * Mapping of ECS field names to their corresponding OpenTelemetry-compatible counterparts.
+     */
+    private static final Map<String, String> RENAME_KEYS = Map.ofEntries(
+        entry("span.id", "span_id"),
+        entry("message", "body.text"),
+        entry("log.level", "severity_text"),
+        entry("trace.id", "trace_id")
+    );
+
+    /**
+     * A closed set of keys that should be kept at the top level of the processed document after applying the namespacing.
+     * In essence, these are the fields that should not be moved to the "attributes" or "resource.attributes" namespaces.
+     * Besides the @timestamp field, this set obviously contains the attributes and the resource fields, as well as the
+     * OpenTelemetry-compatible fields that are renamed by the processor.
+     */
+    private static final Set<String> KEEP_KEYS;
+    static {
+        Set<String> keepKeys = new HashSet<>(Set.of("@timestamp", "attributes", "resource"));
+        Set<String> renamedTopLevelFields = new HashSet<>();
+        for (String value : RENAME_KEYS.values()) {
+            // if the renamed field is nested, we only need to know the top level field
+            int dotIndex = value.indexOf('.');
+            if (dotIndex != -1) {
+                renamedTopLevelFields.add(value.substring(0, dotIndex));
+            } else {
+                renamedTopLevelFields.add(value);
+            }
+        }
+        keepKeys.addAll(renamedTopLevelFields);
+        KEEP_KEYS = Set.copyOf(keepKeys);
+    }
+
+    private static final String ATTRIBUTES_KEY = "attributes";
+    private static final String RESOURCE_KEY = "resource";
+    private static final String SCOPE_KEY = "scope";
+    private static final String BODY_KEY = "body";
+    private static final String TEXT_KEY = "text";
+    private static final String STRUCTURED_KEY = "structured";
+
+    NormalizeForStreamProcessor(String tag, String description) {
+        super(tag, description);
+    }
+
+    @Override
+    public String getType() {
+        return TYPE;
+    }
+
+    @Override
+    public IngestDocument execute(IngestDocument document) {
+        Map<String, Object> source = document.getSource();
+
+        boolean isOTel = isOTelDocument(source);
+        if (isOTel) {
+            return document;
+        }
+
+        // non-OTel document
+
+        Map<String, Object> newAttributes = new HashMap<>();
+        // The keep keys indicate the fields that should be kept at the top level later on when applying the namespacing.
+        // However, at this point we need to move their original values (if they exist) to one of the new attributes namespaces, except
+        // for the @timestamp field. The assumption is that at this point the document is not OTel compliant, so even if a valid top
+        // level field is found, we assume that it does not bear the OTel semantics.
+        for (String keepKey : KEEP_KEYS) {
+            if (keepKey.equals("@timestamp")) {
+                continue;
+            }
+            if (source.containsKey(keepKey)) {
+                newAttributes.put(keepKey, source.remove(keepKey));
+            }
+        }
+
+        source.put(ATTRIBUTES_KEY, newAttributes);
+
+        renameSpecialKeys(document);
+
+        // move all top level keys except for specific ones to the "attributes" namespace
+        final var sourceItr = source.entrySet().iterator();
+        while (sourceItr.hasNext()) {
+            final var entry = sourceItr.next();
+            if (KEEP_KEYS.contains(entry.getKey()) == false) {
+                newAttributes.put(entry.getKey(), entry.getValue());
+                sourceItr.remove();
+            }
+        }
+
+        // Flatten attributes
+        Map<String, Object> flattenedAttributes = Maps.flatten(newAttributes, false, false);
+        source.put(ATTRIBUTES_KEY, flattenedAttributes);
+
+        Map<String, Object> newResource = new HashMap<>();
+        Map<String, Object> newResourceAttributes = new HashMap<>();
+        newResource.put(ATTRIBUTES_KEY, newResourceAttributes);
+        source.put(RESOURCE_KEY, newResource);
+        moveResourceAttributes(flattenedAttributes, newResourceAttributes);
+
+        return document;
+    }
+
+    /**
+     * Checks if the given document is OpenTelemetry-compliant.
+     *
+     * <p>A document is considered OpenTelemetry-compliant if it meets the following criteria:
+     * <ul>
+     *   <li>The "resource" field is present and is a map
+     *   <li>The resource field either doesn't contain an "attributes" field, or the "attributes" field is a map.</li>
+     *   <li>The "scope" field is either absent or a map.</li>
+     *   <li>The "attributes" field is either absent or a map.</li>
+     *   <li>The "body" field is either absent or a map.</li>
+     *   <li>If present, the "body" map either doesn't contain a "text" field, or the "text" field is a string.</li>
+     *   <li>If present, the "body" map either doesn't contain a "structured" field, or the "structured" field is not a string.</li>
+     * </ul>
+     *
+     * @param source the document to check
+     * @return {@code true} if the document is OpenTelemetry-compliant, {@code false} otherwise
+     */
+    static boolean isOTelDocument(Map<String, Object> source) {
+        Object resource = source.get(RESOURCE_KEY);
+        if (resource instanceof Map<?, ?> resourceMap) {
+            Object resourceAttributes = resourceMap.get(ATTRIBUTES_KEY);
+            if (resourceAttributes != null && (resourceAttributes instanceof Map) == false) {
+                return false;
+            }
+        } else {
+            return false;
+        }
+
+        Object scope = source.get(SCOPE_KEY);
+        if (scope != null && scope instanceof Map == false) {
+            return false;
+        }
+
+        Object attributes = source.get(ATTRIBUTES_KEY);
+        if (attributes != null && attributes instanceof Map == false) {
+            return false;
+        }
+
+        Object body = source.get(BODY_KEY);
+        if (body != null) {
+            if (body instanceof Map<?, ?> bodyMap) {
+                Object bodyText = bodyMap.get(TEXT_KEY);
+                if (bodyText != null && (bodyText instanceof String) == false) {
+                    return false;
+                }
+                Object bodyStructured = bodyMap.get(STRUCTURED_KEY);
+                return (bodyStructured instanceof String) == false;
+            } else {
+                return false;
+            }
+        }
+        return true;
+    }
+
+    /**
+     * Renames specific ECS keys in the given document to their OpenTelemetry-compatible counterparts, based on the {@code RENAME_KEYS} map.
+     *
+     * <p>This method performs the following operations:
+     * <ul>
+     *   <li>For each key in the {@code RENAME_KEYS} map, it checks if a corresponding field exists in the document. It first looks for the
+     *   field assuming dot notation for nested fields. If the field is not found, it looks for a top level field with a dotted name.</li>
+     *   <li>If the field exists, it removes it from the document and adds a new field with the corresponding name from the
+     *   {@code RENAME_KEYS} map and the same value.</li>
+     *   <li>If the key is nested (contains dots), it recursively removes empty parent fields after renaming.</li>
+     * </ul>
+     *
+     * @param document the document to process
+     */
+    static void renameSpecialKeys(IngestDocument document) {
+        RENAME_KEYS.forEach((nonOtelName, otelName) -> {
+            boolean fieldExists = false;
+            Object value = null;
+            // first look assuming dot notation for nested fields
+            if (document.hasField(nonOtelName)) {
+                fieldExists = true;
+                value = document.getFieldValue(nonOtelName, Object.class, true);
+                document.removeField(nonOtelName);
+                // recursively remove empty parent fields
+                int lastDot = nonOtelName.lastIndexOf('.');
+                while (lastDot > 0) {
+                    String parentName = nonOtelName.substring(0, lastDot);
+                    // parent should never be null and must be a map if we are here
+                    @SuppressWarnings("unchecked")
+                    Map<String, Object> parent = (Map<String, Object>) document.getFieldValue(parentName, Map.class);
+                    if (parent.isEmpty()) {
+                        document.removeField(parentName);
+                    } else {
+                        break;
+                    }
+                    lastDot = parentName.lastIndexOf('.');
+                }
+            } else if (nonOtelName.contains(".")) {
+                // look for dotted field names
+                Map<String, Object> source = document.getSource();
+                if (source.containsKey(nonOtelName)) {
+                    fieldExists = true;
+                    value = source.remove(nonOtelName);
+                }
+            }
+            if (fieldExists) {
+                document.setFieldValue(otelName, value);
+            }
+        });
+    }
+
+    private static void moveResourceAttributes(Map<String, Object> attributes, Map<String, Object> resourceAttributes) {
+        Set<String> ecsResourceFields = EcsOTelResourceAttributes.LATEST;
+        Iterator<Map.Entry<String, Object>> attributeIterator = attributes.entrySet().iterator();
+        while (attributeIterator.hasNext()) {
+            Map.Entry<String, Object> entry = attributeIterator.next();
+            if (ecsResourceFields.contains(entry.getKey())) {
+                resourceAttributes.put(entry.getKey(), entry.getValue());
+                attributeIterator.remove();
+            }
+        }
+    }
+
+    public static final class Factory implements Processor.Factory {
+        @Override
+        public Processor create(
+            Map<String, Processor.Factory> registry,
+            String tag,
+            String description,
+            Map<String, Object> config,
+            ProjectId projectId
+        ) {
+            return new NormalizeForStreamProcessor(tag, description);
+        }
+    }
+}

+ 100 - 0
modules/ingest-otel/src/test/java/org/elasticsearch/ingest/otel/EcsFieldsDiscoverer.java

@@ -0,0 +1,100 @@
+/*
+ * Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
+ * or more contributor license agreements. Licensed under the "Elastic License
+ * 2.0", the "GNU Affero General Public License v3.0 only", and the "Server Side
+ * Public License v 1"; you may not use this file except in compliance with, at
+ * your election, the "Elastic License 2.0", the "GNU Affero General Public
+ * License v3.0 only", or the "Server Side Public License, v 1".
+ */
+
+package org.elasticsearch.ingest.otel;
+
+import org.elasticsearch.xcontent.XContentFactory;
+import org.elasticsearch.xcontent.XContentParser;
+import org.elasticsearch.xcontent.XContentParserConfiguration;
+import org.elasticsearch.xcontent.XContentType;
+
+import java.io.IOException;
+import java.io.InputStream;
+import java.net.URI;
+import java.net.http.HttpClient;
+import java.net.http.HttpRequest;
+import java.net.http.HttpResponse;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+
+class EcsFieldsDiscoverer {
+
+    private static final String ECS_FLAT_FILE_URL = "https://raw.githubusercontent.com/elastic/ecs/main/generated/ecs/ecs_flat.yml";
+    private static final String AGENT_FIELDS_PREFIX = "agent.";
+
+    private static final EcsFieldsDiscoverer INSTANCE = new EcsFieldsDiscoverer();
+
+    private final Map<String, String> ecsToOTelAttributeNames = new HashMap<>();
+    private final Set<String> ecsResourceFields = new HashSet<>();
+
+    private EcsFieldsDiscoverer() {
+        try {
+            collectEcsAttributeNames();
+        } catch (IOException | InterruptedException e) {
+            throw new RuntimeException("Failed to load ECS to OpenTelemetry attribute names", e);
+        }
+    }
+
+    Map<String, String> getEcsToOTelAttributeNames() {
+        return ecsToOTelAttributeNames;
+    }
+
+    Set<String> getEcsResourceFields() {
+        return ecsResourceFields;
+    }
+
+    static EcsFieldsDiscoverer getInstance() {
+        return INSTANCE;
+    }
+
+    private void collectEcsAttributeNames() throws IOException, InterruptedException {
+        Map<String, Object> ecsFields = loadEcsFields();
+        for (Map.Entry<String, Object> entry : ecsFields.entrySet()) {
+            String ecsName = entry.getKey();
+            @SuppressWarnings("unchecked")
+            Map<String, Object> fieldData = (Map<String, Object>) entry.getValue();
+            @SuppressWarnings("unchecked")
+            List<Map<String, String>> otelDataEntries = (List<Map<String, String>>) fieldData.get("otel");
+            if (otelDataEntries != null) {
+                for (Map<String, String> otelData : otelDataEntries) {
+                    String relation = otelData.get("relation");
+                    if ("match".equals(relation)) {
+                        ecsToOTelAttributeNames.put(ecsName, ecsName);
+                    } else if ("equivalent".equals(relation)) {
+                        String attribute = otelData.get("attribute");
+                        if (attribute != null) {
+                            ecsToOTelAttributeNames.put(ecsName, attribute);
+                        }
+                    }
+                }
+            }
+            if (ecsName.startsWith(AGENT_FIELDS_PREFIX)) {
+                // for now, we consider all agent.* fields as resource attributes, but this may change in the future
+                ecsResourceFields.add(ecsName);
+            }
+        }
+    }
+
+    private static Map<String, Object> loadEcsFields() throws IOException, InterruptedException {
+        try (HttpClient httpClient = HttpClient.newHttpClient()) {
+            HttpRequest request = HttpRequest.newBuilder().uri(URI.create(ECS_FLAT_FILE_URL)).build();
+            HttpResponse<InputStream> response = httpClient.send(request, HttpResponse.BodyHandlers.ofInputStream());
+
+            try (
+                InputStream is = response.body();
+                XContentParser parser = XContentFactory.xContent(XContentType.YAML).createParser(XContentParserConfiguration.EMPTY, is)
+            ) {
+                return parser.map();
+            }
+        }
+    }
+}

+ 448 - 0
modules/ingest-otel/src/test/java/org/elasticsearch/ingest/otel/NormalizeForStreamProcessorTests.java

@@ -0,0 +1,448 @@
+/*
+ * Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
+ * or more contributor license agreements. Licensed under the "Elastic License
+ * 2.0", the "GNU Affero General Public License v3.0 only", and the "Server Side
+ * Public License v 1"; you may not use this file except in compliance with, at
+ * your election, the "Elastic License 2.0", the "GNU Affero General Public
+ * License v3.0 only", or the "Server Side Public License, v 1".
+ */
+
+package org.elasticsearch.ingest.otel;
+
+import org.elasticsearch.ingest.IngestDocument;
+import org.elasticsearch.test.ESTestCase;
+
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+
+import static java.util.Map.entry;
+
+public class NormalizeForStreamProcessorTests extends ESTestCase {
+
+    private final NormalizeForStreamProcessor processor = new NormalizeForStreamProcessor("test", "test processor");
+
+    public void testIsOTelDocument_validMinimalOTelDocument() {
+        Map<String, Object> source = new HashMap<>();
+        source.put("resource", new HashMap<>());
+        assertTrue(NormalizeForStreamProcessor.isOTelDocument(source));
+    }
+
+    public void testIsOTelDocument_validOTelDocumentWithScopeAndAttributes() {
+        Map<String, Object> source = new HashMap<>();
+        source.put("attributes", new HashMap<>());
+        source.put("resource", new HashMap<>());
+        source.put("scope", new HashMap<>());
+        assertTrue(NormalizeForStreamProcessor.isOTelDocument(source));
+    }
+
+    public void testIsOTelDocument_missingResource() {
+        Map<String, Object> source = new HashMap<>();
+        source.put("scope", new HashMap<>());
+        assertFalse(NormalizeForStreamProcessor.isOTelDocument(source));
+    }
+
+    public void testIsOTelDocument_resourceNotMap() {
+        Map<String, Object> source = new HashMap<>();
+        source.put("resource", "not a map");
+        source.put("scope", new HashMap<>());
+        assertFalse(NormalizeForStreamProcessor.isOTelDocument(source));
+    }
+
+    public void testIsOTelDocument_invalidResourceAttributes() {
+        Map<String, Object> resource = new HashMap<>();
+        resource.put("attributes", "not a map");
+        Map<String, Object> source = new HashMap<>();
+        source.put("resource", resource);
+        source.put("scope", new HashMap<>());
+        assertFalse(NormalizeForStreamProcessor.isOTelDocument(source));
+    }
+
+    public void testIsOTelDocument_scopeNotMap() {
+        Map<String, Object> source = new HashMap<>();
+        source.put("resource", new HashMap<>());
+        source.put("scope", "not a map");
+        assertFalse(NormalizeForStreamProcessor.isOTelDocument(source));
+    }
+
+    public void testIsOTelDocument_invalidAttributes() {
+        Map<String, Object> source = new HashMap<>();
+        source.put("resource", new HashMap<>());
+        source.put("scope", new HashMap<>());
+        source.put("attributes", "not a map");
+        assertFalse(NormalizeForStreamProcessor.isOTelDocument(source));
+    }
+
+    public void testIsOTelDocument_invalidBody() {
+        Map<String, Object> source = new HashMap<>();
+        source.put("resource", new HashMap<>());
+        source.put("scope", new HashMap<>());
+        source.put("body", "not a map");
+        assertFalse(NormalizeForStreamProcessor.isOTelDocument(source));
+    }
+
+    public void testIsOTelDocument_invalidBodyText() {
+        Map<String, Object> body = new HashMap<>();
+        body.put("text", 123);
+        Map<String, Object> source = new HashMap<>();
+        source.put("resource", new HashMap<>());
+        source.put("scope", new HashMap<>());
+        source.put("body", body);
+        assertFalse(NormalizeForStreamProcessor.isOTelDocument(source));
+    }
+
+    public void testIsOTelDocument_invalidBodyStructured() {
+        Map<String, Object> body = new HashMap<>();
+        body.put("structured", "a string");
+        Map<String, Object> source = new HashMap<>();
+        source.put("resource", new HashMap<>());
+        source.put("scope", new HashMap<>());
+        source.put("body", body);
+        assertFalse(NormalizeForStreamProcessor.isOTelDocument(source));
+    }
+
+    public void testIsOTelDocument_validBody() {
+        Map<String, Object> body = new HashMap<>();
+        body.put("text", "a string");
+        body.put("structured", new HashMap<>());
+        Map<String, Object> source = new HashMap<>();
+        source.put("resource", new HashMap<>());
+        source.put("scope", new HashMap<>());
+        source.put("body", body);
+        assertTrue(NormalizeForStreamProcessor.isOTelDocument(source));
+    }
+
+    public void testExecute_validOTelDocument() {
+        Map<String, Object> source = Map.ofEntries(
+            entry("resource", Map.of()),
+            entry("scope", Map.of()),
+            entry("body", Map.of("text", "a string", "structured", Map.of())),
+            entry("key1", "value1")
+        );
+        IngestDocument document = new IngestDocument("index", "id", 1, null, null, source);
+        Map<String, Object> shallowCopy = new HashMap<>(source);
+        processor.execute(document);
+        // verify that top level keys are not moved when processing a valid OTel document
+        assertEquals(shallowCopy, document.getSource());
+    }
+
+    public void testExecute_nonOTelDocument() {
+        Map<String, Object> source = new HashMap<>();
+        source.put("key1", "value1");
+        source.put("key2", "value2");
+        IngestDocument document = new IngestDocument("index", "id", 1, null, null, source);
+
+        processor.execute(document);
+
+        Map<String, Object> result = document.getSource();
+        assertTrue(result.containsKey("attributes"));
+        assertTrue(result.containsKey("resource"));
+
+        Map<String, Object> attributes = get(result, "attributes");
+        assertEquals("value1", attributes.get("key1"));
+        assertEquals("value2", attributes.get("key2"));
+        assertFalse(source.containsKey("key1"));
+        assertFalse(source.containsKey("key2"));
+
+        Map<String, Object> resource = get(result, "resource");
+        assertTrue(resource.containsKey("attributes"));
+        Map<String, Object> resourceAttributes = get(resource, "attributes");
+        assertTrue(resourceAttributes.isEmpty());
+    }
+
+    public void testExecute_nonOTelDocument_withExistingAttributes() {
+        Map<String, Object> source = new HashMap<>();
+        Map<String, Object> existingAttributes = new HashMap<>();
+        existingAttributes.put("existingKey", "existingValue");
+        source.put("attributes", existingAttributes);
+        source.put("key1", "value1");
+        IngestDocument document = new IngestDocument("index", "id", 1, null, null, source);
+
+        processor.execute(document);
+
+        Map<String, Object> result = document.getSource();
+        assertTrue(result.containsKey("attributes"));
+        assertTrue(result.containsKey("resource"));
+
+        Map<String, Object> attributes = get(result, "attributes");
+        assertEquals("existingValue", attributes.get("attributes.existingKey"));
+        assertEquals("value1", attributes.get("key1"));
+
+        Map<String, Object> resource = get(result, "resource");
+        assertTrue(resource.containsKey("attributes"));
+        Map<String, Object> resourceAttributes = get(resource, "attributes");
+        assertTrue(resourceAttributes.isEmpty());
+    }
+
+    public void testExecute_nonOTelDocument_withExistingResource() {
+        Map<String, Object> source = new HashMap<>();
+        Map<String, Object> existingResource = new HashMap<>();
+        existingResource.put("existingKey", "existingValue");
+        source.put("resource", existingResource);
+        source.put("scope", "invalid scope");
+        source.put("key1", "value1");
+        IngestDocument document = new IngestDocument("index", "id", 1, null, null, source);
+
+        processor.execute(document);
+
+        Map<String, Object> result = document.getSource();
+        assertTrue(result.containsKey("attributes"));
+        assertTrue(result.containsKey("resource"));
+
+        Map<String, Object> attributes = get(result, "attributes");
+        assertEquals("value1", attributes.get("key1"));
+        assertEquals("existingValue", attributes.get("resource.existingKey"));
+        assertEquals("invalid scope", attributes.get("scope"));
+
+        Map<String, Object> resource = get(result, "resource");
+        assertTrue(resource.containsKey("attributes"));
+        Map<String, Object> resourceAttributes = get(resource, "attributes");
+        assertTrue(resourceAttributes.isEmpty());
+    }
+
+    public void testRenameSpecialKeys_nestedForm() {
+        Map<String, Object> source = new HashMap<>();
+        Map<String, Object> span = new HashMap<>();
+        span.put("id", "spanIdValue");
+        source.put("span", span);
+        Map<String, Object> log = new HashMap<>();
+        log.put("level", "logLevelValue");
+        source.put("log", log);
+        Map<String, Object> trace = new HashMap<>();
+        trace.put("id", "traceIdValue");
+        source.put("trace", trace);
+        IngestDocument document = new IngestDocument("index", "id", 1, null, null, source);
+
+        NormalizeForStreamProcessor.renameSpecialKeys(document);
+
+        Map<String, Object> result = document.getSource();
+        assertEquals("spanIdValue", result.get("span_id"));
+        assertFalse(result.containsKey("span"));
+        assertEquals("logLevelValue", result.get("severity_text"));
+        assertFalse(result.containsKey("log"));
+        assertEquals("traceIdValue", result.get("trace_id"));
+        assertFalse(result.containsKey("trace"));
+    }
+
+    public void testRenameSpecialKeys_topLevelDottedField() {
+        Map<String, Object> source = new HashMap<>();
+        source.put("span.id", "spanIdValue");
+        source.put("log.level", "logLevelValue");
+        source.put("trace.id", "traceIdValue");
+        source.put("message", "this is a message");
+        IngestDocument document = new IngestDocument("index", "id", 1, null, null, source);
+
+        NormalizeForStreamProcessor.renameSpecialKeys(document);
+
+        Map<String, Object> result = document.getSource();
+        assertEquals("spanIdValue", result.get("span_id"));
+        assertEquals("logLevelValue", result.get("severity_text"));
+        assertEquals("traceIdValue", result.get("trace_id"));
+        Map<String, Object> body = get(result, "body");
+        String text = get(body, "text");
+        assertEquals("this is a message", text);
+        assertFalse(source.containsKey("span.id"));
+        assertFalse(source.containsKey("log.level"));
+        assertFalse(source.containsKey("trace.id"));
+        assertFalse(source.containsKey("message"));
+    }
+
+    public void testRenameSpecialKeys_mixedForm() {
+        Map<String, Object> source = new HashMap<>();
+        Map<String, Object> span = new HashMap<>();
+        span.put("id", "nestedSpanIdValue");
+        source.put("span", span);
+        source.put("span.id", "topLevelSpanIdValue");
+        IngestDocument document = new IngestDocument("index", "id", 1, null, null, source);
+
+        NormalizeForStreamProcessor.renameSpecialKeys(document);
+
+        Map<String, Object> result = document.getSource();
+        // nested form should take precedence
+        assertEquals("nestedSpanIdValue", result.get("span_id"));
+    }
+
+    public void testExecute_moveFlatAttributes() {
+        Map<String, Object> source = new HashMap<>();
+        Map<String, Object> expectedResourceAttributes = new HashMap<>();
+        EcsOTelResourceAttributes.LATEST.forEach(attribute -> {
+            String value = randomAlphaOfLength(10);
+            source.put(attribute, value);
+            expectedResourceAttributes.put(attribute, value);
+        });
+        Map<String, Object> expectedAttributes = Map.of("agent.non-resource", "value", "service.non-resource", "value", "foo", "bar");
+        source.putAll(expectedAttributes);
+        IngestDocument document = new IngestDocument("index", "id", 1, null, null, source);
+
+        processor.execute(document);
+
+        assertTrue(source.containsKey("resource"));
+        Map<String, Object> resource = get(source, "resource");
+        Map<String, Object> resourceAttributes = get(resource, "attributes");
+        assertEquals(expectedResourceAttributes, resourceAttributes);
+        EcsOTelResourceAttributes.LATEST.forEach(attribute -> assertFalse(source.containsKey(attribute)));
+
+        assertTrue(source.containsKey("attributes"));
+        Map<String, Object> attributes = get(source, "attributes");
+        assertEquals(expectedAttributes, attributes);
+        assertFalse(source.containsKey("foo"));
+        assertFalse(source.containsKey("agent.non-resource"));
+        assertFalse(source.containsKey("service.non-resource"));
+    }
+
+    public void testExecute_moveNestedAttributes() {
+        IngestDocument document = new IngestDocument("index", "id", 1, null, null, new HashMap<>());
+
+        Map<String, Object> expectedResourceAttributes = new HashMap<>();
+        EcsOTelResourceAttributes.LATEST.forEach(attribute -> {
+            String value = randomAlphaOfLength(10);
+            // parses dots as object notations
+            document.setFieldValue(attribute, value);
+            expectedResourceAttributes.put(attribute, value);
+        });
+        Map<String, Object> expectedAttributes = Map.of("agent.non-resource", "value", "service.non-resource", "value", "foo", "bar");
+        expectedAttributes.forEach(document::setFieldValue);
+
+        processor.execute(document);
+
+        Map<String, Object> source = document.getSource();
+
+        assertTrue(source.containsKey("resource"));
+        Map<String, Object> resource = get(source, "resource");
+        Map<String, Object> resourceAttributes = get(resource, "attributes");
+        assertEquals(expectedResourceAttributes, resourceAttributes);
+        EcsOTelResourceAttributes.LATEST.forEach(attribute -> {
+            // parse first part of the key
+            String namespace = attribute.substring(0, attribute.indexOf('.'));
+            assertFalse(source.containsKey(namespace));
+        });
+        assertTrue(source.containsKey("attributes"));
+        Map<String, Object> attributes = get(source, "attributes");
+        assertEquals(expectedAttributes, attributes);
+        assertFalse(source.containsKey("foo"));
+        assertFalse(source.containsKey("agent.non-resource"));
+        assertFalse(source.containsKey("service.non-resource"));
+    }
+
+    public void testKeepNullValues() {
+        Map<String, Object> source = new HashMap<>();
+        Map<String, Object> span = new HashMap<>();
+        span.put("id", null);
+        source.put("span", span);
+        source.put("log.level", null);
+        source.put("trace_id", null);
+        source.put("foo", null);
+        source.put("agent.name", null);
+        IngestDocument document = new IngestDocument("index", "id", 1, null, null, source);
+
+        processor.execute(document);
+
+        assertFalse(source.containsKey("span"));
+        assertTrue(source.containsKey("span_id"));
+        assertNull(source.get("span_id"));
+        assertFalse(source.containsKey("log"));
+        assertTrue(source.containsKey("severity_text"));
+        assertNull(source.get("severity_text"));
+        assertFalse(source.containsKey("trace_id"));
+        Map<String, Object> expectedAttributes = new HashMap<>();
+        expectedAttributes.put("foo", null);
+        expectedAttributes.put("trace_id", null);
+        assertEquals(expectedAttributes, get(source, "attributes"));
+        Map<String, Object> expectedResourceAttributes = new HashMap<>();
+        expectedResourceAttributes.put("agent.name", null);
+        assertEquals(expectedResourceAttributes, get(get(source, "resource"), "attributes"));
+    }
+
+    public void testExecute_deepFlattening() {
+        Map<String, Object> source = new HashMap<>();
+        Map<String, Object> service = new HashMap<>();
+        service.put("name", "serviceNameValue");
+        Map<String, Object> node = new HashMap<>();
+        node.put("name", "serviceNodeNameValue");
+        node.put("type", "serviceNodeTypeValue");
+        service.put("node", node);
+        source.put("service", service);
+
+        Map<String, Object> top = new HashMap<>();
+        top.put("child", "childValue");
+        Map<String, Object> nestedChild = new HashMap<>();
+        nestedChild.put("grandchild", "grandchildValue");
+        top.put("nested-child", nestedChild);
+        source.put("top", top);
+
+        IngestDocument document = new IngestDocument("index", "id", 1, null, null, source);
+
+        processor.execute(document);
+
+        Map<String, Object> result = document.getSource();
+
+        Map<String, Object> expectedResourceAttributes = Map.of(
+            "service.name",
+            "serviceNameValue",
+            "service.node.name",
+            "serviceNodeNameValue"
+        );
+
+        assertTrue(result.containsKey("resource"));
+        Map<String, Object> resource = get(result, "resource");
+        Map<String, Object> resourceAttributes = get(resource, "attributes");
+        assertEquals(expectedResourceAttributes, resourceAttributes);
+        assertNull(resource.get("service"));
+
+        Map<String, Object> expectedAttributes = Map.of(
+            "service.node.type",
+            "serviceNodeTypeValue",
+            "top.child",
+            "childValue",
+            "top.nested-child.grandchild",
+            "grandchildValue"
+        );
+
+        assertTrue(result.containsKey("attributes"));
+        Map<String, Object> attributes = get(result, "attributes");
+        assertEquals(expectedAttributes, attributes);
+        assertNull(attributes.get("top"));
+    }
+
+    public void testExecute_arraysNotFlattened() {
+        Map<String, Object> source = new HashMap<>();
+        Map<String, Object> nestedAgent = new HashMap<>();
+        nestedAgent.put("name", "agentNameValue");
+        List<String> agentArray = List.of("value1", "value2");
+        nestedAgent.put("array", agentArray);
+        source.put("agent", nestedAgent);
+
+        Map<String, Object> nestedService = new HashMap<>();
+        List<String> serviceNameArray = List.of("value1", "value2");
+        nestedService.put("name", serviceNameArray);
+        source.put("service", nestedService);
+
+        IngestDocument document = new IngestDocument("index", "id", 1, null, null, source);
+
+        processor.execute(document);
+
+        Map<String, Object> result = document.getSource();
+
+        Map<String, Object> expectedResourceAttributes = Map.of("agent.name", "agentNameValue", "service.name", serviceNameArray);
+
+        assertTrue(result.containsKey("resource"));
+        Map<String, Object> resource = get(result, "resource");
+        Map<String, Object> resourceAttributes = get(resource, "attributes");
+        assertEquals(expectedResourceAttributes, resourceAttributes);
+
+        assertTrue(result.containsKey("attributes"));
+        Map<String, Object> attributes = get(result, "attributes");
+        assertEquals(Map.of("agent.array", agentArray), attributes);
+
+        assertNull(resource.get("agent"));
+        assertNull(attributes.get("service"));
+    }
+
+    /**
+     * A utility function for getting a key from a map and casting the result.
+     */
+    @SuppressWarnings("unchecked")
+    private static <T> T get(Map<String, Object> context, String key) {
+        return (T) context.get(key);
+    }
+}

+ 154 - 0
modules/ingest-otel/src/test/java/org/elasticsearch/ingest/otel/OTelSemConvCrawler.java

@@ -0,0 +1,154 @@
+/*
+ * Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
+ * or more contributor license agreements. Licensed under the "Elastic License
+ * 2.0", the "GNU Affero General Public License v3.0 only", and the "Server Side
+ * Public License v 1"; you may not use this file except in compliance with, at
+ * your election, the "Elastic License 2.0", the "GNU Affero General Public
+ * License v3.0 only", or the "Server Side Public License, v 1".
+ */
+
+package org.elasticsearch.ingest.otel;
+
+import org.apache.logging.log4j.LogManager;
+import org.apache.logging.log4j.Logger;
+import org.elasticsearch.core.SuppressForbidden;
+import org.elasticsearch.xcontent.XContentFactory;
+import org.elasticsearch.xcontent.XContentParser;
+import org.elasticsearch.xcontent.XContentParserConfiguration;
+import org.elasticsearch.xcontent.XContentType;
+
+import java.io.IOException;
+import java.io.InputStream;
+import java.net.URI;
+import java.net.http.HttpClient;
+import java.net.http.HttpRequest;
+import java.net.http.HttpResponse;
+import java.nio.file.Files;
+import java.nio.file.Path;
+import java.nio.file.StandardCopyOption;
+import java.util.Comparator;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+import java.util.stream.Stream;
+import java.util.zip.ZipEntry;
+import java.util.zip.ZipInputStream;
+
+/**
+ * This class is responsible for crawling and extracting OpenTelemetry semantic convention
+ * resource attributes from the OpenTelemetry GitHub repository. It handles downloading,
+ * unzipping, and processing YAML files to extract specific referenced resource attribute names.
+ * It eventually deletes the downloaded zip file and extracted repository directory.
+ */
+public class OTelSemConvCrawler {
+
+    public static final String SEM_CONV_GITHUB_REPO_ZIP_URL =
+        "https://github.com/open-telemetry/semantic-conventions/archive/refs/heads/main.zip";
+
+    private static final Logger logger = LogManager.getLogger(OTelSemConvCrawler.class);
+
+    @SuppressForbidden(reason = "writing the GitHub repo zip file to the test's runtime temp directory and deleting on exit")
+    static Set<String> collectOTelSemConvResourceAttributes() {
+        Path semConvZipFilePath = null;
+        Path semConvExtractedTmpDirPath = null;
+        Set<String> resourceAttributes = new HashSet<>();
+        try (HttpClient httpClient = HttpClient.newBuilder().followRedirects(HttpClient.Redirect.ALWAYS).build()) {
+
+            semConvZipFilePath = Files.createTempFile("otel-semconv-", ".zip");
+
+            // Download zip
+            HttpResponse<Path> response = httpClient.send(
+                HttpRequest.newBuilder(URI.create(SEM_CONV_GITHUB_REPO_ZIP_URL)).build(),
+                HttpResponse.BodyHandlers.ofFile(semConvZipFilePath)
+            );
+
+            if (response.statusCode() != 200) {
+                logger.error("failed to download semantic conventions zip file");
+                return resourceAttributes;
+            }
+
+            // Unzip
+            semConvExtractedTmpDirPath = Files.createTempDirectory("otel-semconv-extracted-");
+            try (ZipInputStream zis = new ZipInputStream(Files.newInputStream(semConvZipFilePath))) {
+                ZipEntry entry;
+                while ((entry = zis.getNextEntry()) != null) {
+                    if (entry.isDirectory() == false) {
+                        Path outPath = semConvExtractedTmpDirPath.resolve(entry.getName());
+                        Files.createDirectories(outPath.getParent());
+                        Files.copy(zis, outPath, StandardCopyOption.REPLACE_EXISTING);
+                    }
+                }
+            }
+
+            // look for the model root at semantic-conventions-main/model
+            Path semConvModelRootDir = semConvExtractedTmpDirPath.resolve("semantic-conventions-main/model");
+            if (Files.exists(semConvModelRootDir) == false) {
+                logger.error("model directory not found in the extracted zip");
+                return resourceAttributes;
+            }
+
+            try (Stream<Path> semConvFileStream = Files.walk(semConvModelRootDir)) {
+                semConvFileStream.filter(path -> path.toString().endsWith(".yaml") || path.toString().endsWith(".yml"))
+                    .parallel()
+                    .forEach(path -> {
+                        try (
+                            InputStream inputStream = Files.newInputStream(path);
+                            XContentParser parser = XContentFactory.xContent(XContentType.YAML)
+                                .createParser(XContentParserConfiguration.EMPTY, inputStream)
+                        ) {
+                            Map<String, Object> yamlData = parser.map();
+                            Object groupsObj = yamlData.get("groups");
+                            if (groupsObj instanceof List<?> groups) {
+                                for (Object group : groups) {
+                                    if (group instanceof Map<?, ?> groupMap && "entity".equals(groupMap.get("type"))) {
+                                        Object attrs = groupMap.get("attributes");
+                                        if (attrs instanceof List<?> attrList) {
+                                            for (Object attr : attrList) {
+                                                if (attr instanceof Map<?, ?> attrMap) {
+                                                    String refVal = (String) attrMap.get("ref");
+                                                    if (refVal != null) {
+                                                        resourceAttributes.add(refVal);
+                                                    }
+                                                }
+                                            }
+                                        }
+                                    }
+                                }
+                            }
+                        } catch (IOException e) {
+                            logger.error("error parsing yaml file", e);
+                        }
+                    });
+            }
+        } catch (InterruptedException e) {
+            logger.error("interrupted", e);
+        } catch (IOException e) {
+            logger.error("IO exception", e);
+        } finally {
+            if (semConvZipFilePath != null) {
+                try {
+                    Files.deleteIfExists(semConvZipFilePath);
+                } catch (IOException e) {
+                    logger.warn("failed to delete semconv zip file", e);
+                }
+            }
+            if (semConvExtractedTmpDirPath != null) {
+                try (Stream<Path> semConvFileStream = Files.walk(semConvExtractedTmpDirPath)) {
+                    semConvFileStream.sorted(Comparator.reverseOrder()) // delete files first
+                        .forEach(path -> {
+                            try {
+                                Files.delete(path);
+                            } catch (IOException e) {
+                                logger.warn("failed to delete file: " + path, e);
+                            }
+                        });
+                } catch (IOException e) {
+                    logger.warn("failed to delete semconv zip file", e);
+                }
+            }
+        }
+
+        return resourceAttributes;
+    }
+}

+ 101 - 0
modules/ingest-otel/src/test/java/org/elasticsearch/ingest/otel/ResourceAttributesTests.java

@@ -0,0 +1,101 @@
+/*
+ * Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
+ * or more contributor license agreements. Licensed under the "Elastic License
+ * 2.0", the "GNU Affero General Public License v3.0 only", and the "Server Side
+ * Public License v 1"; you may not use this file except in compliance with, at
+ * your election, the "Elastic License 2.0", the "GNU Affero General Public
+ * License v3.0 only", or the "Server Side Public License, v 1".
+ */
+
+package org.elasticsearch.ingest.otel;
+
+import org.elasticsearch.core.SuppressForbidden;
+import org.elasticsearch.test.ESTestCase;
+import org.junit.Ignore;
+
+import java.util.HashSet;
+import java.util.Map;
+import java.util.Set;
+
+/**
+ * <b>DISABLED BY DEFAULT!</b><br><br>
+ * These tests are not meant for CI, but rather to be run manually to check whether the static {@link EcsOTelResourceAttributes resource
+ * attributes set} is up to date with the latest ECS and/or OpenTelemetry Semantic Conventions.
+ * We may add them to CI in the future, but as tests that run periodically (nightly/weekly) and notify whenever the resource
+ * attributes set is no longer up to date.
+ */
+@SuppressForbidden(reason = "Disabled temporarily until we set up the periodic CI pipeline")
+@Ignore
+public class ResourceAttributesTests extends ESTestCase {
+
+    @SuppressForbidden(reason = "Used specifically for the output. Only meant to be run manually, not through CI.")
+    public void testCrawler() {
+        Set<String> resourceAttributes = OTelSemConvCrawler.collectOTelSemConvResourceAttributes();
+        System.out.println("Resource Attributes: " + resourceAttributes.size());
+        for (String attribute : resourceAttributes) {
+            System.out.println(attribute);
+        }
+    }
+
+    @SuppressForbidden(reason = "Used specifically for the output. Only meant to be run manually, not through CI.")
+    public void testEcsToOTelAttributeNames() {
+        Map<String, String> attributes = EcsFieldsDiscoverer.getInstance().getEcsToOTelAttributeNames();
+        System.out.println("ECS to OTel attribute mappings: " + attributes.size());
+        for (Map.Entry<String, String> entry : attributes.entrySet()) {
+            System.out.println(entry.getKey() + " --> " + entry.getValue());
+        }
+    }
+
+    public void testAttributesSetUpToDate() {
+        Map<String, String> ecsToOTelAttributeNames = EcsFieldsDiscoverer.getInstance().getEcsToOTelAttributeNames();
+        Set<String> otelResourceAttributes = OTelSemConvCrawler.collectOTelSemConvResourceAttributes();
+        Set<String> latestEcsOTelResourceAttributes = new HashSet<>();
+        ecsToOTelAttributeNames.forEach((ecsAttributeName, otelAttributeName) -> {
+            if (otelResourceAttributes.contains(otelAttributeName)) {
+                latestEcsOTelResourceAttributes.add(ecsAttributeName);
+            }
+        });
+        latestEcsOTelResourceAttributes.addAll(EcsFieldsDiscoverer.getInstance().getEcsResourceFields());
+        boolean upToDate = latestEcsOTelResourceAttributes.equals(EcsOTelResourceAttributes.LATEST);
+        if (upToDate == false) {
+            printComparisonResults(latestEcsOTelResourceAttributes);
+        } else {
+            System.out.println("Latest ECS-to-OTel resource attributes set in EcsOTelResourceAttributes is up to date.");
+        }
+        assertTrue("Latest ECS-to-OTel resource attributes set in EcsOTelResourceAttributes is not up to date.", upToDate);
+    }
+
+    @SuppressForbidden(
+        reason = "Output is used for updating the resource attributes set. Running nightly and only prints when not up to date."
+    )
+    private static void printComparisonResults(Set<String> latestEcsOTelResourceAttributes) {
+        // find and print the diff
+        Set<String> addedAttributes = new HashSet<>(latestEcsOTelResourceAttributes);
+        addedAttributes.removeAll(EcsOTelResourceAttributes.LATEST);
+        if (addedAttributes.isEmpty() == false) {
+            System.out.println();
+            System.out.println("The current resource attributes set doesn't contain the following attributes:");
+            System.out.println("-----------------------------------------------------------------------------");
+            for (String attribute : addedAttributes) {
+                System.out.println(attribute);
+            }
+            System.out.println("-----------------------------------------------------------------------------");
+            System.out.println();
+        }
+        Set<String> removedAttributes = new HashSet<>(EcsOTelResourceAttributes.LATEST);
+        removedAttributes.removeAll(latestEcsOTelResourceAttributes);
+        if (removedAttributes.isEmpty() == false) {
+            System.out.println();
+            System.out.println("The following attributes are no longer considered resource attributes:");
+            System.out.println("----------------------------------------------------------------------");
+            for (String attribute : removedAttributes) {
+                System.out.println(attribute);
+            }
+            System.out.println("----------------------------------------------------------------------");
+            System.out.println();
+        }
+        System.out.println("Consider updating EcsOTelResourceAttributes accordingly");
+        System.out.println();
+        fail("ECS to OTel resource attributes are not up to date");
+    }
+}
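For context, the static set the assertion compares against lives in `EcsOTelResourceAttributes`, which is part of this change but not shown in this excerpt. A minimal sketch of its likely shape, with purely illustrative entries; the real constant is derived from the semconv crawl and is much longer:

```java
// Hypothetical sketch of the constant referenced by the test above.
package org.elasticsearch.ingest.otel;

import java.util.Set;

public final class EcsOTelResourceAttributes {
    // Illustrative subset; the actual set is kept in sync via testAttributesSetUpToDate.
    public static final Set<String> LATEST = Set.of("agent.name", "host.name", "service.name");

    private EcsOTelResourceAttributes() {}
}
```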

+ 38 - 0
modules/ingest-otel/src/yamlRestTest/java/org/elasticsearch/ingest/otel/IngestOtelClientYamlTestSuiteIT.java

@@ -0,0 +1,38 @@
+/*
+ * Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
+ * or more contributor license agreements. Licensed under the "Elastic License
+ * 2.0", the "GNU Affero General Public License v3.0 only", and the "Server Side
+ * Public License v 1"; you may not use this file except in compliance with, at
+ * your election, the "Elastic License 2.0", the "GNU Affero General Public
+ * License v3.0 only", or the "Server Side Public License, v 1".
+ */
+
+package org.elasticsearch.ingest.otel;
+
+import com.carrotsearch.randomizedtesting.annotations.Name;
+import com.carrotsearch.randomizedtesting.annotations.ParametersFactory;
+
+import org.elasticsearch.test.cluster.ElasticsearchCluster;
+import org.elasticsearch.test.rest.yaml.ClientYamlTestCandidate;
+import org.elasticsearch.test.rest.yaml.ESClientYamlSuiteTestCase;
+import org.junit.ClassRule;
+
+public class IngestOtelClientYamlTestSuiteIT extends ESClientYamlSuiteTestCase {
+
+    public IngestOtelClientYamlTestSuiteIT(@Name("yaml") ClientYamlTestCandidate testCandidate) {
+        super(testCandidate);
+    }
+
+    @ClassRule
+    public static ElasticsearchCluster cluster = ElasticsearchCluster.local().module("ingest-otel").build();
+
+    @Override
+    protected String getTestRestCluster() {
+        return cluster.getHttpAddresses();
+    }
+
+    @ParametersFactory
+    public static Iterable<Object[]> parameters() throws Exception {
+        return ESClientYamlSuiteTestCase.createParameters();
+    }
+}

+ 223 - 0
modules/ingest-otel/src/yamlRestTest/resources/rest-api-spec/test/ingest-otel/10_normalize_for_stream.yml

@@ -0,0 +1,223 @@
+---
+setup:
+  - do:
+      ingest.put_pipeline:
+        id: "normalize_for_stream_pipeline"
+        body:
+          processors:
+            - normalize_for_stream: {}
+
+---
+teardown:
+  - do:
+      ingest.delete_pipeline:
+        id: "normalize_for_stream_pipeline"
+        ignore: 404
+
+---
+"Test attributes namespacing":
+  - do:
+      index:
+        index: normalize_for_stream_test
+        id: "nested_and_flat_attributes"
+        pipeline: "normalize_for_stream_pipeline"
+        body: {
+          "agent.name": "agentNameValue",
+          "agent": {
+            "type": "agentTypeValue",
+            "deep": {
+              "nested": "nestedValue",
+              "scalar-array": [
+                "arrayValue1",
+                "arrayValue2"
+              ],
+              "object-array": [
+                {
+                  "key1": "value1"
+                },
+                {
+                  "key2": "value2"
+                }
+              ]
+            },
+            "scalar-array": [
+              "arrayValue1",
+              "arrayValue2"
+            ]
+          },
+          "cloud.region": "cloudRegionValue",
+          "cloud": {
+            "service": {
+              "name": [
+                "nameArrayValue1",
+                "nameArrayValue2"
+              ]
+            },
+            "account.id": [
+              {
+                "key1": "value1"
+              },
+              {
+                "key2": "value2"
+              }
+            ]
+          },
+          "host.name": "hostNameValue",
+          "host": {
+            "type": "hostTypeValue"
+          },
+          "service.name": "serviceNameValue",
+          "service": {
+            "type": "serviceTypeValue",
+          }
+        }
+
+  - do:
+      get:
+        index: normalize_for_stream_test
+        id: "nested_and_flat_attributes"
+  - match: { _source.resource.attributes.agent\.name: "agentNameValue" }
+  - match: { _source.resource.attributes.agent\.type: "agentTypeValue" }
+  - match: { _source.resource.attributes.cloud\.region: "cloudRegionValue" }
+  - match: { _source.resource.attributes.cloud\.service\.name: ["nameArrayValue1", "nameArrayValue2"] }
+  - match: { _source.resource.attributes.cloud\.service\.name.0: "nameArrayValue1" }
+  - match: { _source.resource.attributes.cloud\.service\.name.1: "nameArrayValue2" }
+  - match: { _source.resource.attributes.cloud\.account\.id: [{"key1" : "value1"}, {"key2" : "value2"}] }
+  - match: { _source.resource.attributes.cloud\.account\.id.0.key1: "value1" }
+  - match: { _source.resource.attributes.cloud\.account\.id.1.key2: "value2" }
+  - match: { _source.resource.attributes.host\.name: "hostNameValue" }
+  - match: { _source.resource.attributes.host\.type: "hostTypeValue" }
+  - match: { _source.resource.attributes.service\.name: "serviceNameValue" }
+  - match: { _source.attributes.agent\.scalar-array.0: "arrayValue1" }
+  - match: { _source.attributes.agent\.scalar-array.1: "arrayValue2" }
+  - match: { _source.attributes.agent\.deep\.nested: "nestedValue" }
+  - match: { _source.attributes.agent\.deep\.scalar-array.0: "arrayValue1" }
+  - match: { _source.attributes.agent\.deep\.scalar-array.1: "arrayValue2" }
+  - match: { _source.attributes.agent\.deep\.object-array.0.key1: "value1" }
+  - match: { _source.attributes.agent\.deep\.object-array.1.key2: "value2" }
+  - match: { _source.attributes.service\.type: "serviceTypeValue" }
+  - match: { _source.agent\.name: null }
+  - match: { _source.agent: null }
+  - match: { _source.agent.type: null }
+  - match: { _source.cloud\.region: null }
+  - match: { _source.cloud: null }
+  - match: { _source.host\.name: null }
+  - match: { _source.host: null }
+  - match: { _source.service\.name: null }
+  - match: { _source.service: null }
+
+---
+"Test rename special keys":
+  - do:
+      index:
+        index: normalize_for_stream_test
+        id: "rename_special_keys"
+        pipeline: "normalize_for_stream_pipeline"
+        body: {
+          "span": {
+            "id": "nestedSpanIdValue"
+          },
+          "span.id": "topLevelSpanIdValue",
+          "log.level": "topLevelLogLevelValue",
+          "trace": {
+            "id": "traceIdValue"
+          },
+          "trace.id": "topLevelTraceIdValue",
+          "message": "this is a message"
+        }
+
+  - do:
+      get:
+        index: normalize_for_stream_test
+        id: "rename_special_keys"
+  - match: { _source.span_id: "nestedSpanIdValue" }
+  - match: { _source.severity_text: "topLevelLogLevelValue" }
+  - match: { _source.trace_id: "traceIdValue" }
+  - match: { _source.body.text: "this is a message" }
+  - match: { _source.span: null }
+  - match: { _source.span\.id: null }
+  - match: { _source.log\.level: null }
+  - match: { _source.trace: null }
+  - match: { _source.trace\.id: null }
+  - match: { _source.message: null }
+
+---
+"Test valid OTel document":
+  - do:
+      index:
+        index: normalize_for_stream_test
+        id: "valid_otel_document"
+        pipeline: "normalize_for_stream_pipeline"
+        body: {
+          "resource": {
+            "attributes": {
+              "foo": "bar"
+            }
+          },
+          "scope": {
+            "foo": "bar"
+          },
+          "attributes": {
+            "foo": "bar"
+          },
+          "body": {
+            "text": "a string",
+            "structured": {}
+          },
+          "span_id": "spanIdValue",
+          "trace_id": "traceIdValue",
+          "severity_text": "severityTextValue",
+          "foo": "bar"
+        }
+
+  - do:
+      get:
+        index: normalize_for_stream_test
+        id: "valid_otel_document"
+  - match: { _source.resource.attributes.foo: "bar" }
+  - match: { _source.scope.foo: "bar" }
+  - match: { _source.attributes.foo: "bar" }
+  - match: { _source.body.text: "a string" }
+  - match: { _source.body.structured: {} }
+  - match: { _source.span_id: "spanIdValue" }
+  - match: { _source.trace_id: "traceIdValue" }
+  - match: { _source.severity_text: "severityTextValue" }
+  - match: { _source.foo: "bar" }
+
+---
+"Test invalid body field":
+  - do:
+      index:
+        index: normalize_for_stream_test
+        id: "invalid_body_field"
+        pipeline: "normalize_for_stream_pipeline"
+        body: {
+          "resource": {},
+          "scope": {
+            "foo": "bar"
+          },
+          "body": {
+            "text": 123,
+            "structured": {
+              "foo": "bar"
+            }
+          },
+          "span_id": "spanIdValue",
+          "trace_id": "traceIdValue",
+          "severity_text": "severityTextValue",
+          "foo": "bar"
+        }
+
+  - do:
+      get:
+        index: normalize_for_stream_test
+        id: "invalid_body_field"
+  - match: { _source.attributes.body\.text: 123 }
+  - match: { _source.attributes.body\.structured\.foo: "bar" }
+  - match: { _source.attributes.scope\.foo: "bar" }
+  - match: { _source.attributes.span_id: "spanIdValue" }
+  - match: { _source.attributes.trace_id: "traceIdValue" }
+  - match: { _source.attributes.severity_text: "severityTextValue" }
+  - match: { _source.attributes.foo: "bar" }
+  - match: { _source.body: null }
+  - match: { _source.scope: null }
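
Beyond the indexing-based cases above, the same pipeline could also be exercised through the simulate API. A hedged sketch of one more case in the same YAML rest-test style — not part of this change; the expected values simply restate the rename table from the processor docs:

```yaml
# Hypothetical extra test case using the standard ingest.simulate API.
"Test simulate rename":
  - do:
      ingest.simulate:
        id: "normalize_for_stream_pipeline"
        body:
          docs:
            - _source:
                message: "hello"
                log.level: "INFO"
  - match: { docs.0.doc._source.body.text: "hello" }
  - match: { docs.0.doc._source.severity_text: "INFO" }
```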