瀏覽代碼

Capture additional info into failure documents (#108277)

When a document fails in the ingest service and gets redirected
to a failure store, we're adding some more information to aid
debugging. This PR adds the `pipeline` (the pipeline that failed to
ingest the document), `pipeline_trace` (the trace of pipelines that
were executed on the document), `processor_tag`, and `processor_type`
to the failure documents.

To accommodate for said changes, we're also updating the mappings
of the failure store -- which we can do easily as failure stores are
currently still behind a feature flag.
Niels Bauman 1 年之前
父節點
當前提交
3cdf835d94

+ 30 - 3
modules/data-streams/src/yamlRestTest/resources/rest-api-spec/test/data_stream/190_failure_store_redirection.yml

@@ -15,6 +15,11 @@ teardown:
         name: generic_logs_template
         ignore: 404
 
+  - do:
+      ingest.delete_pipeline:
+        id: "parent_failing_pipeline"
+        ignore: 404
+
   - do:
       ingest.delete_pipeline:
         id: "failing_pipeline"
@@ -35,8 +40,24 @@ teardown:
             "description": "_description",
             "processors": [
               {
-                "fail" : {
-                  "message" : "error_message"
+                "fail": {
+                  "message" : "error_message",
+                  "tag": "foo-tag"
+                }
+              }
+            ]
+          }
+  - match: { acknowledged: true }
+
+  - do:
+      ingest.put_pipeline:
+        id: "parent_failing_pipeline"
+        body:  >
+          {
+            "processors": [
+              {
+                "pipeline": {
+                  "name": "failing_pipeline"
                 }
               }
             ]
@@ -57,7 +78,7 @@ teardown:
               number_of_shards:   1
               number_of_replicas: 1
               index:
-                default_pipeline: "failing_pipeline"
+                default_pipeline: "parent_failing_pipeline"
 
   - do:
       index:
@@ -98,6 +119,12 @@ teardown:
   - match: { hits.hits.0._source.error.type: 'fail_processor_exception' }
   - match: { hits.hits.0._source.error.message: 'error_message' }
   - contains: { hits.hits.0._source.error.stack_trace: 'org.elasticsearch.ingest.common.FailProcessorException: error_message' }
+  - length: { hits.hits.0._source.error.pipeline_trace: 2 }
+  - match: { hits.hits.0._source.error.pipeline_trace.0: 'parent_failing_pipeline' }
+  - match: { hits.hits.0._source.error.pipeline_trace.1: 'failing_pipeline' }
+  - match: { hits.hits.0._source.error.pipeline: 'failing_pipeline' }
+  - match: { hits.hits.0._source.error.processor_tag: 'foo-tag' }
+  - match: { hits.hits.0._source.error.processor_type: 'fail' }
 
   - do:
       indices.delete_data_stream:

+ 36 - 4
server/src/main/java/org/elasticsearch/action/bulk/FailureStoreDocumentConverter.java

@@ -12,18 +12,32 @@ import org.elasticsearch.ElasticsearchException;
 import org.elasticsearch.ExceptionsHelper;
 import org.elasticsearch.action.DocWriteRequest;
 import org.elasticsearch.action.index.IndexRequest;
+import org.elasticsearch.common.util.set.Sets;
 import org.elasticsearch.xcontent.XContentBuilder;
 import org.elasticsearch.xcontent.json.JsonXContent;
 
 import java.io.IOException;
+import java.util.Collections;
+import java.util.List;
 import java.util.Objects;
+import java.util.Set;
 import java.util.function.Supplier;
 
+import static org.elasticsearch.ingest.CompoundProcessor.PIPELINE_ORIGIN_EXCEPTION_HEADER;
+import static org.elasticsearch.ingest.CompoundProcessor.PROCESSOR_TAG_EXCEPTION_HEADER;
+import static org.elasticsearch.ingest.CompoundProcessor.PROCESSOR_TYPE_EXCEPTION_HEADER;
+
 /**
  * Transforms an indexing request using error information into a new index request to be stored in a data stream's failure store.
  */
 public class FailureStoreDocumentConverter {
 
+    private static final Set<String> INGEST_EXCEPTION_HEADERS = Set.of(
+        PIPELINE_ORIGIN_EXCEPTION_HEADER,
+        PROCESSOR_TAG_EXCEPTION_HEADER,
+        PROCESSOR_TYPE_EXCEPTION_HEADER
+    );
+
     /**
      * Combines an {@link IndexRequest} that has failed during the bulk process with the error thrown for that request. The result is a
      * new {@link IndexRequest} that can be stored in a data stream's failure store.
@@ -96,10 +110,28 @@ public class FailureStoreDocumentConverter {
                 builder.field("type", ElasticsearchException.getExceptionName(unwrapped));
                 builder.field("message", unwrapped.getMessage());
                 builder.field("stack_trace", ExceptionsHelper.stackTrace(unwrapped));
-                // Further fields not yet tracked (Need to expose via specific exceptions)
-                // - pipeline
-                // - pipeline_trace
-                // - processor
+                // Try to find the IngestProcessorException somewhere in the stack trace. Since IngestProcessorException is package-private,
+                // we can't instantiate it in tests, so we'll have to check for the headers directly.
+                var ingestException = ExceptionsHelper.<ElasticsearchException>unwrapCausesAndSuppressed(
+                    exception,
+                    t -> t instanceof ElasticsearchException e && Sets.haveNonEmptyIntersection(e.getHeaderKeys(), INGEST_EXCEPTION_HEADERS)
+                ).orElse(null);
+                if (ingestException != null) {
+                    if (ingestException.getHeaderKeys().contains(PIPELINE_ORIGIN_EXCEPTION_HEADER)) {
+                        List<String> pipelineOrigin = ingestException.getHeader(PIPELINE_ORIGIN_EXCEPTION_HEADER);
+                        Collections.reverse(pipelineOrigin);
+                        if (pipelineOrigin.isEmpty() == false) {
+                            builder.field("pipeline_trace", pipelineOrigin);
+                            builder.field("pipeline", pipelineOrigin.get(pipelineOrigin.size() - 1));
+                        }
+                    }
+                    if (ingestException.getHeaderKeys().contains(PROCESSOR_TAG_EXCEPTION_HEADER)) {
+                        builder.field("processor_tag", ingestException.getHeader(PROCESSOR_TAG_EXCEPTION_HEADER).get(0));
+                    }
+                    if (ingestException.getHeaderKeys().contains(PROCESSOR_TYPE_EXCEPTION_HEADER)) {
+                        builder.field("processor_type", ingestException.getHeader(PROCESSOR_TYPE_EXCEPTION_HEADER).get(0));
+                    }
+                }
             }
             builder.endObject();
         }

+ 8 - 2
server/src/main/java/org/elasticsearch/cluster/metadata/DataStreamFailureStoreDefinition.java

@@ -71,7 +71,10 @@ public class DataStreamFailureStoreDefinition {
              *           "pipeline_trace": {
              *              "type": "keyword"
              *           },
-             *           "processor": {
+             *           "processor_tag": {
+             *              "type": "keyword"
+             *           },
+             *           "processor_type": {
              *              "type": "keyword"
              *           }
              *         }
@@ -123,7 +126,10 @@ public class DataStreamFailureStoreDefinition {
                     .startObject("pipeline_trace")
                     .field("type", "keyword")
                     .endObject()
-                    .startObject("processor")
+                    .startObject("processor_tag")
+                    .field("type", "keyword")
+                    .endObject()
+                    .startObject("processor_type")
                     .field("type", "keyword")
                     .endObject()
                     .endObject()

+ 11 - 7
server/src/main/java/org/elasticsearch/ingest/CompoundProcessor.java

@@ -28,6 +28,10 @@ public class CompoundProcessor implements Processor {
     public static final String ON_FAILURE_PROCESSOR_TAG_FIELD = "on_failure_processor_tag";
     public static final String ON_FAILURE_PIPELINE_FIELD = "on_failure_pipeline";
 
+    public static final String PROCESSOR_TYPE_EXCEPTION_HEADER = "processor_type";
+    public static final String PROCESSOR_TAG_EXCEPTION_HEADER = "processor_tag";
+    public static final String PIPELINE_ORIGIN_EXCEPTION_HEADER = "pipeline_origin";
+
     private final boolean ignoreFailure;
     private final List<Processor> processors;
     private final List<Processor> onFailureProcessors;
@@ -286,9 +290,9 @@ public class CompoundProcessor implements Processor {
     }
 
     private static void putFailureMetadata(IngestDocument ingestDocument, ElasticsearchException cause) {
-        List<String> processorTypeHeader = cause.getHeader("processor_type");
-        List<String> processorTagHeader = cause.getHeader("processor_tag");
-        List<String> processorOriginHeader = cause.getHeader("pipeline_origin");
+        List<String> processorTypeHeader = cause.getHeader(PROCESSOR_TYPE_EXCEPTION_HEADER);
+        List<String> processorTagHeader = cause.getHeader(PROCESSOR_TAG_EXCEPTION_HEADER);
+        List<String> processorOriginHeader = cause.getHeader(PIPELINE_ORIGIN_EXCEPTION_HEADER);
         String failedProcessorType = (processorTypeHeader != null) ? processorTypeHeader.get(0) : null;
         String failedProcessorTag = (processorTagHeader != null) ? processorTagHeader.get(0) : null;
         String failedPipelineId = (processorOriginHeader != null) ? processorOriginHeader.get(0) : null;
@@ -310,7 +314,7 @@ public class CompoundProcessor implements Processor {
     }
 
     static IngestProcessorException newCompoundProcessorException(Exception e, Processor processor, IngestDocument document) {
-        if (e instanceof IngestProcessorException ipe && ipe.getHeader("processor_type") != null) {
+        if (e instanceof IngestProcessorException ipe && ipe.getHeader(PROCESSOR_TYPE_EXCEPTION_HEADER) != null) {
             return ipe;
         }
 
@@ -318,16 +322,16 @@ public class CompoundProcessor implements Processor {
 
         String processorType = processor.getType();
         if (processorType != null) {
-            exception.addHeader("processor_type", processorType);
+            exception.addHeader(PROCESSOR_TYPE_EXCEPTION_HEADER, processorType);
         }
         String processorTag = processor.getTag();
         if (processorTag != null) {
-            exception.addHeader("processor_tag", processorTag);
+            exception.addHeader(PROCESSOR_TAG_EXCEPTION_HEADER, processorTag);
         }
         if (document != null) {
             List<String> pipelineStack = document.getPipelineStack();
             if (pipelineStack.isEmpty() == false) {
-                exception.addHeader("pipeline_origin", pipelineStack);
+                exception.addHeader(PIPELINE_ORIGIN_EXCEPTION_HEADER, pipelineStack);
             }
         }
 

+ 46 - 4
server/src/test/java/org/elasticsearch/action/bulk/FailureStoreDocumentConverterTests.java

@@ -11,11 +11,15 @@ package org.elasticsearch.action.bulk;
 import org.elasticsearch.ElasticsearchException;
 import org.elasticsearch.action.DocWriteRequest;
 import org.elasticsearch.action.index.IndexRequest;
+import org.elasticsearch.ingest.CompoundProcessor;
 import org.elasticsearch.test.ESTestCase;
 import org.elasticsearch.transport.RemoteTransportException;
 import org.elasticsearch.xcontent.ObjectPath;
 import org.elasticsearch.xcontent.json.JsonXContent;
 
+import java.util.Arrays;
+import java.util.List;
+
 import static org.hamcrest.CoreMatchers.containsString;
 import static org.hamcrest.CoreMatchers.equalTo;
 import static org.hamcrest.CoreMatchers.is;
@@ -24,14 +28,36 @@ import static org.hamcrest.CoreMatchers.startsWith;
 
 public class FailureStoreDocumentConverterTests extends ESTestCase {
 
-    public void testFailureStoreDocumentConverstion() throws Exception {
+    public void testFailureStoreDocumentConversion() throws Exception {
         IndexRequest source = new IndexRequest("original_index").routing("fake_routing")
             .id("1")
             .source(JsonXContent.contentBuilder().startObject().field("key", "value").endObject());
 
         // The exception will be wrapped for the test to make sure the converter correctly unwraps it
-        Exception exception = new ElasticsearchException("Test exception please ignore");
-        exception = new RemoteTransportException("Test exception wrapper, please ignore", exception);
+        ElasticsearchException exception = new ElasticsearchException("Test exception please ignore");
+        ElasticsearchException ingestException = exception;
+        if (randomBoolean()) {
+            ingestException = new ElasticsearchException("Test suppressed exception, please ignore");
+            exception.addSuppressed(ingestException);
+        }
+        boolean withPipelineOrigin = randomBoolean();
+        if (withPipelineOrigin) {
+            ingestException.addHeader(
+                CompoundProcessor.PIPELINE_ORIGIN_EXCEPTION_HEADER,
+                Arrays.asList("some-failing-pipeline", "some-pipeline")
+            );
+        }
+        boolean withProcessorTag = randomBoolean();
+        if (withProcessorTag) {
+            ingestException.addHeader(CompoundProcessor.PROCESSOR_TAG_EXCEPTION_HEADER, "foo-tag");
+        }
+        boolean withProcessorType = randomBoolean();
+        if (withProcessorType) {
+            ingestException.addHeader(CompoundProcessor.PROCESSOR_TYPE_EXCEPTION_HEADER, "bar-type");
+        }
+        if (randomBoolean()) {
+            exception = new RemoteTransportException("Test exception wrapper, please ignore", exception);
+        }
 
         String targetIndexName = "rerouted_index";
         long testTime = 1702357200000L; // 2023-12-12T05:00:00.000Z
@@ -68,7 +94,23 @@ public class FailureStoreDocumentConverterTests extends ESTestCase {
         );
         assertThat(
             ObjectPath.eval("error.stack_trace", convertedRequest.sourceAsMap()),
-            containsString("at org.elasticsearch.action.bulk.FailureStoreDocumentConverterTests.testFailureStoreDocumentConverstion")
+            containsString("at org.elasticsearch.action.bulk.FailureStoreDocumentConverterTests.testFailureStoreDocumentConversion")
+        );
+        assertThat(
+            ObjectPath.eval("error.pipeline_trace", convertedRequest.sourceAsMap()),
+            is(equalTo(withPipelineOrigin ? List.of("some-pipeline", "some-failing-pipeline") : null))
+        );
+        assertThat(
+            ObjectPath.eval("error.pipeline", convertedRequest.sourceAsMap()),
+            is(equalTo(withPipelineOrigin ? "some-failing-pipeline" : null))
+        );
+        assertThat(
+            ObjectPath.eval("error.processor_tag", convertedRequest.sourceAsMap()),
+            is(equalTo(withProcessorTag ? "foo-tag" : null))
+        );
+        assertThat(
+            ObjectPath.eval("error.processor_type", convertedRequest.sourceAsMap()),
+            is(equalTo(withProcessorType ? "bar-type" : null))
         );
 
         assertThat(convertedRequest.isWriteToFailureStore(), is(true));