Explorar el Código

[Logs+] Automatically parse JSON log events into top-level fields (#96083)

eyalkoren hace 2 años
padre
commit
7d57731291

+ 6 - 0
docs/changelog/96083.yaml

@@ -0,0 +1,6 @@
+pr: 96083
+summary: Automatically parse log events in logs data streams, if their `message` field contains JSON content
+area: Data streams
+type: enhancement
+issues:
+ - 95522

+ 0 - 1
modules/data-streams/src/yamlRestTest/resources/rest-api-spec/test/data_stream/220_logs_default_pipeline.yml

@@ -92,4 +92,3 @@ Test default logs-*-* pipeline:
             - field: '@timestamp'
   - length: { hits.hits: 1 }
   - match: { hits.hits.0.fields.@timestamp.0: '2023-05-10T00:00:00.000Z' }
-

+ 114 - 0
modules/data-streams/src/yamlRestTest/resources/rest-api-spec/test/data_stream/230_logs_message_pipeline.yml

@@ -0,0 +1,114 @@
+---
+Test log message JSON-parsing pipeline:
+  - do:
+      ingest.put_pipeline:
+        # opting in to use the JSON parsing pipeline for message field
+        id: "logs@custom"
+        body:  >
+          {
+            "processors": [
+              {
+                "pipeline" : {
+                  "name": "logs@json-message",
+                  "description": "A pipeline that automatically parses JSON log events into top-level fields if they are such"
+                }
+              }
+            ]
+          }
+
+  - do:
+      indices.create_data_stream:
+        name: logs-generic-default
+  - is_true: acknowledged
+
+  - do:
+      index:
+        index: logs-generic-default
+        refresh: true
+        body:
+          '@timestamp': '2023-05-10'
+          message: |-
+            {
+              "@timestamp":"2023-05-09T16:48:34.135Z",
+              "message":"json",
+              "log.level": "INFO",
+              "ecs.version": "1.6.0",
+              "service.name":"my-app",
+              "event.dataset":"my-app.RollingFile",
+              "process.thread.name":"main",
+              "log.logger":"root.pkg.MyApp"
+            }
+  - match: {result: "created"}
+
+  - do:
+      search:
+        index: logs-generic-default
+        body:
+          query:
+            term:
+              message:
+                value: 'json'
+          fields:
+            - field: 'message'
+  - length: { hits.hits: 1 }
+    # root field parsed from JSON should win
+  - match: { hits.hits.0._source.@timestamp: '2023-05-09T16:48:34.135Z' }
+  - match: { hits.hits.0._source.message: 'json' }
+  - match: { hits.hits.0.fields.message.0: 'json' }
+    # successful access to subfields verifies that dot expansion is part of the pipeline
+  - match: { hits.hits.0._source.log.level: 'INFO' }
+  - match: { hits.hits.0._source.ecs.version: '1.6.0' }
+  - match: { hits.hits.0._source.service.name: 'my-app' }
+  - match: { hits.hits.0._source.event.dataset: 'my-app.RollingFile' }
+  - match: { hits.hits.0._source.process.thread.name: 'main' }
+  - match: { hits.hits.0._source.log.logger: 'root.pkg.MyApp' }
+    # _tmp_json_message should be removed by the pipeline
+  - match: { hits.hits.0._source._tmp_json_message: null }
+
+  # test malformed-JSON parsing - parsing error should be ignored and the document should be indexed with original message
+  - do:
+      index:
+        index: logs-generic-default
+        refresh: true
+        body:
+          '@timestamp': '2023-05-10'
+          test: 'malformed_json'
+          message: '{"@timestamp":"2023-05-09T16:48:34.135Z", "message":"malformed_json"}}'
+  - match: {result: "created"}
+
+  - do:
+      search:
+        index: logs-generic-default
+        body:
+          query:
+            term:
+              test:
+                value: 'malformed_json'
+  - length: { hits.hits: 1 }
+  - match: { hits.hits.0._source.@timestamp: '2023-05-10' }
+  - match: { hits.hits.0._source.message: '{"@timestamp":"2023-05-09T16:48:34.135Z", "message":"malformed_json"}}' }
+  - match: { hits.hits.0._source._tmp_json_message: null }
+
+  # test non-string message field
+  - do:
+      index:
+        index: logs-generic-default
+        refresh: true
+        body:
+          test: 'numeric_message'
+          message: 42
+  - match: {result: "created"}
+
+  - do:
+      search:
+        index: logs-generic-default
+        body:
+          query:
+            term:
+              test:
+                value: 'numeric_message'
+          fields:
+            - field: 'message'
+  - length: { hits.hits: 1 }
+  - match: { hits.hits.0._source.message: 42 }
+  - match: { hits.hits.0.fields.message.0: '42' }

+ 40 - 17
x-pack/plugin/core/src/main/java/org/elasticsearch/xpack/core/template/IndexTemplateRegistry.java

@@ -562,34 +562,57 @@ public abstract class IndexTemplateRegistry implements ClusterStateListener {
             );
 
             if (creationCheck.compareAndSet(false, true)) {
-                PipelineConfiguration existingPipeline = findInstalledPipeline(state, requiredPipeline.getId());
-                if (existingPipeline != null) {
-                    Integer existingPipelineVersion = existingPipeline.getVersion();
-                    if (existingPipelineVersion == null || existingPipelineVersion < requiredPipeline.getVersion()) {
-                        logger.info(
-                            "upgrading ingest pipeline [{}] for [{}] from version [{}] to version [{}]",
-                            requiredPipeline.getId(),
-                            getOrigin(),
-                            existingPipelineVersion,
-                            requiredPipeline.getVersion()
-                        );
-                        putIngestPipeline(requiredPipeline, creationCheck);
+                List<String> pipelineDependencies = requiredPipeline.getPipelineDependencies();
+                if (pipelineDependencies != null && pipelineDependenciesExist(state, pipelineDependencies) == false) {
+                    creationCheck.set(false);
+                    logger.trace(
+                        "not adding ingest pipeline [{}] for [{}] because its dependencies do not exist",
+                        requiredPipeline.getId(),
+                        getOrigin()
+                    );
+                } else {
+                    PipelineConfiguration existingPipeline = findInstalledPipeline(state, requiredPipeline.getId());
+                    if (existingPipeline != null) {
+                        Integer existingPipelineVersion = existingPipeline.getVersion();
+                        if (existingPipelineVersion == null || existingPipelineVersion < requiredPipeline.getVersion()) {
+                            logger.info(
+                                "upgrading ingest pipeline [{}] for [{}] from version [{}] to version [{}]",
+                                requiredPipeline.getId(),
+                                getOrigin(),
+                                existingPipelineVersion,
+                                requiredPipeline.getVersion()
+                            );
+                            putIngestPipeline(requiredPipeline, creationCheck);
+                        } else {
+                            creationCheck.set(false);
+                            logger.debug(
+                                "not adding ingest pipeline [{}] for [{}], because it already exists",
+                                requiredPipeline.getId(),
+                                getOrigin()
+                            );
+                        }
                     } else {
                         logger.debug(
-                            "not adding ingest pipeline [{}] for [{}], because it already exists",
+                            "adding ingest pipeline [{}] for [{}], because it doesn't exist",
                             requiredPipeline.getId(),
                             getOrigin()
                         );
-                        creationCheck.set(false);
+                        putIngestPipeline(requiredPipeline, creationCheck);
                     }
-                } else {
-                    logger.debug("adding ingest pipeline [{}] for [{}], because it doesn't exist", requiredPipeline.getId(), getOrigin());
-                    putIngestPipeline(requiredPipeline, creationCheck);
                 }
             }
         }
     }
 
+    /**
+     * Tells whether every pipeline in the given list is already installed according to the provided cluster state.
+     *
+     * @param state        the cluster state in which installed pipelines are looked up
+     * @param dependencies the IDs of the pipelines that are required to exist
+     * @return {@code true} if all listed pipelines were found (trivially so for an empty list), {@code false} otherwise
+     */
+    private boolean pipelineDependenciesExist(ClusterState state, List<String> dependencies) {
+        return dependencies.stream().allMatch(dependencyId -> findInstalledPipeline(state, dependencyId) != null);
+    }
+
     @Nullable
     private static PipelineConfiguration findInstalledPipeline(ClusterState state, String pipelineId) {
         Optional<IngestMetadata> maybeMeta = Optional.ofNullable(state.metadata().custom(IngestMetadata.TYPE));

+ 18 - 0
x-pack/plugin/core/src/main/java/org/elasticsearch/xpack/core/template/IngestPipelineConfig.java

@@ -10,6 +10,8 @@ package org.elasticsearch.xpack.core.template;
 import org.elasticsearch.common.bytes.BytesArray;
 import org.elasticsearch.common.bytes.BytesReference;
 
+import java.util.Collections;
+import java.util.List;
 import java.util.Objects;
 
 /**
@@ -21,11 +23,23 @@ public class IngestPipelineConfig {
     private final int version;
     private final String versionProperty;
 
+    /**
+     * A list of this pipeline's dependencies, for example pipelines that it references through a pipeline processor.
+     * This list is used to enforce proper ordering of pipeline installation, so that a pipeline gets installed only if all its
+     * dependencies are already installed.
+     */
+    private final List<String> dependencies;
+
     public IngestPipelineConfig(String id, String resource, int version, String versionProperty) {
+        this(id, resource, version, versionProperty, Collections.emptyList());
+    }
+
+    /**
+     * Creates a pipeline configuration that may declare dependencies on other pipelines.
+     *
+     * @param id              the pipeline ID; must not be {@code null}
+     * @param resource        the resource from which the pipeline configuration is loaded; must not be {@code null}
+     * @param version         the pipeline version
+     * @param versionProperty the version property used when loading the resource; must not be {@code null}
+     * @param dependencies    the IDs of the pipelines that must be installed before this one; {@code null} is treated as
+     *                        "no dependencies"
+     */
+    public IngestPipelineConfig(String id, String resource, int version, String versionProperty, List<String> dependencies) {
         this.id = Objects.requireNonNull(id);
         this.resource = Objects.requireNonNull(resource);
         this.version = version;
         this.versionProperty = Objects.requireNonNull(versionProperty);
+        // normalize null to an empty list so that callers of getPipelineDependencies() never need a null check
+        this.dependencies = dependencies == null ? Collections.emptyList() : dependencies;
     }
 
     public String getId() {
@@ -40,6 +54,10 @@ public class IngestPipelineConfig {
         return versionProperty;
     }
 
+    /**
+     * Returns the list of this pipeline's dependencies. It is used to enforce installation ordering: a pipeline gets
+     * installed only if all of its dependencies are already installed.
+     */
+    public List<String> getPipelineDependencies() {
+        return this.dependencies;
+    }
+
     public BytesReference loadConfig() {
         return new BytesArray(TemplateUtils.loadTemplate(resource, String.valueOf(version), versionProperty));
     }

+ 48 - 0
x-pack/plugin/core/src/main/resources/logs-json-message-pipeline.json

@@ -0,0 +1,48 @@
+{
+  "processors": [
+    {
+      "rename": {
+        "if": "ctx.message instanceof String && ctx.message.startsWith('{') && ctx.message.endsWith('}')",
+        "field": "message",
+        "target_field": "_tmp_json_message",
+        "ignore_missing": true
+      }
+    },
+    {
+      "json": {
+        "if": "ctx._tmp_json_message != null",
+        "field": "_tmp_json_message",
+        "add_to_root": true,
+        "add_to_root_conflict_strategy": "merge",
+        "allow_duplicate_keys": true,
+        "on_failure": [
+          {
+            "rename": {
+              "field": "_tmp_json_message",
+              "target_field": "message",
+              "ignore_missing": true
+            }
+          }
+        ]
+      }
+    },
+    {
+      "dot_expander" : {
+        "if": "ctx._tmp_json_message != null",
+        "field": "*",
+        "override": true
+      }
+    },
+    {
+      "remove" : {
+        "field": "_tmp_json_message",
+        "ignore_missing": true
+      }
+    }
+  ],
+  "_meta": {
+    "description": "automatic parsing of JSON log messages",
+    "managed": true
+  },
+  "version": ${xpack.stack.template.version}
+}

+ 43 - 8
x-pack/plugin/core/src/test/java/org/elasticsearch/xpack/core/template/IndexTemplateRegistryTests.java

@@ -83,14 +83,14 @@ public class IndexTemplateRegistryTests extends ESTestCase {
         threadPool.shutdownNow();
     }
 
-    public void testThatPipelinesAreAddedImmediately() throws Exception {
+    public void testThatIndependentPipelinesAreAddedImmediately() throws Exception {
         DiscoveryNode node = TestDiscoveryNode.create("node");
         DiscoveryNodes nodes = DiscoveryNodes.builder().localNodeId("node").masterNodeId("node").add(node).build();
 
         AtomicInteger calledTimes = new AtomicInteger(0);
         client.setVerifier((action, request, listener) -> {
             if (action instanceof PutPipelineAction) {
-                assertPutPipelineAction(calledTimes, action, request, listener);
+                assertPutPipelineAction(calledTimes, action, request, listener, "custom-plugin-final_pipeline");
                 return AcknowledgedResponse.TRUE;
             } else {
                 // the composable template is not expected to be added, as its dependency is not available in the cluster state
@@ -102,7 +102,34 @@ public class IndexTemplateRegistryTests extends ESTestCase {
 
         ClusterChangedEvent event = createClusterChangedEvent(Collections.emptyMap(), nodes);
         registry.clusterChanged(event);
-        assertBusy(() -> assertThat(calledTimes.get(), equalTo(2)));
+        assertBusy(() -> assertThat(calledTimes.get(), equalTo(1)));
+    }
+
+    public void testThatDependentPipelinesAreAddedIfDependenciesExist() throws Exception {
+        DiscoveryNode node = TestDiscoveryNode.create("node");
+        DiscoveryNodes nodes = DiscoveryNodes.builder().localNodeId("node").masterNodeId("node").add(node).build();
+
+        AtomicInteger calledTimes = new AtomicInteger(0);
+        client.setVerifier((action, request, listener) -> {
+            if (action instanceof PutPipelineAction) {
+                assertPutPipelineAction(calledTimes, action, request, listener, "custom-plugin-default_pipeline");
+                return AcknowledgedResponse.TRUE;
+            } else {
+                // the composable template is not expected to be added, as its dependency is not available in the cluster state
+                // custom-plugin-settings.json is not expected to be added as it contains a dependency on the default_pipeline
+                fail("client called with unexpected request: " + request.toString());
+                return null;
+            }
+        });
+
+        ClusterChangedEvent event = createClusterChangedEvent(
+            Collections.emptyMap(),
+            Collections.emptyMap(),
+            Map.of("custom-plugin-final_pipeline", 3),
+            nodes
+        );
+        registry.clusterChanged(event);
+        assertBusy(() -> assertThat(calledTimes.get(), equalTo(1)));
     }
 
     public void testThatTemplateIsAddedIfAllDependenciesExist() throws Exception {
@@ -138,7 +165,7 @@ public class IndexTemplateRegistryTests extends ESTestCase {
         AtomicInteger calledTimes = new AtomicInteger(0);
         client.setVerifier((action, request, listener) -> {
             if (action instanceof PutPipelineAction) {
-                assertPutPipelineAction(calledTimes, action, request, listener);
+                assertPutPipelineAction(calledTimes, action, request, listener, "custom-plugin-default_pipeline");
                 return AcknowledgedResponse.TRUE;
             } else {
                 // the template is not expected to be added, as the final pipeline is missing
@@ -150,7 +177,7 @@ public class IndexTemplateRegistryTests extends ESTestCase {
         ClusterChangedEvent event = createClusterChangedEvent(
             Collections.emptyMap(),
             Collections.emptyMap(),
-            Map.of("custom-plugin-default_pipeline", 3),
+            Map.of("custom-plugin-final_pipeline", 3),
             nodes
         );
         registry.clusterChanged(event);
@@ -188,7 +215,14 @@ public class IndexTemplateRegistryTests extends ESTestCase {
         AtomicInteger calledTimes = new AtomicInteger(0);
         client.setVerifier((action, request, listener) -> {
             if (action instanceof PutPipelineAction) {
-                assertPutPipelineAction(calledTimes, action, request, listener);
+                assertPutPipelineAction(
+                    calledTimes,
+                    action,
+                    request,
+                    listener,
+                    "custom-plugin-default_pipeline",
+                    "custom-plugin-final_pipeline"
+                );
                 return AcknowledgedResponse.TRUE;
             } else if (action instanceof PutComponentTemplateAction) {
                 assertPutComponentTemplate(calledTimes, action, request, listener);
@@ -277,12 +311,13 @@ public class IndexTemplateRegistryTests extends ESTestCase {
         AtomicInteger calledTimes,
         ActionType<?> action,
         ActionRequest request,
-        ActionListener<?> listener
+        ActionListener<?> listener,
+        String... pipelineIds
     ) {
         assertThat(action, instanceOf(PutPipelineAction.class));
         assertThat(request, instanceOf(PutPipelineRequest.class));
         final PutPipelineRequest putRequest = (PutPipelineRequest) request;
-        assertThat(putRequest.getId(), oneOf("custom-plugin-default_pipeline", "custom-plugin-final_pipeline"));
+        assertThat(putRequest.getId(), oneOf(pipelineIds));
         PipelineConfiguration pipelineConfiguration = new PipelineConfiguration(
             putRequest.getId(),
             putRequest.getSource(),

+ 3 - 1
x-pack/plugin/core/src/test/java/org/elasticsearch/xpack/core/template/TestRegistryWithCustomPlugin.java

@@ -18,6 +18,7 @@ import org.elasticsearch.xcontent.XContentParserConfiguration;
 import org.elasticsearch.xcontent.json.JsonXContent;
 
 import java.io.IOException;
+import java.util.Collections;
 import java.util.List;
 import java.util.Map;
 
@@ -75,7 +76,8 @@ class TestRegistryWithCustomPlugin extends IndexTemplateRegistry {
                 "custom-plugin-default_pipeline",
                 "/org/elasticsearch/xpack/core/template/custom-plugin-default_pipeline.json",
                 REGISTRY_VERSION,
-                TEMPLATE_VERSION_VARIABLE
+                TEMPLATE_VERSION_VARIABLE,
+                Collections.singletonList("custom-plugin-final_pipeline")
             ),
             new IngestPipelineConfig(
                 "custom-plugin-final_pipeline",

+ 8 - 1
x-pack/plugin/stack/src/main/java/org/elasticsearch/xpack/stack/StackTemplateRegistry.java

@@ -223,7 +223,14 @@ public class StackTemplateRegistry extends IndexTemplateRegistry {
     }
 
     private static final List<IngestPipelineConfig> INGEST_PIPELINE_CONFIGS = List.of(
-        new IngestPipelineConfig("logs-default-pipeline", "/logs-default-pipeline.json", REGISTRY_VERSION, TEMPLATE_VERSION_VARIABLE)
+        new IngestPipelineConfig("logs@json-message", "/logs-json-message-pipeline.json", REGISTRY_VERSION, TEMPLATE_VERSION_VARIABLE),
+        new IngestPipelineConfig(
+            "logs-default-pipeline",
+            "/logs-default-pipeline.json",
+            REGISTRY_VERSION,
+            TEMPLATE_VERSION_VARIABLE,
+            Collections.singletonList("logs@json-message")
+        )
     );
 
     @Override

+ 14 - 2
x-pack/plugin/stack/src/test/java/org/elasticsearch/xpack/stack/StackTemplateRegistryTests.java

@@ -198,7 +198,7 @@ public class StackTemplateRegistryTests extends ESTestCase {
         registry.clusterChanged(event);
     }
 
-    public void testThatRequiredPipelinesAreAdded() throws Exception {
+    public void testThatIndependentPipelinesAreAdded() throws Exception {
         DiscoveryNode node = TestDiscoveryNode.create("node");
         DiscoveryNodes nodes = DiscoveryNodes.builder().localNodeId("node").masterNodeId("node").add(node).build();
 
@@ -225,7 +225,19 @@ public class StackTemplateRegistryTests extends ESTestCase {
 
         ClusterChangedEvent event = createInitialClusterChangedEvent(nodes);
         registry.clusterChanged(event);
-        assertBusy(() -> assertThat(calledTimes.get(), equalTo(registry.getIngestPipelines().size())));
+        assertBusy(
+            () -> assertThat(
+                calledTimes.get(),
+                equalTo(
+                    Long.valueOf(
+                        registry.getIngestPipelines()
+                            .stream()
+                            .filter(ingestPipelineConfig -> ingestPipelineConfig.getPipelineDependencies().isEmpty())
+                            .count()
+                    ).intValue()
+                )
+            )
+        );
     }
 
     public void testPolicyAlreadyExistsButDiffers() throws IOException {