Browse Source

Enhance the ingest node simulate verbose output (#60433)

This commit enhances the verbose output for the
`_ingest/pipeline/_simulate?verbose` api. Specifically
this adds the following:

* the pipeline processor is now included in the output
* the conditional (if) and result is now included in the output iff it was defined
* a status field is always displayed. the possible values of status are
  * `success` - if the processor ran with out errors
  * `error` - if the processor ran but threw an error that was not ingored
  * `error_ignored` - if the processor ran but threw an error that was ingored
  * `skipped` - if the process did not run (currently only possible if the if condition evaluates to false)
  * `dropped` - if the the `drop` processor ran and dropped the document
* a `processor_type` field for the type of processor (e.g. set, rename, etc.)
* throw a better error if trying to simulate with a pipeline that does not exist

closes #56004
Jake Landis 5 years ago
parent
commit
35fc997943

+ 76 - 68
docs/reference/ingest/apis/simulate-pipeline.asciidoc

@@ -338,80 +338,88 @@ The API returns the following response:
 [source,console-result]
 ----
 {
-   "docs": [
-      {
-         "processor_results": [
-            {
-               "doc": {
-                  "_id": "id",
-                  "_index": "index",
-                  "_source": {
-                     "field2": "_value2",
-                     "foo": "bar"
-                  },
-                  "_ingest": {
-                     "timestamp": "2017-05-04T22:46:09.674Z",
-                     "pipeline": "_simulate_pipeline"
-                  }
-               }
+  "docs" : [
+    {
+      "processor_results" : [
+        {
+          "processor_type" : "set",
+          "status" : "success",
+          "doc" : {
+            "_index" : "index",
+            "_id" : "id",
+            "_source" : {
+              "field2" : "_value2",
+              "foo" : "bar"
             },
-            {
-               "doc": {
-                  "_id": "id",
-                  "_index": "index",
-                  "_source": {
-                     "field3": "_value3",
-                     "field2": "_value2",
-                     "foo": "bar"
-                  },
-                  "_ingest": {
-                     "timestamp": "2017-05-04T22:46:09.675Z",
-                     "pipeline": "_simulate_pipeline"
-                  }
-               }
+            "_ingest" : {
+              "pipeline" : "_simulate_pipeline",
+              "timestamp" : "2020-07-30T01:21:24.251836Z"
             }
-         ]
-      },
-      {
-         "processor_results": [
-            {
-               "doc": {
-                  "_id": "id",
-                  "_index": "index",
-                  "_source": {
-                     "field2": "_value2",
-                     "foo": "rab"
-                  },
-                  "_ingest": {
-                     "timestamp": "2017-05-04T22:46:09.676Z",
-                     "pipeline": "_simulate_pipeline"
-                  }
-               }
+          }
+        },
+        {
+          "processor_type" : "set",
+          "status" : "success",
+          "doc" : {
+            "_index" : "index",
+            "_id" : "id",
+            "_source" : {
+              "field3" : "_value3",
+              "field2" : "_value2",
+              "foo" : "bar"
             },
-            {
-               "doc": {
-                  "_id": "id",
-                  "_index": "index",
-                  "_source": {
-                     "field3": "_value3",
-                     "field2": "_value2",
-                     "foo": "rab"
-                  },
-                  "_ingest": {
-                     "timestamp": "2017-05-04T22:46:09.677Z",
-                     "pipeline": "_simulate_pipeline"
-                  }
-               }
+            "_ingest" : {
+              "pipeline" : "_simulate_pipeline",
+              "timestamp" : "2020-07-30T01:21:24.251836Z"
             }
-         ]
-      }
-   ]
+          }
+        }
+      ]
+    },
+    {
+      "processor_results" : [
+        {
+          "processor_type" : "set",
+          "status" : "success",
+          "doc" : {
+            "_index" : "index",
+            "_id" : "id",
+            "_source" : {
+              "field2" : "_value2",
+              "foo" : "rab"
+            },
+            "_ingest" : {
+              "pipeline" : "_simulate_pipeline",
+              "timestamp" : "2020-07-30T01:21:24.251863Z"
+            }
+          }
+        },
+        {
+          "processor_type" : "set",
+          "status" : "success",
+          "doc" : {
+            "_index" : "index",
+            "_id" : "id",
+            "_source" : {
+              "field3" : "_value3",
+              "field2" : "_value2",
+              "foo" : "rab"
+            },
+            "_ingest" : {
+              "pipeline" : "_simulate_pipeline",
+              "timestamp" : "2020-07-30T01:21:24.251863Z"
+            }
+          }
+        }
+      ]
+    }
+  ]
 }
 ----
-// TESTRESPONSE[s/"2017-05-04T22:46:09.674Z"/$body.docs.0.processor_results.0.doc._ingest.timestamp/]
-// TESTRESPONSE[s/"2017-05-04T22:46:09.675Z"/$body.docs.0.processor_results.1.doc._ingest.timestamp/]
-// TESTRESPONSE[s/"2017-05-04T22:46:09.676Z"/$body.docs.1.processor_results.0.doc._ingest.timestamp/]
-// TESTRESPONSE[s/"2017-05-04T22:46:09.677Z"/$body.docs.1.processor_results.1.doc._ingest.timestamp/]
+// TESTRESPONSE[s/"2020-07-30T01:21:24.251836Z"/$body.docs.0.processor_results.0.doc._ingest.timestamp/]
+// TESTRESPONSE[s/"2020-07-30T01:21:24.251836Z"/$body.docs.0.processor_results.1.doc._ingest.timestamp/]
+// TESTRESPONSE[s/"2020-07-30T01:21:24.251863Z"/$body.docs.1.processor_results.0.doc._ingest.timestamp/]
+// TESTRESPONSE[s/"2020-07-30T01:21:24.251863Z"/$body.docs.1.processor_results.1.doc._ingest.timestamp/]
 
 ////
 [source,console]

+ 82 - 9
modules/ingest-common/src/yamlRestTest/resources/rest-api-spec/test/ingest/90_simulate.yml

@@ -546,11 +546,15 @@ teardown:
   - match: { docs.0.processor_results.0.tag: "setstatus-1" }
   - match: { docs.0.processor_results.0.doc._source.field1: "123.42 400 <foo>" }
   - match: { docs.0.processor_results.0.doc._source.status: 200 }
+  - match: { docs.0.processor_results.0.status: "success" }
+  - match: { docs.0.processor_results.0.processor_type: "set" }
   - match: { docs.0.processor_results.1.tag: "rename-1" }
   - match: { docs.0.processor_results.1.ignored_error.error.type: "illegal_argument_exception" }
   - match: { docs.0.processor_results.1.ignored_error.error.reason: "field [foofield] doesn't exist" }
   - match: { docs.0.processor_results.1.doc._source.field1: "123.42 400 <foo>" }
   - match: { docs.0.processor_results.1.doc._source.status: 200 }
+  - match: { docs.0.processor_results.1.status: "error_ignored" }
+  - match: { docs.0.processor_results.1.processor_type: "rename" }
 
 ---
 "Test verbose simulate with ignore_failure and no exception thrown":
@@ -591,11 +595,16 @@ teardown:
           }
   - length: { docs: 1 }
   - length: { docs.0.processor_results: 2 }
+  - length: { docs.0.processor_results.0: 4 }
   - match: { docs.0.processor_results.0.tag: "setstatus-1" }
+  - match: { docs.0.processor_results.0.status: "success" }
+  - match: { docs.0.processor_results.0.processor_type: "set" }
   - match: { docs.0.processor_results.0.doc._source.field1: "123.42 400 <foo>" }
   - match: { docs.0.processor_results.0.doc._source.status: 200 }
-  - length: { docs.0.processor_results.1: 2 }
+  - length: { docs.0.processor_results.1: 4 }
   - match: { docs.0.processor_results.1.tag: "rename-1" }
+  - match: { docs.0.processor_results.1.status: "success" }
+  - match: { docs.0.processor_results.1.processor_type: "rename" }
   - match: { docs.0.processor_results.1.doc._source.new_status: 200 }
 
 ---
@@ -710,7 +719,8 @@ teardown:
             {
               "set": {
                 "field": "pipeline0",
-                "value": true
+                "value": true,
+                "description" : "first_set"
               }
             },
             {
@@ -731,16 +741,25 @@ teardown:
           ]
         }
 - length: { docs: 1 }
-- length: { docs.0.processor_results: 3 }
+- length: { docs.0.processor_results: 5 }
 - match: { docs.0.processor_results.0.doc._source.pipeline0: true }
+- match: { docs.0.processor_results.0.status: "success" }
+- match: { docs.0.processor_results.0.processor_type: "set" }
+- match: { docs.0.processor_results.0.description: "first_set" }
 - is_false: docs.0.processor_results.0.doc._source.pipeline1
 - is_false: docs.0.processor_results.0.doc._source.pipeline2
-- match: { docs.0.processor_results.1.doc._source.pipeline0: true }
-- match: { docs.0.processor_results.1.doc._source.pipeline1: true }
-- is_false: docs.0.processor_results.1.doc._source.pipeline2
+- match: { docs.0.processor_results.1.doc: null }
+- match: { docs.0.processor_results.1.status: "success" }
+- match: { docs.0.processor_results.1.processor_type: "pipeline" }
 - match: { docs.0.processor_results.2.doc._source.pipeline0: true }
 - match: { docs.0.processor_results.2.doc._source.pipeline1: true }
-- match: { docs.0.processor_results.2.doc._source.pipeline2: true }
+- is_false: docs.0.processor_results.2.doc._source.pipeline2
+- match: { docs.0.processor_results.3.doc: null }
+- match: { docs.0.processor_results.3.status: "success" }
+- match: { docs.0.processor_results.3.processor_type: "pipeline" }
+- match: { docs.0.processor_results.4.doc._source.pipeline0: true }
+- match: { docs.0.processor_results.4.doc._source.pipeline1: true }
+- match: { docs.0.processor_results.4.doc._source.pipeline2: true }
 
 ---
 "Test verbose simulate with true conditional and on failure":
@@ -801,19 +820,27 @@ teardown:
 - length: { docs.0.processor_results: 4 }
 - match: { docs.0.processor_results.0.tag: "gunna_fail" }
 - match: { docs.0.processor_results.0.error.reason: "field [foo1] doesn't exist" }
+- match: { docs.0.processor_results.0.status: "error" }
+- match: { docs.0.processor_results.0.processor_type: "rename" }
 - match: { docs.0.processor_results.1.tag: "failed1" }
 - match: { docs.0.processor_results.1.doc._source.failed1: "failed1" }
 - match: { docs.0.processor_results.1.doc._ingest.on_failure_processor_tag: "gunna_fail" }
+- match: { docs.0.processor_results.1.status: "success" }
+- match: { docs.0.processor_results.1.processor_type: "set" }
 - match: { docs.0.processor_results.2.tag: "gunna_fail_again" }
 - match: { docs.0.processor_results.2.error.reason: "field [foo2] doesn't exist" }
+- match: { docs.0.processor_results.2.status: "error" }
+- match: { docs.0.processor_results.2.processor_type: "rename" }
 - match: { docs.0.processor_results.3.tag: "failed2" }
 - match: { docs.0.processor_results.3.doc._source.failed1: "failed1" }
 - match: { docs.0.processor_results.3.doc._source.failed2: "failed2" }
 - match: { docs.0.processor_results.3.doc._ingest.on_failure_processor_tag: "gunna_fail_again" }
+- match: { docs.0.processor_results.3.status: "success" }
+- match: { docs.0.processor_results.3.processor_type: "set" }
 
 
 ---
-"Test simulate with provided pipeline definition with tag and description in processors":
+"Test simulate with pipeline with conditional and skipped and dropped":
   - do:
       ingest.simulate:
         verbose: true
@@ -829,6 +856,16 @@ teardown:
                     "field" : "field2",
                     "value" : "_value"
                   }
+                },
+                {
+                  "drop" : {
+                    "if": "false"
+                  }
+                },
+                {
+                  "drop" : {
+                     "if": "true"
+                   }
                 }
               ]
             },
@@ -843,7 +880,43 @@ teardown:
             ]
           }
   - length: { docs: 1 }
-  - length: { docs.0.processor_results: 1 }
+  - length: { docs.0.processor_results: 3 }
   - match: { docs.0.processor_results.0.doc._source.field2: "_value" }
   - match: { docs.0.processor_results.0.description: "processor_description" }
   - match: { docs.0.processor_results.0.tag: "processor_tag" }
+  - match: { docs.0.processor_results.0.status: "success" }
+  - match: { docs.0.processor_results.0.processor_type: "set" }
+  - match: { docs.0.processor_results.1.status: "skipped" }
+  - match: { docs.0.processor_results.1.processor_type: "drop" }
+  - match: { docs.0.processor_results.1.if.condition: "false" }
+  - match: { docs.0.processor_results.1.if.result: false }
+  - match: { docs.0.processor_results.2.status: "dropped" }
+  - match: { docs.0.processor_results.2.processor_type: "drop" }
+  - match: { docs.0.processor_results.2.if.condition: "true" }
+  - match: { docs.0.processor_results.2.if.result: true }
+---
+"Test simulate with provided pipeline that does not exist":
+  - do:
+      catch: bad_request
+      ingest.simulate:
+        verbose: true
+        body: >
+          {
+            "pipeline": {
+              "description": "_description",
+              "processors": [
+                {
+                  "pipeline": {
+                     "name": "____pipeline_doesnot_exist___"
+                  }
+                }
+              ]
+            },
+            "docs": [
+              {
+                "_source": {}
+              }
+            ]
+          }
+  - match: { error.root_cause.0.type: "illegal_argument_exception" }
+  - match: { error.root_cause.0.reason: "Pipeline processor configured for non-existent pipeline [____pipeline_doesnot_exist___]" }

+ 144 - 22
server/src/main/java/org/elasticsearch/action/ingest/SimulateProcessorResult.java

@@ -21,6 +21,7 @@ package org.elasticsearch.action.ingest;
 import org.elasticsearch.ElasticsearchException;
 import org.elasticsearch.Version;
 import org.elasticsearch.common.ParseField;
+import org.elasticsearch.common.collect.Tuple;
 import org.elasticsearch.common.io.stream.StreamInput;
 import org.elasticsearch.common.io.stream.StreamOutput;
 import org.elasticsearch.common.io.stream.Writeable;
@@ -32,6 +33,7 @@ import org.elasticsearch.ingest.ConfigurationUtils;
 import org.elasticsearch.ingest.IngestDocument;
 
 import java.io.IOException;
+import java.util.Locale;
 
 import static org.elasticsearch.common.xcontent.ConstructingObjectParser.constructorArg;
 import static org.elasticsearch.common.xcontent.ConstructingObjectParser.optionalConstructorArg;
@@ -39,10 +41,33 @@ import static org.elasticsearch.common.xcontent.ConstructingObjectParser.optiona
 public class SimulateProcessorResult implements Writeable, ToXContentObject {
 
     private static final String IGNORED_ERROR_FIELD = "ignored_error";
+    private static final String STATUS_FIELD = "status";
+    private static final String TYPE_FIELD = "processor_type";
+    private static final String CONDITION_FIELD = "condition";
+    private static final String RESULT_FIELD = "result";
+
+    enum Status {
+        SUCCESS,
+        ERROR,
+        ERROR_IGNORED,
+        SKIPPED,
+        DROPPED;
+
+        @Override
+        public String toString() {
+            return this.name().toLowerCase(Locale.ROOT);
+        }
+
+        public static Status fromString(String string) {
+            return Status.valueOf(string.toUpperCase(Locale.ROOT));
+        }
+    }
+    private final String type;
     private final String processorTag;
     private final String description;
     private final WriteableIngestDocument ingestDocument;
     private final Exception failure;
+    private final Tuple<String, Boolean> conditionalWithResult;
 
     private static final ConstructingObjectParser<ElasticsearchException, Void> IGNORED_ERROR_PARSER =
         new ConstructingObjectParser<>(
@@ -58,26 +83,51 @@ public class SimulateProcessorResult implements Writeable, ToXContentObject {
         );
     }
 
+    private static final ConstructingObjectParser<Tuple<String, Boolean>, Void> IF_CONDITION_PARSER =
+        new ConstructingObjectParser<>(
+            "if_condition_parser",
+            true,
+            a -> {
+                String condition = a[0] == null ? null : (String) a[0];
+                Boolean result = a[1] == null ? null : (Boolean) a[1];
+                return new Tuple<>(condition, result);
+            }
+        );
+    static {
+        IF_CONDITION_PARSER.declareString(optionalConstructorArg(), new ParseField(CONDITION_FIELD));
+        IF_CONDITION_PARSER.declareBoolean(optionalConstructorArg(), new ParseField(RESULT_FIELD));
+    }
+
+    @SuppressWarnings("unchecked")
     public static final ConstructingObjectParser<SimulateProcessorResult, Void> PARSER =
         new ConstructingObjectParser<>(
             "simulate_processor_result",
             true,
             a -> {
-                String processorTag = a[0] == null ? null : (String)a[0];
-                String description = a[1] == null ? null : (String)a[1];
-                IngestDocument document = a[2] == null ? null : ((WriteableIngestDocument)a[2]).getIngestDocument();
+                String type = (String) a[0];
+                String processorTag = a[1] == null ? null : (String)a[1];
+                String description = a[2] == null ? null : (String)a[2];
+                Tuple<String, Boolean> conditionalWithResult = a[3] == null ? null : (Tuple<String, Boolean>)a[3];
+                IngestDocument document = a[4] == null ? null : ((WriteableIngestDocument)a[4]).getIngestDocument();
                 Exception failure = null;
-                if (a[3] != null) {
-                    failure = (ElasticsearchException)a[3];
-                } else if (a[4] != null) {
-                    failure = (ElasticsearchException)a[4];
+                if (a[5] != null) {
+                    failure = (ElasticsearchException)a[5];
+                } else if (a[6] != null) {
+                    failure = (ElasticsearchException)a[6];
                 }
-                return new SimulateProcessorResult(processorTag, description, document, failure);
+
+                return new SimulateProcessorResult(type, processorTag, description, document, failure, conditionalWithResult);
             }
         );
     static {
+        PARSER.declareString(optionalConstructorArg(), new ParseField(TYPE_FIELD));
         PARSER.declareString(optionalConstructorArg(), new ParseField(ConfigurationUtils.TAG_KEY));
         PARSER.declareString(optionalConstructorArg(), new ParseField(ConfigurationUtils.DESCRIPTION_KEY));
+        PARSER.declareObject(
+            optionalConstructorArg(),
+            IF_CONDITION_PARSER,
+            new ParseField("if")
+        );
         PARSER.declareObject(
             optionalConstructorArg(),
             WriteableIngestDocument.INGEST_DOC_PARSER,
@@ -95,24 +145,28 @@ public class SimulateProcessorResult implements Writeable, ToXContentObject {
         );
     }
 
-    public SimulateProcessorResult(String processorTag, String description, IngestDocument ingestDocument,
-                                   Exception failure) {
+    public SimulateProcessorResult(String type, String processorTag, String description, IngestDocument ingestDocument,
+                                   Exception failure, Tuple<String, Boolean> conditionalWithResult) {
         this.processorTag = processorTag;
         this.description = description;
         this.ingestDocument = (ingestDocument == null) ? null : new WriteableIngestDocument(ingestDocument);
         this.failure = failure;
+        this.conditionalWithResult = conditionalWithResult;
+        this.type = type;
     }
 
-    public SimulateProcessorResult(String processorTag, String description, IngestDocument ingestDocument) {
-        this(processorTag, description, ingestDocument, null);
+    public SimulateProcessorResult(String type, String processorTag, String description, IngestDocument ingestDocument,
+                                   Tuple<String, Boolean> conditionalWithResult) {
+        this(type, processorTag, description, ingestDocument, null, conditionalWithResult);
     }
 
-    public SimulateProcessorResult(String processorTag, String description, Exception failure) {
-        this(processorTag, description, null, failure);
+    public SimulateProcessorResult(String type, String processorTag, String description, Exception failure,
+                                   Tuple<String, Boolean> conditionalWithResult ) {
+        this(type, processorTag, description, null, failure, conditionalWithResult);
     }
 
-    public SimulateProcessorResult(String processorTag, String description) {
-        this(processorTag, description, null, null);
+    public SimulateProcessorResult(String type, String processorTag, String description, Tuple<String, Boolean> conditionalWithResult) {
+        this(type, processorTag, description, null, null, conditionalWithResult);
     }
 
     /**
@@ -127,6 +181,19 @@ public class SimulateProcessorResult implements Writeable, ToXContentObject {
         } else {
             this.description = null;
         }
+        //TODO: fix the version after backport
+        if (in.getVersion().onOrAfter(Version.V_8_0_0)) {
+            this.type = in.readString();
+            boolean hasConditional = in.readBoolean();
+            if (hasConditional) {
+                this.conditionalWithResult = new Tuple<>(in.readString(), in.readBoolean());
+            } else{
+                this.conditionalWithResult = null; //no condition exists
+            }
+        } else {
+            this.conditionalWithResult = null;
+            this.type = null;
+        }
     }
 
     @Override
@@ -137,6 +204,15 @@ public class SimulateProcessorResult implements Writeable, ToXContentObject {
         if (out.getVersion().onOrAfter(Version.V_7_9_0)) {
             out.writeOptionalString(description);
         }
+        //TODO: fix the version after backport
+        if (out.getVersion().onOrAfter(Version.V_8_0_0)) {
+            out.writeString(type);
+            out.writeBoolean(conditionalWithResult != null);
+            if (conditionalWithResult != null) {
+                out.writeString(conditionalWithResult.v1());
+                out.writeBoolean(conditionalWithResult.v2());
+            }
+        }
     }
 
     public IngestDocument getIngestDocument() {
@@ -158,21 +234,37 @@ public class SimulateProcessorResult implements Writeable, ToXContentObject {
         return description;
     }
 
+    public Tuple<String, Boolean> getConditionalWithResult() {
+        return conditionalWithResult;
+    }
+
+    public String getType() {
+        return type;
+    }
+
     @Override
     public XContentBuilder toXContent(XContentBuilder builder, Params params) throws IOException {
-        if (processorTag == null && failure == null && ingestDocument == null) {
-            builder.nullValue();
-            return builder;
+        builder.startObject();
+
+        if(type != null){
+            builder.field(TYPE_FIELD, type);
         }
 
-        builder.startObject();
+        builder.field(STATUS_FIELD, getStatus(type));
+
+        if (description != null) {
+            builder.field(ConfigurationUtils.DESCRIPTION_KEY, description);
+        }
 
         if (processorTag != null) {
             builder.field(ConfigurationUtils.TAG_KEY, processorTag);
         }
 
-        if (description != null) {
-            builder.field(ConfigurationUtils.DESCRIPTION_KEY, description);
+        if(conditionalWithResult != null){
+            builder.startObject("if");
+            builder.field(CONDITION_FIELD, conditionalWithResult.v1());
+            builder.field(RESULT_FIELD, conditionalWithResult.v2());
+            builder.endObject();
         }
 
         if (failure != null && ingestDocument != null) {
@@ -194,4 +286,34 @@ public class SimulateProcessorResult implements Writeable, ToXContentObject {
     public static SimulateProcessorResult fromXContent(XContentParser parser) {
         return PARSER.apply(parser, null);
     }
+
+    Status getStatus(String type) {
+        //if no condition, or condition passed
+        if (conditionalWithResult == null || (conditionalWithResult != null && conditionalWithResult.v2())) {
+            if (failure != null) {
+                if (ingestDocument == null) {
+                    return Status.ERROR;
+                } else {
+                    return Status.ERROR_IGNORED;
+                }
+            } else if (ingestDocument == null && "pipeline".equals(type) == false) {
+                return Status.DROPPED;
+            }
+            return Status.SUCCESS;
+        } else { //has condition that failed the check
+            return Status.SKIPPED;
+        }
+    }
+
+    @Override
+    public String toString() {
+        return "SimulateProcessorResult{" +
+            "type='" + type + '\'' +
+            ", processorTag='" + processorTag + '\'' +
+            ", description='" + description + '\'' +
+            ", ingestDocument=" + ingestDocument +
+            ", failure=" + failure +
+            ", conditionalWithResult=" + conditionalWithResult +
+            '}';
+    }
 }

+ 4 - 0
server/src/main/java/org/elasticsearch/ingest/ConditionalProcessor.java

@@ -144,6 +144,10 @@ public class ConditionalProcessor extends AbstractProcessor implements WrappingP
         return TYPE;
     }
 
+    public String getCondition(){
+        return condition.getIdOrCode();
+    }
+
     private static Object wrapUnmodifiable(Object raw) {
         // Wraps all mutable types that the JSON parser can create by immutable wrappers.
         // Any inputs not wrapped are assumed to be immutable

+ 5 - 2
server/src/main/java/org/elasticsearch/ingest/PipelineProcessor.java

@@ -55,8 +55,11 @@ public class PipelineProcessor extends AbstractProcessor {
     }
 
     Pipeline getPipeline(IngestDocument ingestDocument) {
-        String pipelineName = ingestDocument.renderTemplate(this.pipelineTemplate);
-        return ingestService.getPipeline(pipelineName);
+        return ingestService.getPipeline(getPipelineToCallName(ingestDocument));
+    }
+
+    String getPipelineToCallName(IngestDocument ingestDocument){
+        return ingestDocument.renderTemplate(this.pipelineTemplate);
     }
 
     @Override

+ 31 - 14
server/src/main/java/org/elasticsearch/ingest/TrackingResultProcessor.java

@@ -21,6 +21,7 @@ package org.elasticsearch.ingest;
 
 import org.elasticsearch.ElasticsearchException;
 import org.elasticsearch.action.ingest.SimulateProcessorResult;
+import org.elasticsearch.common.collect.Tuple;
 
 import java.util.ArrayList;
 import java.util.List;
@@ -46,11 +47,19 @@ public final class TrackingResultProcessor implements Processor {
 
     @Override
     public void execute(IngestDocument ingestDocument, BiConsumer<IngestDocument, Exception> handler) {
-        if (conditionalProcessor != null ) {
+        Tuple<String, Boolean> conditionalWithResult;
+        if (conditionalProcessor != null) {
             if (conditionalProcessor.evaluate(ingestDocument) == false) {
+                conditionalWithResult = new Tuple<>(conditionalProcessor.getCondition(), Boolean.FALSE);
+                processorResultList.add(new SimulateProcessorResult(actualProcessor.getType(), actualProcessor.getTag(),
+                    actualProcessor.getDescription(), conditionalWithResult));
                 handler.accept(ingestDocument, null);
                 return;
+            } else {
+                conditionalWithResult = new Tuple<>(conditionalProcessor.getCondition(), Boolean.TRUE);
             }
+        } else {
+            conditionalWithResult = null; //no condition
         }
 
         if (actualProcessor instanceof PipelineProcessor) {
@@ -58,24 +67,32 @@ public final class TrackingResultProcessor implements Processor {
             Pipeline pipeline = pipelineProcessor.getPipeline(ingestDocument);
             //runtime check for cycles against a copy of the document. This is needed to properly handle conditionals around pipelines
             IngestDocument ingestDocumentCopy = new IngestDocument(ingestDocument);
-            ingestDocumentCopy.executePipeline(pipelineProcessor.getPipeline(ingestDocument), (result, e) -> {
+            Pipeline pipelineToCall = pipelineProcessor.getPipeline(ingestDocument);
+            if (pipelineToCall == null) {
+                throw new IllegalArgumentException("Pipeline processor configured for non-existent pipeline [" +
+                    pipelineProcessor.getPipelineToCallName(ingestDocument) + ']');
+            }
+            ingestDocumentCopy.executePipeline(pipelineToCall, (result, e) -> {
                 // do nothing, let the tracking processors throw the exception while recording the path up to the failure
                 if (e instanceof ElasticsearchException) {
                     ElasticsearchException elasticsearchException = (ElasticsearchException) e;
                     //else do nothing, let the tracking processors throw the exception while recording the path up to the failure
                     if (elasticsearchException.getCause() instanceof IllegalStateException) {
                         if (ignoreFailure) {
-                            processorResultList.add(new SimulateProcessorResult(pipelineProcessor.getTag(),
-                                pipelineProcessor.getDescription(), new IngestDocument(ingestDocument), e));
+                            processorResultList.add(new SimulateProcessorResult(pipelineProcessor.getType(), pipelineProcessor.getTag(),
+                                pipelineProcessor.getDescription(), new IngestDocument(ingestDocument), e, conditionalWithResult));
                         } else {
-                            processorResultList.add(new SimulateProcessorResult(pipelineProcessor.getTag(),
-                                pipelineProcessor.getDescription(), e));
+                            processorResultList.add(new SimulateProcessorResult(pipelineProcessor.getType(), pipelineProcessor.getTag(),
+                                pipelineProcessor.getDescription(), e, conditionalWithResult));
                         }
                         handler.accept(null, elasticsearchException);
                     }
                 } else {
                     //now that we know that there are no cycles between pipelines, decorate the processors for this pipeline and execute it
                     CompoundProcessor verbosePipelineProcessor = decorate(pipeline.getCompoundProcessor(), null, processorResultList);
+                    //add the pipeline process to the results
+                    processorResultList.add(new SimulateProcessorResult(actualProcessor.getType(), actualProcessor.getTag(),
+                        actualProcessor.getDescription(), conditionalWithResult));
                     Pipeline verbosePipeline = new Pipeline(pipeline.getId(), pipeline.getDescription(), pipeline.getVersion(),
                         verbosePipelineProcessor);
                     ingestDocument.executePipeline(verbosePipeline, handler);
@@ -87,21 +104,21 @@ public final class TrackingResultProcessor implements Processor {
         actualProcessor.execute(ingestDocument, (result, e) -> {
             if (e != null) {
                 if (ignoreFailure) {
-                    processorResultList.add(new SimulateProcessorResult(actualProcessor.getTag(),
-                        actualProcessor.getDescription(), new IngestDocument(ingestDocument), e));
+                    processorResultList.add(new SimulateProcessorResult(actualProcessor.getType(), actualProcessor.getTag(),
+                        actualProcessor.getDescription(), new IngestDocument(ingestDocument), e, conditionalWithResult));
                 } else {
-                    processorResultList.add(new SimulateProcessorResult(actualProcessor.getTag(),
-                        actualProcessor.getDescription(), e));
+                    processorResultList.add(new SimulateProcessorResult(actualProcessor.getType(), actualProcessor.getTag(),
+                        actualProcessor.getDescription(), e, conditionalWithResult));
                 }
                 handler.accept(null, e);
             } else {
                 if (result != null) {
-                    processorResultList.add(new SimulateProcessorResult(actualProcessor.getTag(),
-                        actualProcessor.getDescription(), new IngestDocument(ingestDocument)));
+                    processorResultList.add(new SimulateProcessorResult(actualProcessor.getType(), actualProcessor.getTag(),
+                        actualProcessor.getDescription(), new IngestDocument(ingestDocument), conditionalWithResult));
                     handler.accept(result, null);
                 } else {
-                    processorResultList.add(new SimulateProcessorResult(actualProcessor.getTag(),
-                        actualProcessor.getDescription()));
+                    processorResultList.add(new SimulateProcessorResult(actualProcessor.getType(), actualProcessor.getTag(),
+                        actualProcessor.getDescription(), conditionalWithResult));
                     handler.accept(null, null);
                 }
             }

+ 2 - 1
server/src/test/java/org/elasticsearch/action/ingest/SimulateDocumentVerboseResultTests.java

@@ -36,9 +36,10 @@ public class SimulateDocumentVerboseResultTests extends AbstractXContentTestCase
         for (int i = 0; i<numDocs; i++) {
             boolean isSuccessful = !(withFailures && randomBoolean());
             boolean isIgnoredError = withFailures && randomBoolean();
+            boolean hasCondition = withFailures && randomBoolean();
             results.add(
                 SimulateProcessorResultTests
-                    .createTestInstance(isSuccessful, isIgnoredError)
+                    .createTestInstance(isSuccessful, isIgnoredError, hasCondition)
             );
         }
         return new SimulateDocumentVerboseResult(results);

+ 51 - 12
server/src/test/java/org/elasticsearch/action/ingest/SimulateProcessorResultTests.java

@@ -20,6 +20,7 @@
 package org.elasticsearch.action.ingest;
 
 import org.elasticsearch.Version;
+import org.elasticsearch.common.collect.Tuple;
 import org.elasticsearch.common.io.stream.BytesStreamOutput;
 import org.elasticsearch.common.io.stream.StreamInput;
 import org.elasticsearch.common.xcontent.XContentParser;
@@ -32,8 +33,8 @@ import java.util.StringJoiner;
 import java.util.function.Predicate;
 import java.util.function.Supplier;
 
-import static org.elasticsearch.ingest.IngestDocumentMatcher.assertIngestDocument;
 import static org.elasticsearch.action.ingest.WriteableIngestDocumentTests.createRandomIngestDoc;
+import static org.elasticsearch.ingest.IngestDocumentMatcher.assertIngestDocument;
 import static org.hamcrest.CoreMatchers.containsString;
 import static org.hamcrest.Matchers.equalTo;
 import static org.hamcrest.Matchers.instanceOf;
@@ -45,7 +46,8 @@ public class SimulateProcessorResultTests extends AbstractXContentTestCase<Simul
     public void testSerialization() throws IOException {
         boolean isSuccessful = randomBoolean();
         boolean isIgnoredException = randomBoolean();
-        SimulateProcessorResult simulateProcessorResult = createTestInstance(isSuccessful, isIgnoredException);
+        boolean hasCondition = randomBoolean();
+        SimulateProcessorResult simulateProcessorResult = createTestInstance(isSuccessful, isIgnoredException, hasCondition);
 
         BytesStreamOutput out = new BytesStreamOutput();
         simulateProcessorResult.writeTo(out);
@@ -73,8 +75,9 @@ public class SimulateProcessorResultTests extends AbstractXContentTestCase<Simul
     public void testBWCDescription() throws IOException {
         boolean isSuccessful = randomBoolean();
         boolean isIgnoredException = randomBoolean();
-        SimulateProcessorResult simulateProcessorResult = createTestInstance(isSuccessful, isIgnoredException);
-        
+        boolean hasCondition = randomBoolean();
+        SimulateProcessorResult simulateProcessorResult = createTestInstance(isSuccessful, isIgnoredException, hasCondition);
+
         BytesStreamOutput out = new BytesStreamOutput();
         out.setVersion(VersionUtils.getPreviousVersion(Version.V_7_9_0));
         simulateProcessorResult.writeTo(out);
@@ -85,21 +88,23 @@ public class SimulateProcessorResultTests extends AbstractXContentTestCase<Simul
     }
 
     static SimulateProcessorResult createTestInstance(boolean isSuccessful,
-                                                                boolean isIgnoredException) {
+                                                                boolean isIgnoredException, boolean hasCondition) {
+        String type = randomAlphaOfLengthBetween(1, 10);
         String processorTag = randomAlphaOfLengthBetween(1, 10);
         String description = randomAlphaOfLengthBetween(1, 10);
+        Tuple<String, Boolean> conditionWithResult = hasCondition ? new Tuple<>(randomAlphaOfLengthBetween(1, 10), randomBoolean()) : null;
         SimulateProcessorResult simulateProcessorResult;
         if (isSuccessful) {
             IngestDocument ingestDocument = createRandomIngestDoc();
             if (isIgnoredException) {
-                simulateProcessorResult = new SimulateProcessorResult(processorTag, description, ingestDocument,
-                    new IllegalArgumentException("test"));
+                simulateProcessorResult = new SimulateProcessorResult(type, processorTag, description, ingestDocument,
+                    new IllegalArgumentException("test"), conditionWithResult);
             } else {
-                simulateProcessorResult = new SimulateProcessorResult(processorTag, description, ingestDocument);
+                simulateProcessorResult = new SimulateProcessorResult(type, processorTag, description, ingestDocument, conditionWithResult);
             }
         } else {
-            simulateProcessorResult = new SimulateProcessorResult(processorTag, description,
-                new IllegalArgumentException("test"));
+            simulateProcessorResult = new SimulateProcessorResult(type, processorTag, description,
+                new IllegalArgumentException("test"), conditionWithResult);
         }
         return simulateProcessorResult;
     }
@@ -107,13 +112,14 @@ public class SimulateProcessorResultTests extends AbstractXContentTestCase<Simul
     private static SimulateProcessorResult createTestInstanceWithFailures() {
         boolean isSuccessful = randomBoolean();
         boolean isIgnoredException = randomBoolean();
-        return createTestInstance(isSuccessful, isIgnoredException);
+        boolean hasCondition = randomBoolean();
+        return createTestInstance(isSuccessful, isIgnoredException, hasCondition);
     }
 
     @Override
     protected SimulateProcessorResult createTestInstance() {
         // we test failures separately since comparing XContent is not possible with failures
-        return createTestInstance(true, false);
+        return createTestInstance(true, false, true);
     }
 
     @Override
@@ -178,4 +184,37 @@ public class SimulateProcessorResultTests extends AbstractXContentTestCase<Simul
             getShuffleFieldsExceptions(), getRandomFieldsExcludeFilter(), this::createParser, this::doParseInstance,
             this::assertEqualInstances, assertToXContentEquivalence, getToXContentParams());
     }
+
+    public void testStatus(){
+        SimulateProcessorResult result;
+        // conditional returned false
+        result = new SimulateProcessorResult(null, null, null, createRandomIngestDoc(), null,
+            new Tuple<>(randomAlphaOfLengthBetween(1, 10), false));
+        assertEquals(SimulateProcessorResult.Status.SKIPPED, result.getStatus("set"));
+
+        // no ingest doc
+        result = new SimulateProcessorResult(null, null, null, null, null, null);
+        assertEquals(SimulateProcessorResult.Status.DROPPED, result.getStatus(null));
+
+        // no ingest doc - as pipeline processor
+        result = new SimulateProcessorResult(null, null, null, null, null, null);
+        assertEquals(SimulateProcessorResult.Status.SUCCESS, result.getStatus("pipeline"));
+
+        // failure
+        result = new SimulateProcessorResult(null, null, null, null, new RuntimeException(""), null);
+        assertEquals(SimulateProcessorResult.Status.ERROR, result.getStatus("rename"));
+
+        // failure, but ignored
+        result = new SimulateProcessorResult(null, null, null, createRandomIngestDoc(), new RuntimeException(""), null);
+        assertEquals(SimulateProcessorResult.Status.ERROR_IGNORED, result.getStatus(""));
+
+        //success - no conditional
+        result = new SimulateProcessorResult(null, null, null, createRandomIngestDoc(), null, null);
+        assertEquals(SimulateProcessorResult.Status.SUCCESS, result.getStatus(null));
+
+        //success - conditional true
+        result = new SimulateProcessorResult(null, null, null, createRandomIngestDoc(), null,
+            new Tuple<>(randomAlphaOfLengthBetween(1, 10), true));
+        assertEquals(SimulateProcessorResult.Status.SUCCESS, result.getStatus(null));
+    }
 }

+ 4 - 1
server/src/test/java/org/elasticsearch/ingest/ConditionalProcessorTests.java

@@ -54,6 +54,8 @@ import static org.mockito.Mockito.when;
 
 public class ConditionalProcessorTests extends ESTestCase {
 
+    private static final String scriptName = "conditionalScript";
+
     public void testChecksCondition() throws Exception {
         String conditionalField = "field1";
         String scriptName = "conditionalScript";
@@ -114,6 +116,7 @@ public class ConditionalProcessorTests extends ESTestCase {
         assertThat(ingestDocument.getSourceAndMetadata().get(conditionalField), is(falseValue));
         assertThat(ingestDocument.getSourceAndMetadata(), not(hasKey("foo")));
         assertStats(processor, 0, 0, 0);
+        assertEquals(scriptName, processor.getCondition());
 
         ingestDocument = RandomDocumentPicks.randomIngestDocument(random(), document);
         ingestDocument.setFieldValue(conditionalField, falseValue);
@@ -148,7 +151,7 @@ public class ConditionalProcessorTests extends ESTestCase {
     }
 
     public void testTypeDeprecation() throws Exception {
-        String scriptName = "conditionalScript";
+
         ScriptService scriptService = new ScriptService(Settings.builder().build(),
                 Collections.singletonMap(
                         Script.DEFAULT_SCRIPT_LANG,

+ 112 - 84
server/src/test/java/org/elasticsearch/ingest/TrackingResultProcessorTests.java

@@ -46,6 +46,7 @@ import static org.hamcrest.CoreMatchers.equalTo;
 import static org.hamcrest.CoreMatchers.not;
 import static org.hamcrest.Matchers.containsString;
 import static org.hamcrest.Matchers.instanceOf;
+import static org.hamcrest.Matchers.is;
 import static org.hamcrest.Matchers.nullValue;
 import static org.hamcrest.Matchers.sameInstance;
 import static org.mockito.Mockito.verify;
@@ -67,8 +68,8 @@ public class TrackingResultProcessorTests extends ESTestCase {
         TrackingResultProcessor trackingProcessor = new TrackingResultProcessor(false, actualProcessor, null, resultList);
         trackingProcessor.execute(ingestDocument, (result, e) -> {});
 
-        SimulateProcessorResult expectedResult = new SimulateProcessorResult(actualProcessor.getTag(),
-            actualProcessor.getDescription(), ingestDocument);
+        SimulateProcessorResult expectedResult = new SimulateProcessorResult(actualProcessor.getType(), actualProcessor.getTag(),
+            actualProcessor.getDescription(), ingestDocument, null);
 
         assertThat(actualProcessor.getInvokedCounter(), equalTo(1));
         assertThat(resultList.size(), equalTo(1));
@@ -88,8 +89,8 @@ public class TrackingResultProcessorTests extends ESTestCase {
         trackingProcessor.execute(ingestDocument, (result, e) -> holder[0] = e);
         assertThat(((IngestProcessorException) holder[0]).getRootCause().getMessage(), equalTo(exception.getMessage()));
 
-        SimulateProcessorResult expectedFirstResult = new SimulateProcessorResult(testProcessor.getTag(),
-            actualProcessor.getDescription(), ingestDocument);
+        SimulateProcessorResult expectedFirstResult = new SimulateProcessorResult(testProcessor.getType(), testProcessor.getTag(),
+            actualProcessor.getDescription(), ingestDocument, null);
         assertThat(testProcessor.getInvokedCounter(), equalTo(1));
         assertThat(resultList.size(), equalTo(1));
         assertThat(resultList.get(0).getIngestDocument(), nullValue());
@@ -109,10 +110,10 @@ public class TrackingResultProcessorTests extends ESTestCase {
         CompoundProcessor trackingProcessor = decorate(actualProcessor, null, resultList);
         trackingProcessor.execute(ingestDocument, (result, e) -> {});
 
-        SimulateProcessorResult expectedFailResult = new SimulateProcessorResult(failProcessor.getTag(),
-            failProcessor.getDescription(), ingestDocument);
-        SimulateProcessorResult expectedSuccessResult = new SimulateProcessorResult(onFailureProcessor.getTag(),
-            failProcessor.getDescription(), ingestDocument);
+        SimulateProcessorResult expectedFailResult = new SimulateProcessorResult(failProcessor.getType(), failProcessor.getTag(),
+            failProcessor.getDescription(), ingestDocument, null);
+        SimulateProcessorResult expectedSuccessResult = new SimulateProcessorResult(onFailureProcessor.getType(),
+            onFailureProcessor.getTag(), failProcessor.getDescription(), ingestDocument, null);
 
         assertThat(failProcessor.getInvokedCounter(), equalTo(2));
         assertThat(onFailureProcessor.getInvokedCounter(), equalTo(2));
@@ -163,18 +164,21 @@ public class TrackingResultProcessorTests extends ESTestCase {
         trackingProcessor.execute(ingestDocument, (result, e) -> {
         });
 
-        SimulateProcessorResult expectedFailResult = new SimulateProcessorResult(failProcessor.getTag(),
-            failProcessor.getDescription(), ingestDocument);
-        SimulateProcessorResult expectedSuccessResult = new SimulateProcessorResult(onFailureProcessor.getTag(),
-            onFailureProcessor.getDescription(), ingestDocument);
+        SimulateProcessorResult expectedFailResult = new SimulateProcessorResult(failProcessor.getType(), failProcessor.getTag(),
+            failProcessor.getDescription(), ingestDocument, null);
+        SimulateProcessorResult expectedSuccessResult = new SimulateProcessorResult(onFailureProcessor.getType(),
+            onFailureProcessor.getTag(), onFailureProcessor.getDescription(), ingestDocument, null);
 
         assertThat(failProcessor.getInvokedCounter(), equalTo(1));
         assertThat(onFailureProcessor.getInvokedCounter(), equalTo(1));
+
         assertThat(resultList.size(), equalTo(2));
 
         assertThat(resultList.get(0).getIngestDocument(), nullValue());
         assertThat(resultList.get(0).getFailure(), equalTo(exception));
         assertThat(resultList.get(0).getProcessorTag(), equalTo(expectedFailResult.getProcessorTag()));
+        assertThat(resultList.get(0).getConditionalWithResult().v1(), equalTo(scriptName));
+        assertThat(resultList.get(0).getConditionalWithResult().v2(), is(Boolean.TRUE));
 
         Map<String, Object> metadata = resultList.get(1).getIngestDocument().getIngestMetadata();
         assertThat(metadata.get(ON_FAILURE_MESSAGE_FIELD), equalTo("fail"));
@@ -194,8 +198,8 @@ public class TrackingResultProcessorTests extends ESTestCase {
 
         trackingProcessor.execute(ingestDocument, (result, e) -> {});
 
-        SimulateProcessorResult expectedResult = new SimulateProcessorResult(testProcessor.getTag(),
-            testProcessor.getDescription(), ingestDocument);
+        SimulateProcessorResult expectedResult = new SimulateProcessorResult(testProcessor.getType(), testProcessor.getTag(),
+            testProcessor.getDescription(), ingestDocument, null);
         assertThat(testProcessor.getInvokedCounter(), equalTo(1));
         assertThat(resultList.size(), equalTo(1));
         assertThat(resultList.get(0).getIngestDocument(), equalTo(expectedResult.getIngestDocument()));
@@ -225,23 +229,25 @@ public class TrackingResultProcessorTests extends ESTestCase {
 
         CompoundProcessor trackingProcessor = decorate(compoundProcessor, null, resultList);
         trackingProcessor.execute(ingestDocument, (result, e) -> {});
-        SimulateProcessorResult expectedResult = new SimulateProcessorResult(compoundProcessor.getTag(),
-            compoundProcessor.getDescription(), ingestDocument);
+        SimulateProcessorResult expectedResult = new SimulateProcessorResult(compoundProcessor.getType(), compoundProcessor.getTag(),
+            compoundProcessor.getDescription(), ingestDocument, null);
 
-        //the step for key 2 is never executed due to conditional and thus not part of the result set
-        assertThat(resultList.size(), equalTo(2));
+        assertThat(resultList.size(), equalTo(3));
 
         assertTrue(resultList.get(0).getIngestDocument().hasField(key1));
         assertFalse(resultList.get(0).getIngestDocument().hasField(key2));
         assertFalse(resultList.get(0).getIngestDocument().hasField(key3));
 
-        assertTrue(resultList.get(1).getIngestDocument().hasField(key1));
-        assertFalse(resultList.get(1).getIngestDocument().hasField(key2));
-        assertTrue(resultList.get(1).getIngestDocument().hasField(key3));
+        assertThat(resultList.get(1).getConditionalWithResult().v1(), equalTo(scriptName));
+        assertThat(resultList.get(1).getConditionalWithResult().v2(), is(Boolean.FALSE));
 
-        assertThat(resultList.get(1).getIngestDocument(), equalTo(expectedResult.getIngestDocument()));
-        assertThat(resultList.get(1).getFailure(), nullValue());
-        assertThat(resultList.get(1).getProcessorTag(), nullValue());
+        assertTrue(resultList.get(2).getIngestDocument().hasField(key1));
+        assertFalse(resultList.get(2).getIngestDocument().hasField(key2));
+        assertTrue(resultList.get(2).getIngestDocument().hasField(key3));
+
+        assertThat(resultList.get(2).getIngestDocument(), equalTo(expectedResult.getIngestDocument()));
+        assertThat(resultList.get(2).getFailure(), nullValue());
+        assertThat(resultList.get(2).getProcessorTag(), nullValue());
     }
 
     public void testActualPipelineProcessor() throws Exception {
@@ -270,24 +276,28 @@ public class TrackingResultProcessorTests extends ESTestCase {
 
         trackingProcessor.execute(ingestDocument, (result, e) -> {});
 
-        SimulateProcessorResult expectedResult = new SimulateProcessorResult(actualProcessor.getTag(),
-            actualProcessor.getDescription(), ingestDocument);
+        SimulateProcessorResult expectedResult = new SimulateProcessorResult(actualProcessor.getType(), actualProcessor.getTag(),
+            actualProcessor.getDescription(), ingestDocument, null);
         expectedResult.getIngestDocument().getIngestMetadata().put("pipeline", pipelineId);
 
         verify(ingestService,  Mockito.atLeast(1)).getPipeline(pipelineId);
-        assertThat(resultList.size(), equalTo(3));
 
-        assertTrue(resultList.get(0).getIngestDocument().hasField(key1));
-        assertFalse(resultList.get(0).getIngestDocument().hasField(key2));
-        assertFalse(resultList.get(0).getIngestDocument().hasField(key3));
+        assertThat(resultList.size(), equalTo(4));
+
+        assertNull(resultList.get(0).getConditionalWithResult());
+        assertThat(resultList.get(0).getType(), equalTo("pipeline"));
 
         assertTrue(resultList.get(1).getIngestDocument().hasField(key1));
-        assertTrue(resultList.get(1).getIngestDocument().hasField(key2));
+        assertFalse(resultList.get(1).getIngestDocument().hasField(key2));
         assertFalse(resultList.get(1).getIngestDocument().hasField(key3));
 
-        assertThat(resultList.get(2).getIngestDocument(), equalTo(expectedResult.getIngestDocument()));
-        assertThat(resultList.get(2).getFailure(), nullValue());
-        assertThat(resultList.get(2).getProcessorTag(), nullValue());
+        assertTrue(resultList.get(2).getIngestDocument().hasField(key1));
+        assertTrue(resultList.get(2).getIngestDocument().hasField(key2));
+        assertFalse(resultList.get(2).getIngestDocument().hasField(key3));
+
+        assertThat(resultList.get(3).getIngestDocument(), equalTo(expectedResult.getIngestDocument()));
+        assertThat(resultList.get(3).getFailure(), nullValue());
+        assertThat(resultList.get(3).getProcessorTag(), nullValue());
     }
 
     public void testActualPipelineProcessorWithTrueConditional() throws Exception {
@@ -320,7 +330,7 @@ public class TrackingResultProcessorTests extends ESTestCase {
                 randomAlphaOfLength(10),
                 null,
                 new Script(ScriptType.INLINE, Script.DEFAULT_SCRIPT_LANG, scriptName, Collections.emptyMap()), scriptService,
-                factory.create(Collections.emptyMap(), null, null, pipelineConfig2)),
+                factory.create(Collections.emptyMap(), "pipeline1", null, pipelineConfig2)),
             new TestProcessor(ingestDocument -> {ingestDocument.setFieldValue(key3, randomInt()); })
         )
         );
@@ -332,33 +342,42 @@ public class TrackingResultProcessorTests extends ESTestCase {
         when(ingestService.getPipeline(pipelineId1)).thenReturn(pipeline1);
         when(ingestService.getPipeline(pipelineId2)).thenReturn(pipeline2);
 
-        PipelineProcessor pipelineProcessor = factory.create(Collections.emptyMap(), null, null, pipelineConfig0);
+        PipelineProcessor pipelineProcessor = factory.create(Collections.emptyMap(), "pipeline0", null, pipelineConfig0);
         CompoundProcessor actualProcessor = new CompoundProcessor(pipelineProcessor);
 
         CompoundProcessor trackingProcessor = decorate(actualProcessor, null, resultList);
 
         trackingProcessor.execute(ingestDocument, (result, e) -> {});
 
-
-        SimulateProcessorResult expectedResult = new SimulateProcessorResult(actualProcessor.getTag(),
-            actualProcessor.getDescription(), ingestDocument);
+        SimulateProcessorResult expectedResult = new SimulateProcessorResult(actualProcessor.getType(), actualProcessor.getTag(),
+            actualProcessor.getDescription(), ingestDocument, null);
         expectedResult.getIngestDocument().getIngestMetadata().put("pipeline", pipelineId1);
 
         verify(ingestService, Mockito.atLeast(1)).getPipeline(pipelineId1);
         verify(ingestService, Mockito.atLeast(1)).getPipeline(pipelineId2);
-        assertThat(resultList.size(), equalTo(3));
 
-        assertTrue(resultList.get(0).getIngestDocument().hasField(key1));
-        assertFalse(resultList.get(0).getIngestDocument().hasField(key2));
-        assertFalse(resultList.get(0).getIngestDocument().hasField(key3));
+        assertThat(resultList.size(), equalTo(5));
+
+        assertNull(resultList.get(0).getConditionalWithResult());
+        assertThat(resultList.get(0).getType(), equalTo("pipeline"));
+        assertThat(resultList.get(0).getProcessorTag(), equalTo("pipeline0"));
 
         assertTrue(resultList.get(1).getIngestDocument().hasField(key1));
-        assertTrue(resultList.get(1).getIngestDocument().hasField(key2));
+        assertFalse(resultList.get(1).getIngestDocument().hasField(key2));
         assertFalse(resultList.get(1).getIngestDocument().hasField(key3));
 
-        assertThat(resultList.get(2).getIngestDocument(), equalTo(expectedResult.getIngestDocument()));
-        assertThat(resultList.get(2).getFailure(), nullValue());
-        assertThat(resultList.get(2).getProcessorTag(), nullValue());
+        assertThat(resultList.get(2).getConditionalWithResult().v1(), equalTo(scriptName));
+        assertThat(resultList.get(2).getConditionalWithResult().v2(), is(Boolean.TRUE));
+        assertThat(resultList.get(2).getType(), equalTo("pipeline"));
+        assertThat(resultList.get(2).getProcessorTag(), equalTo("pipeline1"));
+
+        assertTrue(resultList.get(3).getIngestDocument().hasField(key1));
+        assertTrue(resultList.get(3).getIngestDocument().hasField(key2));
+        assertFalse(resultList.get(3).getIngestDocument().hasField(key3));
+
+        assertThat(resultList.get(4).getIngestDocument(), equalTo(expectedResult.getIngestDocument()));
+        assertThat(resultList.get(4).getFailure(), nullValue());
+        assertThat(resultList.get(4).getProcessorTag(), nullValue());
     }
 
     public void testActualPipelineProcessorWithFalseConditional() throws Exception {
@@ -410,26 +429,28 @@ public class TrackingResultProcessorTests extends ESTestCase {
 
         trackingProcessor.execute(ingestDocument, (result, e) -> {});
 
-
-        SimulateProcessorResult expectedResult = new SimulateProcessorResult(actualProcessor.getTag(),
-            actualProcessor.getDescription(), ingestDocument);
+        SimulateProcessorResult expectedResult = new SimulateProcessorResult(actualProcessor.getType(), actualProcessor.getTag(),
+            actualProcessor.getDescription(), ingestDocument, null);
         expectedResult.getIngestDocument().getIngestMetadata().put("pipeline", pipelineId1);
 
         verify(ingestService, Mockito.atLeast(1)).getPipeline(pipelineId1);
         verify(ingestService, Mockito.never()).getPipeline(pipelineId2);
-        assertThat(resultList.size(), equalTo(2));
 
-        assertTrue(resultList.get(0).getIngestDocument().hasField(key1));
-        assertFalse(resultList.get(0).getIngestDocument().hasField(key2));
-        assertFalse(resultList.get(0).getIngestDocument().hasField(key3));
+        assertThat(resultList.size(), equalTo(4));
+
+        assertNull(resultList.get(0).getConditionalWithResult());
+        assertThat(resultList.get(0).getType(), equalTo("pipeline"));
 
         assertTrue(resultList.get(1).getIngestDocument().hasField(key1));
         assertFalse(resultList.get(1).getIngestDocument().hasField(key2));
-        assertTrue(resultList.get(1).getIngestDocument().hasField(key3));
+        assertFalse(resultList.get(1).getIngestDocument().hasField(key3));
 
-        assertThat(resultList.get(1).getIngestDocument(), equalTo(expectedResult.getIngestDocument()));
-        assertThat(resultList.get(1).getFailure(), nullValue());
-        assertThat(resultList.get(1).getProcessorTag(), nullValue());
+        assertThat(resultList.get(2).getConditionalWithResult().v1(), equalTo(scriptName));
+        assertThat(resultList.get(2).getConditionalWithResult().v2(), is(Boolean.FALSE));
+
+        assertThat(resultList.get(3).getIngestDocument(), equalTo(expectedResult.getIngestDocument()));
+        assertThat(resultList.get(3).getFailure(), nullValue());
+        assertThat(resultList.get(3).getProcessorTag(), nullValue());
     }
 
     public void testActualPipelineProcessorWithHandledFailure() throws Exception {
@@ -464,28 +485,31 @@ public class TrackingResultProcessorTests extends ESTestCase {
 
         trackingProcessor.execute(ingestDocument, (result, e) -> {});
 
-        SimulateProcessorResult expectedResult = new SimulateProcessorResult(actualProcessor.getTag(),
-            actualProcessor.getDescription(), ingestDocument);
+        SimulateProcessorResult expectedResult = new SimulateProcessorResult(actualProcessor.getType(), actualProcessor.getTag(),
+            actualProcessor.getDescription(), ingestDocument, null);
         expectedResult.getIngestDocument().getIngestMetadata().put("pipeline", pipelineId);
 
         verify(ingestService, Mockito.atLeast(2)).getPipeline(pipelineId);
-        assertThat(resultList.size(), equalTo(4));
+        assertThat(resultList.size(), equalTo(5));
 
-        assertTrue(resultList.get(0).getIngestDocument().hasField(key1));
-        assertFalse(resultList.get(0).getIngestDocument().hasField(key2));
-        assertFalse(resultList.get(0).getIngestDocument().hasField(key3));
+        assertNull(resultList.get(0).getConditionalWithResult());
+        assertThat(resultList.get(0).getType(), equalTo("pipeline"));
+
+        assertTrue(resultList.get(1).getIngestDocument().hasField(key1));
+        assertFalse(resultList.get(1).getIngestDocument().hasField(key2));
+        assertFalse(resultList.get(1).getIngestDocument().hasField(key3));
 
         //failed processor
-        assertNull(resultList.get(1).getIngestDocument());
-        assertThat(resultList.get(1).getFailure().getMessage(), equalTo(exception.getMessage()));
+        assertNull(resultList.get(2).getIngestDocument());
+        assertThat(resultList.get(2).getFailure().getMessage(), equalTo(exception.getMessage()));
 
-        assertTrue(resultList.get(2).getIngestDocument().hasField(key1));
-        assertTrue(resultList.get(2).getIngestDocument().hasField(key2));
-        assertFalse(resultList.get(2).getIngestDocument().hasField(key3));
+        assertTrue(resultList.get(3).getIngestDocument().hasField(key1));
+        assertTrue(resultList.get(3).getIngestDocument().hasField(key2));
+        assertFalse(resultList.get(3).getIngestDocument().hasField(key3));
 
-        assertThat(resultList.get(3).getIngestDocument(), equalTo(expectedResult.getIngestDocument()));
-        assertThat(resultList.get(3).getFailure(), nullValue());
-        assertThat(resultList.get(3).getProcessorTag(), nullValue());
+        assertThat(resultList.get(4).getIngestDocument(), equalTo(expectedResult.getIngestDocument()));
+        assertThat(resultList.get(4).getFailure(), nullValue());
+        assertThat(resultList.get(4).getProcessorTag(), nullValue());
     }
 
     public void testActualPipelineProcessorWithCycle() throws Exception {
@@ -536,32 +560,36 @@ public class TrackingResultProcessorTests extends ESTestCase {
         );
         when(ingestService.getPipeline(pipelineId)).thenReturn(pipeline);
 
+        // calls the same pipeline twice
         CompoundProcessor actualProcessor = new CompoundProcessor(pipelineProcessor, pipelineProcessor);
 
         CompoundProcessor trackingProcessor = decorate(actualProcessor, null, resultList);
 
         trackingProcessor.execute(ingestDocument, (result, e) -> {});
 
-        SimulateProcessorResult expectedResult = new SimulateProcessorResult(actualProcessor.getTag(),
-            actualProcessor.getDescription(), ingestDocument);
+        SimulateProcessorResult expectedResult = new SimulateProcessorResult(actualProcessor.getType(), actualProcessor.getTag(),
+            actualProcessor.getDescription(), ingestDocument, null);
         expectedResult.getIngestDocument().getIngestMetadata().put("pipeline", pipelineId);
 
         verify(ingestService,  Mockito.atLeast(2)).getPipeline(pipelineId);
-        assertThat(resultList.size(), equalTo(2));
+        assertThat(resultList.size(), equalTo(4));
 
-        assertThat(resultList.get(0).getIngestDocument(), not(equalTo(expectedResult.getIngestDocument())));
-        assertThat(resultList.get(0).getFailure(), nullValue());
-        assertThat(resultList.get(0).getProcessorTag(), nullValue());
+        assertNull(resultList.get(0).getConditionalWithResult());
+        assertThat(resultList.get(0).getType(), equalTo("pipeline"));
 
-        assertThat(resultList.get(1).getIngestDocument(), equalTo(expectedResult.getIngestDocument()));
+        assertThat(resultList.get(1).getIngestDocument(), not(equalTo(expectedResult.getIngestDocument())));
         assertThat(resultList.get(1).getFailure(), nullValue());
         assertThat(resultList.get(1).getProcessorTag(), nullValue());
 
-        //each invocation updates key1 with a random int
-        assertNotEquals(resultList.get(0).getIngestDocument().getSourceAndMetadata().get(key1),
-            resultList.get(1).getIngestDocument().getSourceAndMetadata().get(key1));
-    }
-
+        assertNull(resultList.get(2).getConditionalWithResult());
+        assertThat(resultList.get(2).getType(), equalTo("pipeline"));
 
+        assertThat(resultList.get(3).getIngestDocument(), equalTo(expectedResult.getIngestDocument()));
+        assertThat(resultList.get(3).getFailure(), nullValue());
+        assertThat(resultList.get(3).getProcessorTag(), nullValue());
 
+        //each invocation updates key1 with a random int
+        assertNotEquals(resultList.get(1).getIngestDocument().getSourceAndMetadata().get(key1),
+            resultList.get(3).getIngestDocument().getSourceAndMetadata().get(key1));
+    }
 }