Prechádzať zdrojové kódy

Merge pull request #16562 from talevy/ingest_verbose_error_metdata

add on_failure exception metadata to ingest document for verbose simulate
Tal Levy 9 rokov pred
rodič
commit
7f812f5990

+ 6 - 29
core/src/main/java/org/elasticsearch/action/ingest/SimulateExecutionService.java

@@ -23,13 +23,14 @@ import org.elasticsearch.action.ActionListener;
 import org.elasticsearch.action.ActionRunnable;
 import org.elasticsearch.ingest.core.IngestDocument;
 import org.elasticsearch.ingest.core.Pipeline;
-import org.elasticsearch.ingest.core.Processor;
 import org.elasticsearch.ingest.core.CompoundProcessor;
 import org.elasticsearch.threadpool.ThreadPool;
 
 import java.util.ArrayList;
 import java.util.List;
 
+import static org.elasticsearch.ingest.processor.TrackingResultProcessor.decorate;
+
 class SimulateExecutionService {
 
     private static final String THREAD_POOL_NAME = ThreadPool.Names.MANAGEMENT;
@@ -40,40 +41,16 @@ class SimulateExecutionService {
         this.threadPool = threadPool;
     }
 
-    void executeVerboseDocument(Processor processor, IngestDocument ingestDocument, List<SimulateProcessorResult> processorResultList) throws Exception {
-        if (processor instanceof CompoundProcessor) {
-            CompoundProcessor cp = (CompoundProcessor) processor;
-            try {
-                for (Processor p : cp.getProcessors()) {
-                    executeVerboseDocument(p, ingestDocument, processorResultList);
-                }
-            } catch (Exception e) {
-                for (Processor p : cp.getOnFailureProcessors()) {
-                    executeVerboseDocument(p, ingestDocument, processorResultList);
-                }
-            }
-        } else {
-            try {
-                processor.execute(ingestDocument);
-                processorResultList.add(new SimulateProcessorResult(processor.getTag(), new IngestDocument(ingestDocument)));
-            } catch (Exception e) {
-                processorResultList.add(new SimulateProcessorResult(processor.getTag(), e));
-                throw e;
-            }
-        }
-    }
-
     SimulateDocumentResult executeDocument(Pipeline pipeline, IngestDocument ingestDocument, boolean verbose) {
         if (verbose) {
             List<SimulateProcessorResult> processorResultList = new ArrayList<>();
-            IngestDocument currentIngestDocument = new IngestDocument(ingestDocument);
-            CompoundProcessor pipelineProcessor = new CompoundProcessor(pipeline.getProcessors(), pipeline.getOnFailureProcessors());
+            CompoundProcessor verbosePipelineProcessor = decorate(pipeline.getCompoundProcessor(), processorResultList);
             try {
-                executeVerboseDocument(pipelineProcessor, currentIngestDocument, processorResultList);
+                verbosePipelineProcessor.execute(ingestDocument);
+                return new SimulateDocumentVerboseResult(processorResultList);
             } catch (Exception e) {
-                return new SimulateDocumentBaseResult(e);
+                return new SimulateDocumentVerboseResult(processorResultList);
             }
-            return new SimulateDocumentVerboseResult(processorResultList);
         } else {
             try {
                 pipeline.execute(ingestDocument);

+ 21 - 11
core/src/main/java/org/elasticsearch/ingest/core/CompoundProcessor.java

@@ -28,15 +28,16 @@ import java.util.Collections;
 import java.util.List;
 import java.util.Map;
 import java.util.Objects;
+import java.util.stream.Collectors;
 
 /**
  * A Processor that executes a list of other "processors". It executes a separate list of
  * "onFailureProcessors" when any of the processors throw an {@link Exception}.
  */
 public class CompoundProcessor implements Processor {
-    static final String ON_FAILURE_MESSAGE_FIELD = "on_failure_message";
-    static final String ON_FAILURE_PROCESSOR_TYPE_FIELD = "on_failure_processor_type";
-    static final String ON_FAILURE_PROCESSOR_TAG_FIELD = "on_failure_processor_tag";
+    public static final String ON_FAILURE_MESSAGE_FIELD = "on_failure_message";
+    public static final String ON_FAILURE_PROCESSOR_TYPE_FIELD = "on_failure_processor_type";
+    public static final String ON_FAILURE_PROCESSOR_TAG_FIELD = "on_failure_processor_tag";
 
     private final List<Processor> processors;
     private final List<Processor> onFailureProcessors;
@@ -84,7 +85,7 @@ public class CompoundProcessor implements Processor {
 
     @Override
     public String getTag() {
-        return "compound-processor-" + Objects.hash(processors, onFailureProcessors);
+        return "CompoundProcessor-" + flattenProcessors().stream().map(Processor::getTag).collect(Collectors.joining("-"));
     }
 
     @Override
@@ -104,18 +105,27 @@ public class CompoundProcessor implements Processor {
     }
 
     void executeOnFailure(IngestDocument ingestDocument, Exception cause, String failedProcessorType, String failedProcessorTag) throws Exception {
-        Map<String, String> ingestMetadata = ingestDocument.getIngestMetadata();
         try {
-            ingestMetadata.put(ON_FAILURE_MESSAGE_FIELD, cause.getMessage());
-            ingestMetadata.put(ON_FAILURE_PROCESSOR_TYPE_FIELD, failedProcessorType);
-            ingestMetadata.put(ON_FAILURE_PROCESSOR_TAG_FIELD, failedProcessorTag);
+            putFailureMetadata(ingestDocument, cause, failedProcessorType, failedProcessorTag);
             for (Processor processor : onFailureProcessors) {
                 processor.execute(ingestDocument);
             }
         } finally {
-            ingestMetadata.remove(ON_FAILURE_MESSAGE_FIELD);
-            ingestMetadata.remove(ON_FAILURE_PROCESSOR_TYPE_FIELD);
-            ingestMetadata.remove(ON_FAILURE_PROCESSOR_TAG_FIELD);
+            removeFailureMetadata(ingestDocument);
         }
     }
+
+    private void putFailureMetadata(IngestDocument ingestDocument, Exception cause, String failedProcessorType, String failedProcessorTag) {
+        Map<String, String> ingestMetadata = ingestDocument.getIngestMetadata();
+        ingestMetadata.put(ON_FAILURE_MESSAGE_FIELD, cause.getMessage());
+        ingestMetadata.put(ON_FAILURE_PROCESSOR_TYPE_FIELD, failedProcessorType);
+        ingestMetadata.put(ON_FAILURE_PROCESSOR_TAG_FIELD, failedProcessorTag);
+    }
+
+    private void removeFailureMetadata(IngestDocument ingestDocument) {
+        Map<String, String> ingestMetadata = ingestDocument.getIngestMetadata();
+        ingestMetadata.remove(ON_FAILURE_MESSAGE_FIELD);
+        ingestMetadata.remove(ON_FAILURE_PROCESSOR_TYPE_FIELD);
+        ingestMetadata.remove(ON_FAILURE_PROCESSOR_TAG_FIELD);
+    }
 }

+ 7 - 0
core/src/main/java/org/elasticsearch/ingest/core/Pipeline.java

@@ -68,6 +68,13 @@ public final class Pipeline {
         return description;
     }
 
+    /**
+     * Get the underlying {@link CompoundProcessor} containing the Pipeline's processors
+     */
+    public CompoundProcessor getCompoundProcessor() {
+        return compoundProcessor;
+    }
+
     /**
      * Unmodifiable list containing each processor that operates on the data.
      */

+ 89 - 0
core/src/main/java/org/elasticsearch/ingest/processor/TrackingResultProcessor.java

@@ -0,0 +1,89 @@
+/*
+ * Licensed to Elasticsearch under one or more contributor
+ * license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright
+ * ownership. Elasticsearch licenses this file to you under
+ * the Apache License, Version 2.0 (the "License"); you may
+ * not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.elasticsearch.ingest.processor;
+
+import org.elasticsearch.action.ingest.SimulateProcessorResult;
+import org.elasticsearch.ingest.core.CompoundProcessor;
+import org.elasticsearch.ingest.core.IngestDocument;
+import org.elasticsearch.ingest.core.Processor;
+
+import java.util.ArrayList;
+import java.util.List;
+
+/**
+ * Processor to be used within Simulate API to keep track of processors executed in pipeline.
+ */
+public final class TrackingResultProcessor implements Processor {
+
+    private final Processor actualProcessor;
+    private final List<SimulateProcessorResult> processorResultList;
+
+    public TrackingResultProcessor(Processor actualProcessor, List<SimulateProcessorResult> processorResultList) {
+        this.processorResultList = processorResultList;
+        if (actualProcessor instanceof CompoundProcessor) {
+            CompoundProcessor trackedCompoundProcessor = decorate((CompoundProcessor) actualProcessor, processorResultList);
+            this.actualProcessor = trackedCompoundProcessor;
+        } else {
+            this.actualProcessor = actualProcessor;
+        }
+    }
+
+    @Override
+    public void execute(IngestDocument ingestDocument) throws Exception {
+        try {
+            actualProcessor.execute(ingestDocument);
+            processorResultList.add(new SimulateProcessorResult(actualProcessor.getTag(), new IngestDocument(ingestDocument)));
+        } catch (Exception e) {
+            processorResultList.add(new SimulateProcessorResult(actualProcessor.getTag(), e));
+            throw e;
+        }
+    }
+
+    @Override
+    public String getType() {
+        return actualProcessor.getType();
+    }
+
+    @Override
+    public String getTag() {
+        return actualProcessor.getTag();
+    }
+
+    public static CompoundProcessor decorate(CompoundProcessor compoundProcessor, List<SimulateProcessorResult> processorResultList) {
+        List<Processor> processors = new ArrayList<>(compoundProcessor.getProcessors().size());
+        for (Processor processor : compoundProcessor.getProcessors()) {
+            if (processor instanceof CompoundProcessor) {
+                processors.add(decorate((CompoundProcessor) processor, processorResultList));
+            } else {
+                processors.add(new TrackingResultProcessor(processor, processorResultList));
+            }
+        }
+        List<Processor> onFailureProcessors = new ArrayList<>(compoundProcessor.getProcessors().size());
+        for (Processor processor : compoundProcessor.getOnFailureProcessors()) {
+            if (processor instanceof CompoundProcessor) {
+                onFailureProcessors.add(decorate((CompoundProcessor) processor, processorResultList));
+            } else {
+                onFailureProcessors.add(new TrackingResultProcessor(processor, processorResultList));
+            }
+        }
+        return new CompoundProcessor(processors, onFailureProcessors);
+    }
+}
+

+ 44 - 77
core/src/test/java/org/elasticsearch/action/ingest/SimulateExecutionServiceTests.java

@@ -31,10 +31,8 @@ import org.elasticsearch.threadpool.ThreadPool;
 import org.junit.After;
 import org.junit.Before;
 
-import java.util.ArrayList;
-import java.util.Arrays;
 import java.util.Collections;
-import java.util.List;
+import java.util.Map;
 
 import static org.hamcrest.Matchers.equalTo;
 import static org.hamcrest.Matchers.instanceOf;
@@ -46,7 +44,6 @@ public class SimulateExecutionServiceTests extends ESTestCase {
 
     private ThreadPool threadPool;
     private SimulateExecutionService executionService;
-    private Pipeline pipeline;
     private Processor processor;
     private IngestDocument ingestDocument;
 
@@ -59,7 +56,6 @@ public class SimulateExecutionServiceTests extends ESTestCase {
         );
         executionService = new SimulateExecutionService(threadPool);
         processor = new TestProcessor("id", "mock", ingestDocument -> {});
-        pipeline = new Pipeline("_id", "_description", new CompoundProcessor(processor, processor));
         ingestDocument = RandomDocumentPicks.randomIngestDocument(random());
     }
 
@@ -68,74 +64,6 @@ public class SimulateExecutionServiceTests extends ESTestCase {
         threadPool.shutdown();
     }
 
-    public void testExecuteVerboseDocumentSimple() throws Exception {
-        List<SimulateProcessorResult> processorResultList = new ArrayList<>();
-        executionService.executeVerboseDocument(processor, ingestDocument, processorResultList);
-        SimulateProcessorResult result = new SimulateProcessorResult("id", ingestDocument);
-        assertThat(processorResultList.size(), equalTo(1));
-        assertThat(processorResultList.get(0).getProcessorTag(), equalTo(result.getProcessorTag()));
-        assertThat(processorResultList.get(0).getIngestDocument(), equalTo(result.getIngestDocument()));
-        assertThat(processorResultList.get(0).getFailure(), nullValue());
-    }
-
-    public void testExecuteVerboseDocumentSimpleException() throws Exception {
-        RuntimeException exception = new RuntimeException("mock_exception");
-        TestProcessor processor = new TestProcessor("id", "mock", ingestDocument -> { throw exception; });
-        List<SimulateProcessorResult> processorResultList = new ArrayList<>();
-        try {
-            executionService.executeVerboseDocument(processor, ingestDocument, processorResultList);
-            fail("should throw exception");
-        } catch (RuntimeException e) {
-            assertThat(e.getMessage(), equalTo("mock_exception"));
-        }
-        SimulateProcessorResult result = new SimulateProcessorResult("id", exception);
-        assertThat(processorResultList.size(), equalTo(1));
-        assertThat(processorResultList.get(0).getProcessorTag(), equalTo(result.getProcessorTag()));
-        assertThat(processorResultList.get(0).getFailure(), equalTo(result.getFailure()));
-    }
-
-    public void testExecuteVerboseDocumentCompoundSuccess() throws Exception {
-        TestProcessor processor1 = new TestProcessor("p1", "mock", ingestDocument -> { });
-        TestProcessor processor2 = new TestProcessor("p2", "mock", ingestDocument -> { });
-
-        Processor compoundProcessor = new CompoundProcessor(processor1, processor2);
-        List<SimulateProcessorResult> processorResultList = new ArrayList<>();
-        executionService.executeVerboseDocument(compoundProcessor, ingestDocument, processorResultList);
-        assertThat(processor1.getInvokedCounter(), equalTo(1));
-        assertThat(processor2.getInvokedCounter(), equalTo(1));
-        assertThat(processorResultList.size(), equalTo(2));
-        assertThat(processorResultList.get(0).getProcessorTag(), equalTo("p1"));
-        assertThat(processorResultList.get(0).getIngestDocument(), equalTo(ingestDocument));
-        assertThat(processorResultList.get(0).getFailure(), nullValue());
-        assertThat(processorResultList.get(1).getProcessorTag(), equalTo("p2"));
-        assertThat(processorResultList.get(1).getIngestDocument(), equalTo(ingestDocument));
-        assertThat(processorResultList.get(1).getFailure(), nullValue());
-    }
-
-    public void testExecuteVerboseDocumentCompoundOnFailure() throws Exception {
-        TestProcessor processor1 = new TestProcessor("p1", "mock", ingestDocument -> { });
-        TestProcessor processor2 = new TestProcessor("p2", "mock", ingestDocument -> { throw new RuntimeException("p2_exception"); });
-        TestProcessor onFailureProcessor1 = new TestProcessor("fail_p1", "mock", ingestDocument -> { });
-        TestProcessor onFailureProcessor2 = new TestProcessor("fail_p2", "mock", ingestDocument -> { throw new RuntimeException("fail_p2_exception"); });
-        TestProcessor onFailureProcessor3 = new TestProcessor("fail_p3", "mock", ingestDocument -> { });
-        CompoundProcessor onFailureCompoundProcessor = new CompoundProcessor(Collections.singletonList(onFailureProcessor2), Collections.singletonList(onFailureProcessor3));
-
-        Processor compoundProcessor = new CompoundProcessor(Arrays.asList(processor1, processor2), Arrays.asList(onFailureProcessor1, onFailureCompoundProcessor));
-        List<SimulateProcessorResult> processorResultList = new ArrayList<>();
-        executionService.executeVerboseDocument(compoundProcessor, ingestDocument, processorResultList);
-        assertThat(processor1.getInvokedCounter(), equalTo(1));
-        assertThat(processor2.getInvokedCounter(), equalTo(1));
-        assertThat(onFailureProcessor1.getInvokedCounter(), equalTo(1));
-        assertThat(onFailureProcessor2.getInvokedCounter(), equalTo(1));
-        assertThat(onFailureProcessor3.getInvokedCounter(), equalTo(1));
-        assertThat(processorResultList.size(), equalTo(5));
-        assertThat(processorResultList.get(0).getProcessorTag(), equalTo("p1"));
-        assertThat(processorResultList.get(1).getProcessorTag(), equalTo("p2"));
-        assertThat(processorResultList.get(2).getProcessorTag(), equalTo("fail_p1"));
-        assertThat(processorResultList.get(3).getProcessorTag(), equalTo("fail_p2"));
-        assertThat(processorResultList.get(4).getProcessorTag(), equalTo("fail_p3"));
-    }
-
     public void testExecuteVerboseItem() throws Exception {
         TestProcessor processor = new TestProcessor("test-id", "mock", ingestDocument -> {});
         Pipeline pipeline = new Pipeline("_id", "_description", new CompoundProcessor(processor, processor));
@@ -170,16 +98,43 @@ public class SimulateExecutionServiceTests extends ESTestCase {
         assertThat(simulateDocumentBaseResult.getFailure(), nullValue());
     }
 
-    public void testExecuteVerboseItemWithFailure() throws Exception {
+    public void testExecuteVerboseItemExceptionWithoutOnFailure() throws Exception {
+        TestProcessor processor1 = new TestProcessor("processor_0", "mock", ingestDocument -> {});
+        TestProcessor processor2 = new TestProcessor("processor_1", "mock", ingestDocument -> { throw new RuntimeException("processor failed"); });
+        TestProcessor processor3 = new TestProcessor("processor_2", "mock", ingestDocument -> {});
+        Pipeline pipeline = new Pipeline("_id", "_description", new CompoundProcessor(processor1, processor2, processor3));
+        SimulateDocumentResult actualItemResponse = executionService.executeDocument(pipeline, ingestDocument, true);
+        assertThat(processor1.getInvokedCounter(), equalTo(1));
+        assertThat(processor2.getInvokedCounter(), equalTo(1));
+        assertThat(processor3.getInvokedCounter(), equalTo(0));
+        assertThat(actualItemResponse, instanceOf(SimulateDocumentVerboseResult.class));
+        SimulateDocumentVerboseResult simulateDocumentVerboseResult = (SimulateDocumentVerboseResult) actualItemResponse;
+        assertThat(simulateDocumentVerboseResult.getProcessorResults().size(), equalTo(2));
+        assertThat(simulateDocumentVerboseResult.getProcessorResults().get(0).getProcessorTag(), equalTo("processor_0"));
+        assertThat(simulateDocumentVerboseResult.getProcessorResults().get(0).getFailure(), nullValue());
+        assertThat(simulateDocumentVerboseResult.getProcessorResults().get(0).getIngestDocument(), not(sameInstance(ingestDocument)));
+        assertThat(simulateDocumentVerboseResult.getProcessorResults().get(0).getIngestDocument(), equalTo(ingestDocument));
+        assertThat(simulateDocumentVerboseResult.getProcessorResults().get(0).getIngestDocument().getSourceAndMetadata(), not(sameInstance(ingestDocument.getSourceAndMetadata())));
+        assertThat(simulateDocumentVerboseResult.getProcessorResults().get(1).getProcessorTag(), equalTo("processor_1"));
+        assertThat(simulateDocumentVerboseResult.getProcessorResults().get(1).getIngestDocument(), nullValue());
+        assertThat(simulateDocumentVerboseResult.getProcessorResults().get(1).getFailure(), instanceOf(RuntimeException.class));
+        RuntimeException runtimeException = (RuntimeException) simulateDocumentVerboseResult.getProcessorResults().get(1).getFailure();
+        assertThat(runtimeException.getMessage(), equalTo("processor failed"));
+    }
+
+    public void testExecuteVerboseItemWithOnFailure() throws Exception {
         TestProcessor processor1 = new TestProcessor("processor_0", "mock", ingestDocument -> { throw new RuntimeException("processor failed"); });
         TestProcessor processor2 = new TestProcessor("processor_1", "mock", ingestDocument -> {});
-        Pipeline pipeline = new Pipeline("_id", "_description", new CompoundProcessor(Collections.singletonList(processor1), Collections.singletonList(processor2)));
+        TestProcessor processor3 = new TestProcessor("processor_2", "mock", ingestDocument -> {});
+        Pipeline pipeline = new Pipeline("_id", "_description",
+                new CompoundProcessor(new CompoundProcessor(Collections.singletonList(processor1),
+                                Collections.singletonList(processor2)), processor3));
         SimulateDocumentResult actualItemResponse = executionService.executeDocument(pipeline, ingestDocument, true);
         assertThat(processor1.getInvokedCounter(), equalTo(1));
         assertThat(processor2.getInvokedCounter(), equalTo(1));
         assertThat(actualItemResponse, instanceOf(SimulateDocumentVerboseResult.class));
         SimulateDocumentVerboseResult simulateDocumentVerboseResult = (SimulateDocumentVerboseResult) actualItemResponse;
-        assertThat(simulateDocumentVerboseResult.getProcessorResults().size(), equalTo(2));
+        assertThat(simulateDocumentVerboseResult.getProcessorResults().size(), equalTo(3));
         assertThat(simulateDocumentVerboseResult.getProcessorResults().get(0).getProcessorTag(), equalTo("processor_0"));
         assertThat(simulateDocumentVerboseResult.getProcessorResults().get(0).getIngestDocument(), nullValue());
         assertThat(simulateDocumentVerboseResult.getProcessorResults().get(0).getFailure(), instanceOf(RuntimeException.class));
@@ -187,8 +142,20 @@ public class SimulateExecutionServiceTests extends ESTestCase {
         assertThat(runtimeException.getMessage(), equalTo("processor failed"));
         assertThat(simulateDocumentVerboseResult.getProcessorResults().get(1).getProcessorTag(), equalTo("processor_1"));
         assertThat(simulateDocumentVerboseResult.getProcessorResults().get(1).getIngestDocument(), not(sameInstance(ingestDocument)));
-        assertThat(simulateDocumentVerboseResult.getProcessorResults().get(1).getIngestDocument(), equalTo(ingestDocument));
+
+        IngestDocument ingestDocumentWithOnFailureMetadata = new IngestDocument(ingestDocument);
+        Map<String, String> metadata = ingestDocumentWithOnFailureMetadata.getIngestMetadata();
+        metadata.put(CompoundProcessor.ON_FAILURE_PROCESSOR_TYPE_FIELD, "mock");
+        metadata.put(CompoundProcessor.ON_FAILURE_PROCESSOR_TAG_FIELD, "processor_0");
+        metadata.put(CompoundProcessor.ON_FAILURE_MESSAGE_FIELD, "processor failed");
+        assertThat(simulateDocumentVerboseResult.getProcessorResults().get(1).getIngestDocument(), equalTo(ingestDocumentWithOnFailureMetadata));
+
         assertThat(simulateDocumentVerboseResult.getProcessorResults().get(1).getFailure(), nullValue());
+
+        assertThat(simulateDocumentVerboseResult.getProcessorResults().get(2).getProcessorTag(), equalTo("processor_2"));
+        assertThat(simulateDocumentVerboseResult.getProcessorResults().get(2).getIngestDocument(), not(sameInstance(ingestDocument)));
+        assertThat(simulateDocumentVerboseResult.getProcessorResults().get(2).getIngestDocument(), equalTo(ingestDocument));
+        assertThat(simulateDocumentVerboseResult.getProcessorResults().get(2).getFailure(), nullValue());
     }
 
     public void testExecuteItemWithFailure() throws Exception {

+ 129 - 0
core/src/test/java/org/elasticsearch/ingest/processor/TrackingResultProcessorTests.java

@@ -0,0 +1,129 @@
+/*
+ * Licensed to Elasticsearch under one or more contributor
+ * license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright
+ * ownership. Elasticsearch licenses this file to you under
+ * the Apache License, Version 2.0 (the "License"); you may
+ * not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.elasticsearch.ingest.processor;
+
+import org.elasticsearch.action.ingest.SimulateProcessorResult;
+import org.elasticsearch.ingest.TestProcessor;
+import org.elasticsearch.ingest.core.CompoundProcessor;
+import org.elasticsearch.ingest.core.IngestDocument;
+import org.elasticsearch.test.ESTestCase;
+import org.junit.Before;
+
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+
+import static org.elasticsearch.ingest.core.CompoundProcessor.ON_FAILURE_MESSAGE_FIELD;
+import static org.elasticsearch.ingest.core.CompoundProcessor.ON_FAILURE_PROCESSOR_TAG_FIELD;
+import static org.elasticsearch.ingest.core.CompoundProcessor.ON_FAILURE_PROCESSOR_TYPE_FIELD;
+import static org.elasticsearch.ingest.processor.TrackingResultProcessor.decorate;
+import static org.hamcrest.CoreMatchers.equalTo;
+import static org.hamcrest.Matchers.nullValue;
+
+public class TrackingResultProcessorTests extends ESTestCase {
+
+    private IngestDocument ingestDocument;
+    private List<SimulateProcessorResult> resultList;
+
+    @Before
+    public void init() {
+        ingestDocument = new IngestDocument(new HashMap<>(), new HashMap<>());
+        resultList = new ArrayList<>();
+    }
+
+    public void testActualProcessor() throws Exception {
+        TestProcessor actualProcessor = new TestProcessor(ingestDocument -> {});
+        TrackingResultProcessor trackingProcessor = new TrackingResultProcessor(actualProcessor, resultList);
+        trackingProcessor.execute(ingestDocument);
+
+        SimulateProcessorResult expectedResult = new SimulateProcessorResult(actualProcessor.getTag(), ingestDocument);
+
+        assertThat(actualProcessor.getInvokedCounter(), equalTo(1));
+        assertThat(resultList.size(), equalTo(1));
+
+        assertThat(resultList.get(0).getIngestDocument(), equalTo(expectedResult.getIngestDocument()));
+        assertThat(resultList.get(0).getFailure(), nullValue());
+        assertThat(resultList.get(0).getProcessorTag(), equalTo(expectedResult.getProcessorTag()));
+    }
+
+    public void testActualCompoundProcessorWithoutOnFailure() throws Exception {
+        RuntimeException exception = new RuntimeException("processor failed");
+        TestProcessor testProcessor = new TestProcessor(ingestDocument -> {  throw exception; });
+        CompoundProcessor actualProcessor = new CompoundProcessor(testProcessor);
+        CompoundProcessor trackingProcessor = decorate(actualProcessor, resultList);
+
+        try {
+            trackingProcessor.execute(ingestDocument);
+        } catch (Exception e) {
+            assertThat(e.getMessage(), equalTo(exception.getMessage()));
+        }
+
+        SimulateProcessorResult expectedFirstResult = new SimulateProcessorResult(testProcessor.getTag(), ingestDocument);
+        assertThat(testProcessor.getInvokedCounter(), equalTo(1));
+        assertThat(resultList.size(), equalTo(1));
+        assertThat(resultList.get(0).getIngestDocument(), nullValue());
+        assertThat(resultList.get(0).getFailure(), equalTo(exception));
+        assertThat(resultList.get(0).getProcessorTag(), equalTo(expectedFirstResult.getProcessorTag()));
+    }
+
+    public void testActualCompoundProcessorWithOnFailure() throws Exception {
+        RuntimeException exception = new RuntimeException("fail");
+        TestProcessor failProcessor = new TestProcessor("fail", "test", ingestDocument -> {  throw exception; });
+        TestProcessor onFailureProcessor = new TestProcessor("success", "test", ingestDocument -> {});
+        CompoundProcessor actualProcessor = new CompoundProcessor(
+            Arrays.asList(new CompoundProcessor(
+                Arrays.asList(failProcessor, onFailureProcessor),
+                Arrays.asList(onFailureProcessor, failProcessor))),
+                Arrays.asList(onFailureProcessor));
+        CompoundProcessor trackingProcessor = decorate(actualProcessor, resultList);
+        trackingProcessor.execute(ingestDocument);
+
+        SimulateProcessorResult expectedFailResult = new SimulateProcessorResult(failProcessor.getTag(), ingestDocument);
+        SimulateProcessorResult expectedSuccessResult = new SimulateProcessorResult(onFailureProcessor.getTag(), ingestDocument);
+
+        assertThat(failProcessor.getInvokedCounter(), equalTo(2));
+        assertThat(onFailureProcessor.getInvokedCounter(), equalTo(2));
+        assertThat(resultList.size(), equalTo(4));
+
+        assertThat(resultList.get(0).getIngestDocument(), nullValue());
+        assertThat(resultList.get(0).getFailure(), equalTo(exception));
+        assertThat(resultList.get(0).getProcessorTag(), equalTo(expectedFailResult.getProcessorTag()));
+
+        Map<String, String> metadata = resultList.get(1).getIngestDocument().getIngestMetadata();
+        assertThat(metadata.get(ON_FAILURE_MESSAGE_FIELD), equalTo("fail"));
+        assertThat(metadata.get(ON_FAILURE_PROCESSOR_TYPE_FIELD), equalTo("test"));
+        assertThat(metadata.get(ON_FAILURE_PROCESSOR_TAG_FIELD), equalTo("fail"));
+        assertThat(resultList.get(1).getFailure(), nullValue());
+        assertThat(resultList.get(1).getProcessorTag(), equalTo(expectedSuccessResult.getProcessorTag()));
+
+        assertThat(resultList.get(2).getIngestDocument(), nullValue());
+        assertThat(resultList.get(2).getFailure(), equalTo(exception));
+        assertThat(resultList.get(2).getProcessorTag(), equalTo(expectedFailResult.getProcessorTag()));
+
+        metadata = resultList.get(3).getIngestDocument().getIngestMetadata();
+        assertThat(metadata.get(ON_FAILURE_MESSAGE_FIELD), equalTo("fail"));
+        assertThat(metadata.get(ON_FAILURE_PROCESSOR_TYPE_FIELD), equalTo("compound"));
+        assertThat(metadata.get(ON_FAILURE_PROCESSOR_TAG_FIELD), equalTo("CompoundProcessor-fail-success-success-fail"));
+        assertThat(resultList.get(3).getFailure(), nullValue());
+        assertThat(resultList.get(3).getProcessorTag(), equalTo(expectedSuccessResult.getProcessorTag()));
+    }
+}

+ 52 - 0
qa/smoke-test-ingest-with-all-dependencies/src/test/resources/rest-api-spec/test/ingest/10_pipeline_with_mustache_templates.yaml

@@ -274,3 +274,55 @@
         id: 1
   - length: { _source: 2 }
   - match: { _source.values_flat: ["foo_bar", "foo_baz"] }
+
+---
+"Test verbose simulate with error context":
+  - do:
+      cluster.health:
+          wait_for_status: green
+  - do:
+      ingest.simulate:
+        verbose: true
+        body: >
+          {
+            "pipeline" : {
+              "description": "_description",
+              "processors": [
+                {
+                  "rename" : {
+                    "tag" : "rename-status",
+                    "field" : "status",
+                    "to" : "bar",
+                    "on_failure" : [
+                      {
+                        "set" : {
+                          "tag" : "set_on_rename_failure",
+                          "field" : "error",
+                          "value" : "processor {{ _ingest.on_failure_processor_tag }} [{{ _ingest.on_failure_processor_type }}]: {{ _ingest.on_failure_message }}"
+                        }
+                      }
+                    ]
+                  }
+                }
+              ]
+            },
+            "docs": [
+              {
+                "_index": "index",
+                "_type": "type",
+                "_id": "id",
+                "_source": {
+                  "foo": "bar"
+                }
+              }
+            ]
+          }
+  - length: { docs: 1 }
+  - length: { docs.0.processor_results: 2 }
+  - match: { docs.0.processor_results.0.tag: "rename-status" }
+  - match: { docs.0.processor_results.0.error.type: "illegal_argument_exception" }
+  - match: { docs.0.processor_results.0.error.reason: "field [status] doesn't exist" }
+  - match: { docs.0.processor_results.1.tag: "set_on_rename_failure" }
+  - length: { docs.0.processor_results.1.doc._source: 2 }
+  - match: { docs.0.processor_results.1.doc._source.foo: "bar"  }
+  - match: { docs.0.processor_results.1.doc._source.error: "processor rename-status [rename]: field [status] doesn't exist" }