Browse Source

Introduce on_failure_pipeline ingest metadata inside on_failure block (#49076)

In case an exception occurs inside a pipeline processor,
the pipeline stack is kept around as a header in the exception.
Then, in the on_failure processor, the id of the pipeline in which
the exception occurred is made accessible via the `on_failure_pipeline`
ingest metadata field.

Closes #44920
Martijn van Groningen 5 years ago
parent
commit
2ba00c8149

+ 4 - 3
docs/reference/ingest/ingest-node.asciidoc

@@ -376,7 +376,7 @@ The `if` condition can be more then a simple equality check.
 The full power of the <<modules-scripting-painless, Painless Scripting Language>> is available and
 running in the {painless}/painless-ingest-processor-context.html[ingest processor context].
 
-IMPORTANT: The value of ctx is read-only in `if` conditions. 
+IMPORTANT: The value of ctx is read-only in `if` conditions.
 
 A more complex `if` condition that drops the document (i.e. not index it)
 unless it has a multi-valued tag field with at least one value that contains the characters
@@ -718,8 +718,9 @@ The `ignore_failure` can be set on any processor and defaults to `false`.
 
 You may want to retrieve the actual error message that was thrown
 by a failed processor. To do so you can access metadata fields called
-`on_failure_message`, `on_failure_processor_type`, and `on_failure_processor_tag`. These fields are only accessible
-from within the context of an `on_failure` block.
+`on_failure_message`, `on_failure_processor_type`, `on_failure_processor_tag` and
+`on_failure_pipeline` (in case an error occurred inside a pipeline processor).
+These fields are only accessible from within the context of an `on_failure` block.
 
 Here is an updated version of the example that you
 saw earlier. But instead of setting the error message manually, the example leverages the `on_failure_message`

+ 1 - 1
modules/ingest-common/src/test/resources/rest-api-spec/test/ingest/210_pipeline_processor.yml

@@ -107,4 +107,4 @@ teardown:
       pipeline: "outer"
       body: {}
 - match: { error.root_cause.0.type: "ingest_processor_exception" }
-- match: { error.root_cause.0.reason: "java.lang.IllegalStateException: Cycle detected for pipeline: inner" }
+- match: { error.root_cause.0.reason: "java.lang.IllegalStateException: Cycle detected for pipeline: outer" }

+ 1 - 1
server/src/main/java/org/elasticsearch/action/ingest/SimulateExecutionService.java

@@ -54,7 +54,7 @@ class SimulateExecutionService {
                 handler.accept(new SimulateDocumentVerboseResult(processorResultList), e);
             });
         } else {
-            pipeline.execute(ingestDocument, (result, e) -> {
+            ingestDocument.executePipeline(pipeline, (result, e) -> {
                 if (e == null) {
                     handler.accept(new SimulateDocumentBaseResult(result), null);
                 } else {

+ 16 - 3
server/src/main/java/org/elasticsearch/ingest/CompoundProcessor.java

@@ -40,6 +40,7 @@ public class CompoundProcessor implements Processor {
     public static final String ON_FAILURE_MESSAGE_FIELD = "on_failure_message";
     public static final String ON_FAILURE_PROCESSOR_TYPE_FIELD = "on_failure_processor_type";
     public static final String ON_FAILURE_PROCESSOR_TAG_FIELD = "on_failure_processor_tag";
+    public static final String ON_FAILURE_PIPELINE_FIELD = "on_failure_pipeline";
 
     private final boolean ignoreFailure;
     private final List<Processor> processors;
@@ -144,7 +145,7 @@ public class CompoundProcessor implements Processor {
                     innerExecute(currentProcessor + 1, ingestDocument, handler);
                 } else {
                     IngestProcessorException compoundProcessorException =
-                        newCompoundProcessorException(e, processor.getType(), processor.getTag());
+                        newCompoundProcessorException(e, processor, ingestDocument);
                     if (onFailureProcessors.isEmpty()) {
                         handler.accept(null, compoundProcessorException);
                     } else {
@@ -177,7 +178,7 @@ public class CompoundProcessor implements Processor {
         onFailureProcessor.execute(ingestDocument, (result, e) -> {
             if (e != null) {
                 removeFailureMetadata(ingestDocument);
-                handler.accept(null, newCompoundProcessorException(e, onFailureProcessor.getType(), onFailureProcessor.getTag()));
+                handler.accept(null, newCompoundProcessorException(e, onFailureProcessor, ingestDocument));
                 return;
             }
             if (result == null) {
@@ -192,12 +193,17 @@ public class CompoundProcessor implements Processor {
     private void putFailureMetadata(IngestDocument ingestDocument, ElasticsearchException cause) {
         List<String> processorTypeHeader = cause.getHeader("processor_type");
         List<String> processorTagHeader = cause.getHeader("processor_tag");
+        List<String> processorOriginHeader = cause.getHeader("pipeline_origin");
         String failedProcessorType = (processorTypeHeader != null) ? processorTypeHeader.get(0) : null;
         String failedProcessorTag = (processorTagHeader != null) ? processorTagHeader.get(0) : null;
+        String failedPipelineId = (processorOriginHeader != null) ? processorOriginHeader.get(0) : null;
         Map<String, Object> ingestMetadata = ingestDocument.getIngestMetadata();
         ingestMetadata.put(ON_FAILURE_MESSAGE_FIELD, cause.getRootCause().getMessage());
         ingestMetadata.put(ON_FAILURE_PROCESSOR_TYPE_FIELD, failedProcessorType);
         ingestMetadata.put(ON_FAILURE_PROCESSOR_TAG_FIELD, failedProcessorTag);
+        if (failedPipelineId != null) {
+            ingestMetadata.put(ON_FAILURE_PIPELINE_FIELD, failedPipelineId);
+        }
     }
 
     private void removeFailureMetadata(IngestDocument ingestDocument) {
@@ -205,21 +211,28 @@ public class CompoundProcessor implements Processor {
         ingestMetadata.remove(ON_FAILURE_MESSAGE_FIELD);
         ingestMetadata.remove(ON_FAILURE_PROCESSOR_TYPE_FIELD);
         ingestMetadata.remove(ON_FAILURE_PROCESSOR_TAG_FIELD);
+        ingestMetadata.remove(ON_FAILURE_PIPELINE_FIELD);
     }
 
-    private IngestProcessorException newCompoundProcessorException(Exception e, String processorType, String processorTag) {
+    static IngestProcessorException newCompoundProcessorException(Exception e, Processor processor, IngestDocument document) {
         if (e instanceof IngestProcessorException && ((IngestProcessorException) e).getHeader("processor_type") != null) {
             return (IngestProcessorException) e;
         }
 
         IngestProcessorException exception = new IngestProcessorException(e);
 
+        String processorType = processor.getType();
         if (processorType != null) {
             exception.addHeader("processor_type", processorType);
         }
+        String processorTag = processor.getTag();
         if (processorTag != null) {
             exception.addHeader("processor_tag", processorTag);
         }
+        List<String> pipelineStack = document.getPipelineStack();
+        if (pipelineStack.size() > 1) {
+            exception.addHeader("pipeline_origin", pipelineStack);
+        }
 
         return exception;
     }

+ 13 - 4
server/src/main/java/org/elasticsearch/ingest/IngestDocument.java

@@ -38,7 +38,7 @@ import java.util.Collections;
 import java.util.Date;
 import java.util.EnumMap;
 import java.util.HashMap;
-import java.util.IdentityHashMap;
+import java.util.LinkedHashSet;
 import java.util.List;
 import java.util.Map;
 import java.util.Objects;
@@ -60,7 +60,7 @@ public final class IngestDocument {
     private final Map<String, Object> ingestMetadata;
 
     // Contains all pipelines that have been executed for this document
-    private final Set<Pipeline> executedPipelines = Collections.newSetFromMap(new IdentityHashMap<>());
+    private final Set<String> executedPipelines = new LinkedHashSet<>();
 
     public IngestDocument(String index, String id, String routing,
                           Long version, VersionType versionType, Map<String, Object> source) {
@@ -646,9 +646,9 @@ public final class IngestDocument {
      * @param handler handles the result or failure
      */
     public void executePipeline(Pipeline pipeline, BiConsumer<IngestDocument, Exception> handler) {
-        if (executedPipelines.add(pipeline)) {
+        if (executedPipelines.add(pipeline.getId())) {
             pipeline.execute(this, (result, e) -> {
-                executedPipelines.remove(pipeline);
+                executedPipelines.remove(pipeline.getId());
                 handler.accept(result, e);
             });
         } else {
@@ -656,6 +656,15 @@ public final class IngestDocument {
         }
     }
 
+    /**
+     * @return a pipeline stack; all pipelines that are in execution by this document in reverse order
+     */
+    List<String> getPipelineStack() {
+        List<String> pipelineStack = new ArrayList<>(executedPipelines);
+        Collections.reverse(pipelineStack);
+        return pipelineStack;
+    }
+
     @Override
     public boolean equals(Object obj) {
         if (obj == this) { return true; }

+ 1 - 1
server/src/main/java/org/elasticsearch/ingest/IngestService.java

@@ -499,7 +499,7 @@ public class IngestService implements ClusterStateApplier {
         VersionType versionType = indexRequest.versionType();
         Map<String, Object> sourceAsMap = indexRequest.sourceAsMap();
         IngestDocument ingestDocument = new IngestDocument(index, id, routing, version, versionType, sourceAsMap);
-        pipeline.execute(ingestDocument, (result, e) -> {
+        ingestDocument.executePipeline(pipeline, (result, e) -> {
             long ingestTimeInMillis = TimeUnit.NANOSECONDS.toMillis(System.nanoTime() - startTimeInNanos);
             totalMetrics.postIngest(ingestTimeInMillis);
             if (e != null) {

+ 79 - 0
server/src/test/java/org/elasticsearch/ingest/CompoundProcessorTests.java

@@ -26,14 +26,17 @@ import org.junit.Before;
 import java.util.Arrays;
 import java.util.Collections;
 import java.util.HashMap;
+import java.util.List;
 import java.util.Map;
 import java.util.concurrent.TimeUnit;
+import java.util.function.BiConsumer;
 import java.util.function.Consumer;
 import java.util.function.LongSupplier;
 
 import static org.hamcrest.CoreMatchers.equalTo;
 import static org.hamcrest.Matchers.hasSize;
 import static org.hamcrest.Matchers.is;
+import static org.hamcrest.Matchers.notNullValue;
 import static org.hamcrest.Matchers.nullValue;
 import static org.hamcrest.Matchers.sameInstance;
 import static org.mockito.Mockito.mock;
@@ -279,6 +282,82 @@ public class CompoundProcessorTests extends ESTestCase {
         assertStats(pipeline, 1, 1, 0);
     }
 
+    public void testFailureProcessorIsInvokedOnFailure() {
+        TestProcessor onFailureProcessor = new TestProcessor(null, "on_failure", ingestDocument -> {
+            Map<String, Object> ingestMetadata = ingestDocument.getIngestMetadata();
+            assertThat(ingestMetadata.entrySet(), hasSize(4));
+            assertThat(ingestMetadata.get(CompoundProcessor.ON_FAILURE_MESSAGE_FIELD), equalTo("failure!"));
+            assertThat(ingestMetadata.get(CompoundProcessor.ON_FAILURE_PROCESSOR_TYPE_FIELD), equalTo("test-processor"));
+            assertThat(ingestMetadata.get(CompoundProcessor.ON_FAILURE_PROCESSOR_TAG_FIELD), nullValue());
+            assertThat(ingestMetadata.get(CompoundProcessor.ON_FAILURE_PIPELINE_FIELD), equalTo("2"));
+        });
+
+        Pipeline pipeline2 = new Pipeline("2", null, null, new CompoundProcessor(new TestProcessor(new RuntimeException("failure!"))));
+        Pipeline pipeline1 = new Pipeline("1", null, null, new CompoundProcessor(false, List.of(new AbstractProcessor(null) {
+            @Override
+            public void execute(IngestDocument ingestDocument, BiConsumer<IngestDocument, Exception> handler) {
+                ingestDocument.executePipeline(pipeline2, handler);
+            }
+
+            @Override
+            public IngestDocument execute(IngestDocument ingestDocument) throws Exception {
+                throw new AssertionError();
+            }
+
+            @Override
+            public String getType() {
+                return "pipeline";
+            }
+        }), List.of(onFailureProcessor)));
+
+        ingestDocument.executePipeline(pipeline1, (document, e) -> {
+            assertThat(document, notNullValue());
+            assertThat(e, nullValue());
+        });
+        assertThat(onFailureProcessor.getInvokedCounter(), equalTo(1));
+    }
+
+    public void testNewCompoundProcessorException() {
+        TestProcessor processor = new TestProcessor("my_tag", "my_type", new RuntimeException());
+        IngestProcessorException ingestProcessorException1 =
+            CompoundProcessor.newCompoundProcessorException(new RuntimeException(), processor, ingestDocument);
+        assertThat(ingestProcessorException1.getHeader("processor_tag"), equalTo(List.of("my_tag")));
+        assertThat(ingestProcessorException1.getHeader("processor_type"), equalTo(List.of("my_type")));
+        assertThat(ingestProcessorException1.getHeader("pipeline_origin"), nullValue());
+
+        IngestProcessorException ingestProcessorException2 =
+            CompoundProcessor.newCompoundProcessorException(ingestProcessorException1, processor, ingestDocument);
+        assertThat(ingestProcessorException2, sameInstance(ingestProcessorException1));
+    }
+
+    public void testNewCompoundProcessorExceptionPipelineOrigin() {
+        Pipeline pipeline2 = new Pipeline("2", null, null,
+            new CompoundProcessor(new TestProcessor("my_tag", "my_type", new RuntimeException())));
+        Pipeline pipeline1 = new Pipeline("1", null, null, new CompoundProcessor(new AbstractProcessor(null) {
+            @Override
+            public IngestDocument execute(IngestDocument ingestDocument) throws Exception {
+                 throw new UnsupportedOperationException();
+            }
+
+            @Override
+            public void execute(IngestDocument ingestDocument, BiConsumer<IngestDocument, Exception> handler) {
+                ingestDocument.executePipeline(pipeline2, handler);
+            }
+
+            @Override
+            public String getType() {
+                return "my_type2";
+            }
+        }));
+
+        Exception[] holder = new Exception[1];
+        ingestDocument.executePipeline(pipeline1, (document, e) -> holder[0] = e);
+        IngestProcessorException ingestProcessorException = (IngestProcessorException) holder[0];
+        assertThat(ingestProcessorException.getHeader("processor_tag"), equalTo(List.of("my_tag")));
+        assertThat(ingestProcessorException.getHeader("processor_type"), equalTo(List.of("my_type")));
+        assertThat(ingestProcessorException.getHeader("pipeline_origin"), equalTo(List.of("2", "1")));
+    }
+
     private void assertStats(CompoundProcessor compoundProcessor, long count,  long failed, long time) {
         assertStats(0, compoundProcessor, 0L, count, failed, time);
     }

+ 156 - 2
server/src/test/java/org/elasticsearch/ingest/IngestClientIT.java

@@ -39,13 +39,14 @@ import org.elasticsearch.action.update.UpdateRequest;
 import org.elasticsearch.client.Requests;
 import org.elasticsearch.common.bytes.BytesReference;
 import org.elasticsearch.common.settings.Settings;
+import org.elasticsearch.common.xcontent.XContentBuilder;
 import org.elasticsearch.common.xcontent.XContentType;
 import org.elasticsearch.plugins.Plugin;
 import org.elasticsearch.test.ESIntegTestCase;
 
-import java.util.Arrays;
 import java.util.Collection;
 import java.util.HashMap;
+import java.util.List;
 import java.util.Map;
 
 import static org.elasticsearch.common.xcontent.XContentFactory.jsonBuilder;
@@ -68,7 +69,7 @@ public class IngestClientIT extends ESIntegTestCase {
 
     @Override
     protected Collection<Class<? extends Plugin>> nodePlugins() {
-        return Arrays.asList(IngestTestPlugin.class);
+        return List.of(ExtendedIngestTestPlugin.class);
     }
 
     public void testSimulate() throws Exception {
@@ -293,4 +294,157 @@ public class IngestClientIT extends ESIntegTestCase {
         assertFalse(item.isFailed());
         assertEquals("auto-generated", item.getResponse().getId());
     }
+
+    public void testPipelineOriginHeader() throws Exception {
+        {
+            XContentBuilder source = jsonBuilder().startObject();
+            {
+                source.startArray("processors");
+                source.startObject();
+                {
+                    source.startObject("pipeline");
+                    source.field("name", "2");
+                    source.endObject();
+                }
+                source.endObject();
+                source.endArray();
+            }
+            source.endObject();
+            PutPipelineRequest putPipelineRequest =
+                new PutPipelineRequest("1", BytesReference.bytes(source), XContentType.JSON);
+            client().admin().cluster().putPipeline(putPipelineRequest).get();
+        }
+        {
+            XContentBuilder source = jsonBuilder().startObject();
+            {
+                source.startArray("processors");
+                source.startObject();
+                {
+                    source.startObject("pipeline");
+                    source.field("name", "3");
+                    source.endObject();
+                }
+                source.endObject();
+                source.endArray();
+            }
+            source.endObject();
+            PutPipelineRequest putPipelineRequest =
+                new PutPipelineRequest("2", BytesReference.bytes(source), XContentType.JSON);
+            client().admin().cluster().putPipeline(putPipelineRequest).get();
+        }
+        {
+            XContentBuilder source = jsonBuilder().startObject();
+            {
+                source.startArray("processors");
+                source.startObject();
+                {
+                    source.startObject("fail");
+                    source.endObject();
+                }
+                source.endObject();
+                source.endArray();
+            }
+            source.endObject();
+            PutPipelineRequest putPipelineRequest =
+                new PutPipelineRequest("3", BytesReference.bytes(source), XContentType.JSON);
+            client().admin().cluster().putPipeline(putPipelineRequest).get();
+        }
+
+        Exception e = expectThrows(Exception.class, () -> {
+            IndexRequest indexRequest = new IndexRequest("test");
+            indexRequest.source("{}", XContentType.JSON);
+            indexRequest.setPipeline("1");
+            client().index(indexRequest).get();
+        });
+        IngestProcessorException ingestException = (IngestProcessorException) e.getCause();
+        assertThat(ingestException.getHeader("processor_type"), equalTo(List.of("fail")));
+        assertThat(ingestException.getHeader("pipeline_origin"), equalTo(List.of("3", "2", "1")));
+    }
+
+    public void testPipelineProcessorOnFailure() throws Exception {
+        {
+            XContentBuilder source = jsonBuilder().startObject();
+            {
+                source.startArray("processors");
+                source.startObject();
+                {
+                    source.startObject("pipeline");
+                    source.field("name", "2");
+                    source.endObject();
+                }
+                source.endObject();
+                source.endArray();
+            }
+            {
+                source.startArray("on_failure");
+                source.startObject();
+                {
+                    source.startObject("onfailure_processor");
+                    source.endObject();
+                }
+                source.endObject();
+                source.endArray();
+            }
+            source.endObject();
+            PutPipelineRequest putPipelineRequest =
+                new PutPipelineRequest("1", BytesReference.bytes(source), XContentType.JSON);
+            client().admin().cluster().putPipeline(putPipelineRequest).get();
+        }
+        {
+            XContentBuilder source = jsonBuilder().startObject();
+            {
+                source.startArray("processors");
+                source.startObject();
+                {
+                    source.startObject("pipeline");
+                    source.field("name", "3");
+                    source.endObject();
+                }
+                source.endObject();
+                source.endArray();
+            }
+            source.endObject();
+            PutPipelineRequest putPipelineRequest =
+                new PutPipelineRequest("2", BytesReference.bytes(source), XContentType.JSON);
+            client().admin().cluster().putPipeline(putPipelineRequest).get();
+        }
+        {
+            XContentBuilder source = jsonBuilder().startObject();
+            {
+                source.startArray("processors");
+                source.startObject();
+                {
+                    source.startObject("fail");
+                    source.endObject();
+                }
+                source.endObject();
+                source.endArray();
+            }
+            source.endObject();
+            PutPipelineRequest putPipelineRequest =
+                new PutPipelineRequest("3", BytesReference.bytes(source), XContentType.JSON);
+            client().admin().cluster().putPipeline(putPipelineRequest).get();
+        }
+
+        client().prepareIndex("test").setId("1").setSource("{}", XContentType.JSON).setPipeline("1").get();
+        Map<String, Object> inserted = client().prepareGet("test", "1")
+            .get().getSourceAsMap();
+        assertThat(inserted.get("readme"), equalTo("pipeline with id [3] is a bad pipeline"));
+    }
+
+    public static class ExtendedIngestTestPlugin extends IngestTestPlugin {
+
+        @Override
+        public Map<String, Processor.Factory> getProcessors(Processor.Parameters parameters) {
+            Map<String, Processor.Factory> factories = new HashMap<>(super.getProcessors(parameters));
+            factories.put(PipelineProcessor.TYPE, new PipelineProcessor.Factory(parameters.ingestService));
+            factories.put("fail", (processorFactories, tag, config) -> new TestProcessor(tag, "fail", new RuntimeException()));
+            factories.put("onfailure_processor", (processorFactories, tag, config) -> new TestProcessor(tag, "fail", document -> {
+                String onFailurePipeline = document.getFieldValue("_ingest.on_failure_pipeline", String.class);
+                document.setFieldValue("readme", "pipeline with id [" + onFailurePipeline + "] is a bad pipeline");
+            }));
+            return factories;
+        }
+    }
+
 }