Browse Source

Fix ingest simulate verbose on failure with conditional (#56478)

If a conditional is added to a processor, and that processor fails, and 
that processor has an on_failure handler, the full trace of all of the 
executed processors may not be displayed in simulate verbose. The 
information is correct, but misses displaying some of the steps used 
to get there.

This happens because a processor that is conditional processor is a 
wrapper around the real processor and a processor with an on_failure 
handler is also a wrapper around the processor(s). When decorating for 
simulation we treat compound processor specially, but if a compound processor
is wrapped by a conditional processor that compound processor's processors 
can be missed for decoration resulting in the missing displayed steps.

The fix to this is to treat the conditional processor specially and
explicitly seperate it from the processor it is wrapping. This requires
us to keep track of 2 processors a possible conditional processor and
the actual processor it may be wrapping.

related: #56004
Jake Landis 5 years ago
parent
commit
db2c82c5a5

+ 70 - 0
modules/ingest-common/src/test/resources/rest-api-spec/test/ingest/90_simulate.yml

@@ -742,3 +742,73 @@ teardown:
 - match: { docs.0.processor_results.2.doc._source.pipeline1: true }
 - match: { docs.0.processor_results.2.doc._source.pipeline2: true }
 
+---
+"Test verbose simulate with true conditional and on failure":
+- do:
+    ingest.simulate:
+      verbose: true
+      body: >
+        {
+          "pipeline": {
+            "processors": [
+              {
+                "rename": {
+                  "tag": "gunna_fail",
+                  "if": "true",
+                  "field": "foo1",
+                  "target_field": "fieldA",
+                  "on_failure": [
+                    {
+                      "set": {
+                        "field": "failed1",
+                        "value": "failed1",
+                        "tag": "failed1"
+                      }
+                    },
+                    {
+                      "rename": {
+                        "tag": "gunna_fail_again",
+                        "if": "true",
+                        "field": "foo2",
+                        "target_field": "fieldA",
+                        "on_failure": [
+                          {
+                            "set": {
+                              "field": "failed2",
+                              "value": "failed2",
+                              "tag": "failed2"
+                            }
+                          }
+                        ]
+                      }
+                    }
+                  ]
+                }
+              }
+            ]
+          },
+          "docs": [
+            {
+              "_index": "index",
+              "_id": "id",
+              "_source": {
+                "foo": "bar"
+              }
+            }
+          ]
+        }
+- length: { docs: 1 }
+- length: { docs.0.processor_results: 4 }
+- match: { docs.0.processor_results.0.tag: "gunna_fail" }
+- match: { docs.0.processor_results.0.error.reason: "field [foo1] doesn't exist" }
+- match: { docs.0.processor_results.1.tag: "failed1" }
+- match: { docs.0.processor_results.1.doc._source.failed1: "failed1" }
+- match: { docs.0.processor_results.1.doc._ingest.on_failure_processor_tag: "gunna_fail" }
+- match: { docs.0.processor_results.2.tag: "gunna_fail_again" }
+- match: { docs.0.processor_results.2.error.reason: "field [foo2] doesn't exist" }
+- match: { docs.0.processor_results.3.tag: "failed2" }
+- match: { docs.0.processor_results.3.doc._source.failed1: "failed1" }
+- match: { docs.0.processor_results.3.doc._source.failed2: "failed2" }
+- match: { docs.0.processor_results.3.doc._ingest.on_failure_processor_tag: "gunna_fail_again" }
+
+

+ 1 - 1
server/src/main/java/org/elasticsearch/action/ingest/SimulateExecutionService.java

@@ -47,7 +47,7 @@ class SimulateExecutionService {
                          BiConsumer<SimulateDocumentResult, Exception> handler) {
         if (verbose) {
             List<SimulateProcessorResult> processorResultList = new CopyOnWriteArrayList<>();
-            CompoundProcessor verbosePipelineProcessor = decorate(pipeline.getCompoundProcessor(), processorResultList);
+            CompoundProcessor verbosePipelineProcessor = decorate(pipeline.getCompoundProcessor(), null, processorResultList);
             Pipeline verbosePipeline = new Pipeline(pipeline.getId(), pipeline.getDescription(), pipeline.getVersion(),
                 verbosePipelineProcessor);
             ingestDocument.executePipeline(verbosePipeline, (result, e) -> {

+ 36 - 29
server/src/main/java/org/elasticsearch/ingest/TrackingResultProcessor.java

@@ -32,17 +32,27 @@ import java.util.function.BiConsumer;
 public final class TrackingResultProcessor implements Processor {
 
     private final Processor actualProcessor;
+    private final ConditionalProcessor conditionalProcessor;
     private final List<SimulateProcessorResult> processorResultList;
     private final boolean ignoreFailure;
 
-    TrackingResultProcessor(boolean ignoreFailure, Processor actualProcessor, List<SimulateProcessorResult> processorResultList) {
+    TrackingResultProcessor(boolean ignoreFailure, Processor actualProcessor, ConditionalProcessor conditionalProcessor,
+                            List<SimulateProcessorResult> processorResultList) {
         this.ignoreFailure = ignoreFailure;
         this.processorResultList = processorResultList;
         this.actualProcessor = actualProcessor;
+        this.conditionalProcessor = conditionalProcessor;
     }
 
     @Override
     public void execute(IngestDocument ingestDocument, BiConsumer<IngestDocument, Exception> handler) {
+        if (conditionalProcessor != null ) {
+            if (conditionalProcessor.evaluate(ingestDocument) == false) {
+                handler.accept(ingestDocument, null);
+                return;
+            }
+        }
+
         if (actualProcessor instanceof PipelineProcessor) {
             PipelineProcessor pipelineProcessor = ((PipelineProcessor) actualProcessor);
             Pipeline pipeline = pipelineProcessor.getPipeline(ingestDocument);
@@ -64,7 +74,7 @@ public final class TrackingResultProcessor implements Processor {
                     }
                 } else {
                     //now that we know that there are no cycles between pipelines, decorate the processors for this pipeline and execute it
-                    CompoundProcessor verbosePipelineProcessor = decorate(pipeline.getCompoundProcessor(), processorResultList);
+                    CompoundProcessor verbosePipelineProcessor = decorate(pipeline.getCompoundProcessor(), null, processorResultList);
                     Pipeline verbosePipeline = new Pipeline(pipeline.getId(), pipeline.getDescription(), pipeline.getVersion(),
                         verbosePipelineProcessor);
                     ingestDocument.executePipeline(verbosePipeline, handler);
@@ -73,36 +83,20 @@ public final class TrackingResultProcessor implements Processor {
             return;
         }
 
-        final Processor processor;
-        if (actualProcessor instanceof ConditionalProcessor) {
-            ConditionalProcessor conditionalProcessor = (ConditionalProcessor) actualProcessor;
-            if (conditionalProcessor.evaluate(ingestDocument) == false) {
-                handler.accept(ingestDocument, null);
-                return;
-            }
-            if (conditionalProcessor.getInnerProcessor() instanceof PipelineProcessor) {
-                processor = conditionalProcessor.getInnerProcessor();
-            } else {
-                processor = actualProcessor;
-            }
-        } else {
-            processor = actualProcessor;
-        }
-
-        processor.execute(ingestDocument, (result, e) -> {
+        actualProcessor.execute(ingestDocument, (result, e) -> {
             if (e != null) {
                 if (ignoreFailure) {
-                    processorResultList.add(new SimulateProcessorResult(processor.getTag(), new IngestDocument(ingestDocument), e));
+                    processorResultList.add(new SimulateProcessorResult(actualProcessor.getTag(), new IngestDocument(ingestDocument), e));
                 } else {
-                    processorResultList.add(new SimulateProcessorResult(processor.getTag(), e));
+                    processorResultList.add(new SimulateProcessorResult(actualProcessor.getTag(), e));
                 }
                 handler.accept(null, e);
             } else {
                 if (result != null) {
-                    processorResultList.add(new SimulateProcessorResult(processor.getTag(), new IngestDocument(ingestDocument)));
+                    processorResultList.add(new SimulateProcessorResult(actualProcessor.getTag(), new IngestDocument(ingestDocument)));
                     handler.accept(result, null);
                 } else {
-                    processorResultList.add(new SimulateProcessorResult(processor.getTag()));
+                    processorResultList.add(new SimulateProcessorResult(actualProcessor.getTag()));
                     handler.accept(null, null);
                 }
             }
@@ -124,21 +118,34 @@ public final class TrackingResultProcessor implements Processor {
         return actualProcessor.getTag();
     }
 
-    public static CompoundProcessor decorate(CompoundProcessor compoundProcessor, List<SimulateProcessorResult> processorResultList) {
-        List<Processor> processors = new ArrayList<>(compoundProcessor.getProcessors().size());
+    public static CompoundProcessor decorate(CompoundProcessor compoundProcessor, ConditionalProcessor parentCondition,
+                                             List<SimulateProcessorResult> processorResultList) {
+        List<Processor> processors = new ArrayList<>();
         for (Processor processor : compoundProcessor.getProcessors()) {
+            ConditionalProcessor conditionalProcessor = parentCondition;
+            if (processor instanceof ConditionalProcessor) {
+                conditionalProcessor = (ConditionalProcessor) processor;
+                processor = conditionalProcessor.getInnerProcessor();
+            }
             if (processor instanceof CompoundProcessor) {
-                processors.add(decorate((CompoundProcessor) processor, processorResultList));
+                processors.add(decorate((CompoundProcessor) processor, conditionalProcessor, processorResultList));
             } else {
-                processors.add(new TrackingResultProcessor(compoundProcessor.isIgnoreFailure(), processor, processorResultList));
+                processors.add(
+                    new TrackingResultProcessor(compoundProcessor.isIgnoreFailure(), processor, conditionalProcessor, processorResultList));
             }
         }
         List<Processor> onFailureProcessors = new ArrayList<>(compoundProcessor.getProcessors().size());
         for (Processor processor : compoundProcessor.getOnFailureProcessors()) {
+            ConditionalProcessor conditionalProcessor = null;
+            if (processor instanceof ConditionalProcessor) {
+                conditionalProcessor = (ConditionalProcessor) processor;
+                processor = conditionalProcessor.getInnerProcessor();
+            }
             if (processor instanceof CompoundProcessor) {
-                onFailureProcessors.add(decorate((CompoundProcessor) processor, processorResultList));
+                onFailureProcessors.add(decorate((CompoundProcessor) processor, conditionalProcessor, processorResultList));
             } else {
-                onFailureProcessors.add(new TrackingResultProcessor(compoundProcessor.isIgnoreFailure(), processor, processorResultList));
+                onFailureProcessors.add(
+                    new TrackingResultProcessor(compoundProcessor.isIgnoreFailure(), processor, conditionalProcessor, processorResultList));
             }
         }
         return new CompoundProcessor(compoundProcessor.isIgnoreFailure(), processors, onFailureProcessors);

+ 54 - 11
server/src/test/java/org/elasticsearch/ingest/TrackingResultProcessorTests.java

@@ -64,7 +64,7 @@ public class TrackingResultProcessorTests extends ESTestCase {
 
     public void testActualProcessor() throws Exception {
         TestProcessor actualProcessor = new TestProcessor(ingestDocument -> {});
-        TrackingResultProcessor trackingProcessor = new TrackingResultProcessor(false, actualProcessor, resultList);
+        TrackingResultProcessor trackingProcessor = new TrackingResultProcessor(false, actualProcessor, null, resultList);
         trackingProcessor.execute(ingestDocument, (result, e) -> {});
 
         SimulateProcessorResult expectedResult = new SimulateProcessorResult(actualProcessor.getTag(), ingestDocument);
@@ -81,7 +81,7 @@ public class TrackingResultProcessorTests extends ESTestCase {
         RuntimeException exception = new RuntimeException("processor failed");
         TestProcessor testProcessor = new TestProcessor(ingestDocument -> { throw exception; });
         CompoundProcessor actualProcessor = new CompoundProcessor(testProcessor);
-        CompoundProcessor trackingProcessor = decorate(actualProcessor, resultList);
+        CompoundProcessor trackingProcessor = decorate(actualProcessor, null, resultList);
 
         Exception[] holder = new Exception[1];
         trackingProcessor.execute(ingestDocument, (result, e) -> holder[0] = e);
@@ -104,7 +104,7 @@ public class TrackingResultProcessorTests extends ESTestCase {
                 Arrays.asList(failProcessor, onFailureProcessor),
                 Arrays.asList(onFailureProcessor, failProcessor))),
             Arrays.asList(onFailureProcessor));
-        CompoundProcessor trackingProcessor = decorate(actualProcessor, resultList);
+        CompoundProcessor trackingProcessor = decorate(actualProcessor, null, resultList);
         trackingProcessor.execute(ingestDocument, (result, e) -> {});
 
         SimulateProcessorResult expectedFailResult = new SimulateProcessorResult(failProcessor.getTag(), ingestDocument);
@@ -137,12 +137,53 @@ public class TrackingResultProcessorTests extends ESTestCase {
         assertThat(resultList.get(3).getProcessorTag(), equalTo(expectedSuccessResult.getProcessorTag()));
     }
 
+    public void testActualCompoundProcessorWithOnFailureAndTrueCondition() throws Exception {
+        String scriptName = "conditionalScript";
+        ScriptService scriptService = new ScriptService(Settings.builder().build(), Collections.singletonMap(Script.DEFAULT_SCRIPT_LANG,
+            new MockScriptEngine(Script.DEFAULT_SCRIPT_LANG, Collections.singletonMap(scriptName, ctx -> true), Collections.emptyMap())),
+            new HashMap<>(ScriptModule.CORE_CONTEXTS)
+        );
+        RuntimeException exception = new RuntimeException("fail");
+        TestProcessor failProcessor = new TestProcessor("fail", "test", exception);
+        ConditionalProcessor conditionalProcessor = new ConditionalProcessor(
+            randomAlphaOfLength(10),
+            new Script(ScriptType.INLINE, Script.DEFAULT_SCRIPT_LANG, scriptName, Collections.emptyMap()), scriptService,
+            failProcessor);
+        TestProcessor onFailureProcessor = new TestProcessor("success", "test", ingestDocument -> {});
+        CompoundProcessor actualProcessor =
+            new CompoundProcessor(false,
+                Arrays.asList(conditionalProcessor),
+                Arrays.asList(onFailureProcessor));
+        CompoundProcessor trackingProcessor = decorate(actualProcessor, null, resultList);
+        trackingProcessor.execute(ingestDocument, (result, e) -> {
+        });
+
+        SimulateProcessorResult expectedFailResult = new SimulateProcessorResult(failProcessor.getTag(), ingestDocument);
+        SimulateProcessorResult expectedSuccessResult = new SimulateProcessorResult(onFailureProcessor.getTag(), ingestDocument);
+
+        assertThat(failProcessor.getInvokedCounter(), equalTo(1));
+        assertThat(onFailureProcessor.getInvokedCounter(), equalTo(1));
+        assertThat(resultList.size(), equalTo(2));
+
+        assertThat(resultList.get(0).getIngestDocument(), nullValue());
+        assertThat(resultList.get(0).getFailure(), equalTo(exception));
+        assertThat(resultList.get(0).getProcessorTag(), equalTo(expectedFailResult.getProcessorTag()));
+
+        Map<String, Object> metadata = resultList.get(1).getIngestDocument().getIngestMetadata();
+        assertThat(metadata.get(ON_FAILURE_MESSAGE_FIELD), equalTo("fail"));
+        assertThat(metadata.get(ON_FAILURE_PROCESSOR_TYPE_FIELD), equalTo("test"));
+        assertThat(metadata.get(ON_FAILURE_PROCESSOR_TAG_FIELD), equalTo("fail"));
+        assertThat(resultList.get(1).getFailure(), nullValue());
+        assertThat(resultList.get(1).getProcessorTag(), equalTo(expectedSuccessResult.getProcessorTag()));
+    }
+
+
     public void testActualCompoundProcessorWithIgnoreFailure() throws Exception {
         RuntimeException exception = new RuntimeException("processor failed");
         TestProcessor testProcessor = new TestProcessor(ingestDocument -> { throw exception; });
         CompoundProcessor actualProcessor = new CompoundProcessor(true, Collections.singletonList(testProcessor),
             Collections.emptyList());
-        CompoundProcessor trackingProcessor = decorate(actualProcessor, resultList);
+        CompoundProcessor trackingProcessor = decorate(actualProcessor, null, resultList);
 
         trackingProcessor.execute(ingestDocument, (result, e) -> {});
 
@@ -173,7 +214,7 @@ public class TrackingResultProcessorTests extends ESTestCase {
                 new TestProcessor(ingestDocument -> {ingestDocument.setFieldValue(key2, randomInt()); })),
             new TestProcessor(ingestDocument -> { ingestDocument.setFieldValue(key3, randomInt()); }));
 
-        CompoundProcessor trackingProcessor = decorate(compoundProcessor, resultList);
+        CompoundProcessor trackingProcessor = decorate(compoundProcessor, null, resultList);
         trackingProcessor.execute(ingestDocument, (result, e) -> {});
         SimulateProcessorResult expectedResult = new SimulateProcessorResult(compoundProcessor.getTag(), ingestDocument);
 
@@ -215,7 +256,7 @@ public class TrackingResultProcessorTests extends ESTestCase {
         PipelineProcessor pipelineProcessor = factory.create(Collections.emptyMap(), null, pipelineConfig);
         CompoundProcessor actualProcessor = new CompoundProcessor(pipelineProcessor);
 
-        CompoundProcessor trackingProcessor = decorate(actualProcessor, resultList);
+        CompoundProcessor trackingProcessor = decorate(actualProcessor, null, resultList);
 
         trackingProcessor.execute(ingestDocument, (result, e) -> {});
 
@@ -282,7 +323,7 @@ public class TrackingResultProcessorTests extends ESTestCase {
         PipelineProcessor pipelineProcessor = factory.create(Collections.emptyMap(), null, pipelineConfig0);
         CompoundProcessor actualProcessor = new CompoundProcessor(pipelineProcessor);
 
-        CompoundProcessor trackingProcessor = decorate(actualProcessor, resultList);
+        CompoundProcessor trackingProcessor = decorate(actualProcessor, null, resultList);
 
         trackingProcessor.execute(ingestDocument, (result, e) -> {});
 
@@ -351,7 +392,7 @@ public class TrackingResultProcessorTests extends ESTestCase {
         PipelineProcessor pipelineProcessor = factory.create(Collections.emptyMap(), null, pipelineConfig0);
         CompoundProcessor actualProcessor = new CompoundProcessor(pipelineProcessor);
 
-        CompoundProcessor trackingProcessor = decorate(actualProcessor, resultList);
+        CompoundProcessor trackingProcessor = decorate(actualProcessor, null, resultList);
 
         trackingProcessor.execute(ingestDocument, (result, e) -> {});
 
@@ -404,7 +445,7 @@ public class TrackingResultProcessorTests extends ESTestCase {
         PipelineProcessor pipelineProcessor = factory.create(Collections.emptyMap(), null, pipelineConfig);
         CompoundProcessor actualProcessor = new CompoundProcessor(pipelineProcessor);
 
-        CompoundProcessor trackingProcessor = decorate(actualProcessor, resultList);
+        CompoundProcessor trackingProcessor = decorate(actualProcessor, null, resultList);
 
         trackingProcessor.execute(ingestDocument, (result, e) -> {});
 
@@ -455,7 +496,7 @@ public class TrackingResultProcessorTests extends ESTestCase {
         PipelineProcessor pipelineProcessor = factory.create(Collections.emptyMap(), null, pipelineConfig0);
         CompoundProcessor actualProcessor = new CompoundProcessor(pipelineProcessor);
 
-        CompoundProcessor trackingProcessor = decorate(actualProcessor, resultList);
+        CompoundProcessor trackingProcessor = decorate(actualProcessor, null, resultList);
 
         Exception[] holder = new Exception[1];
         trackingProcessor.execute(ingestDocument, (result, e) -> holder[0] = e);
@@ -481,7 +522,7 @@ public class TrackingResultProcessorTests extends ESTestCase {
 
         CompoundProcessor actualProcessor = new CompoundProcessor(pipelineProcessor, pipelineProcessor);
 
-        CompoundProcessor trackingProcessor = decorate(actualProcessor, resultList);
+        CompoundProcessor trackingProcessor = decorate(actualProcessor, null, resultList);
 
         trackingProcessor.execute(ingestDocument, (result, e) -> {});
 
@@ -504,4 +545,6 @@ public class TrackingResultProcessorTests extends ESTestCase {
             resultList.get(1).getIngestDocument().getSourceAndMetadata().get(key1));
     }
 
+
+
 }