Explorar el Código

Forwarding simulate calls to ingest nodes (#92171)

Calls to the simulate pipeline API are now forwarded to an ingest node. If there is no ingest node,
the request now fails.
Keith Massey hace 2 años
padre
commit
64a52ce18f

+ 5 - 0
docs/changelog/92171.yaml

@@ -0,0 +1,5 @@
+pr: 92171
+summary: Forwarding simulate calls to ingest nodes
+area: Ingest Node
+type: bug
+issues: []

+ 36 - 0
modules/ingest-geoip/src/yamlRestTest/resources/rest-api-spec/test/ingest_geoip/20_geoip_processor.yml

@@ -309,3 +309,39 @@
   - match: { _source.geoip.asn: 29518 }
   - match: { _source.geoip.organization_name: "Bredband2 AB" }
   - match: { _source.geoip.network: "89.160.0.0/17" }
+
+---
+"Test simulate with Geoip Processor":
+- do:
+    ingest.put_pipeline:
+      id: "pipeline1"
+      body:  >
+        {
+          "processors": [
+          {
+            "geoip": {
+              "field": "source.ip",
+              "target_field": "source.geo"
+            }
+          }
+          ]
+        }
+- match: { acknowledged: true }
+
+- do:
+    ingest.simulate:
+      id: "pipeline1"
+      body:  >
+        {
+          "docs": [
+            {
+              "_source": {
+                "source": {
+                  "ip": "89.160.20.128"
+                }
+              }
+            }
+          ]
+        }
+- length: { docs: 1 }
+- match: { docs.0.doc._source.source.geo.city_name: "Linköping" }

+ 2 - 6
qa/smoke-test-ingest-disabled/src/yamlRestTest/resources/rest-api-spec/test/ingest_mustache/10_ingest_disabled.yml

@@ -28,7 +28,7 @@
   - match: { acknowledged: true }
 
 ---
-"Test ingest simulate API works fine when node.ingest is set to false":
+"Test ingest simulate API fails when node.ingest is set to false":
   - do:
       ingest.put_pipeline:
         id: "my_pipeline"
@@ -47,6 +47,7 @@
   - match: { acknowledged: true }
 
   - do:
+      catch: /There are no ingest nodes in this cluster, unable to forward request to an ingest node./
       ingest.simulate:
         id: "my_pipeline"
         body: >
@@ -61,11 +62,6 @@
               }
             ]
           }
-  - length: { docs: 1 }
-  - match: { docs.0.doc._source.foo: "bar" }
-  - match: { docs.0.doc._source.field2: "_value" }
-  - length: { docs.0.doc._ingest: 1 }
-  - is_true: docs.0.doc._ingest.timestamp
 
 ---
 "Test index api with pipeline id fails when node.ingest is set to false":

+ 2 - 0
qa/smoke-test-multinode/build.gradle

@@ -22,6 +22,8 @@ File repo = file("$buildDir/testclusters/repo")
 testClusters.matching { it.name == "yamlRestTest" }.configureEach {
   numberOfNodes = 2
   setting 'path.repo', repo.absolutePath
+  // The first node does not have the ingest role so we're sure ingest requests are forwarded:
+  nodes."yamlRestTest-0".setting 'node.roles', '[master,data,ml,remote_cluster_client,transform]'
 }
 
 testClusters.configureEach {

+ 978 - 0
qa/smoke-test-multinode/src/yamlRestTest/resources/rest-api-spec/test/smoke_test_multinode/40_simulate.yml

@@ -0,0 +1,978 @@
+---
+teardown:
+  - do:
+      ingest.delete_pipeline:
+        id: "my_pipeline"
+        ignore: 404
+
+---
+"Test simulate with stored ingest pipeline":
+  - do:
+      ingest.put_pipeline:
+        id: "my_pipeline"
+        body:  >
+          {
+            "description": "_description",
+            "processors": [
+              {
+                "set" : {
+                  "field" : "field2",
+                  "value" : "_value"
+                }
+              }
+            ]
+          }
+  - match: { acknowledged: true }
+
+  - do:
+      ingest.simulate:
+        id: "my_pipeline"
+        body: >
+          {
+            "docs": [
+              {
+                "_index": "index",
+                "_id": "id",
+                "_source": {
+                  "foo": "bar"
+                }
+              }
+            ]
+          }
+  - length: { docs: 1 }
+  - match: { docs.0.doc._source.foo: "bar" }
+  - match: { docs.0.doc._source.field2: "_value" }
+  - length: { docs.0.doc._ingest: 1 }
+  - is_true: docs.0.doc._ingest.timestamp
+
+---
+"Test simulate with provided pipeline definition":
+  - do:
+      ingest.simulate:
+        body: >
+          {
+            "pipeline": {
+              "description": "_description",
+              "processors": [
+                {
+                  "set" : {
+                    "field" : "field2",
+                    "value" : "_value"
+                  }
+                }
+              ]
+            },
+            "docs": [
+              {
+                "_index": "index",
+                "_id": "id",
+                "_source": {
+                  "foo": "bar"
+                }
+              }
+            ]
+          }
+  - length: { docs: 1 }
+
+---
+"Test simulate with provided invalid pipeline definition":
+  - do:
+      catch: bad_request
+      ingest.simulate:
+        body: >
+          {
+            "pipeline": {
+              "description": "_description",
+              "processors": [
+                {
+                  "set" : {
+                    "tag" : "fails",
+                    "value" : "_value"
+                  }
+                }
+              ]
+            },
+            "docs": [
+              {
+                "_index": "index",
+                "_id": "id",
+                "_source": {
+                  "foo": "bar"
+                }
+              }
+            ]
+          }
+  - match: { error.root_cause.0.type: "parse_exception" }
+  - match: { error.root_cause.0.reason: "[field] required property is missing" }
+  - match: { error.root_cause.0.processor_tag: "fails" }
+  - match: { error.root_cause.0.processor_type: "set" }
+  - match: { error.root_cause.0.property_name: "field" }
+
+---
+"Test simulate without id":
+  - do:
+      ingest.simulate:
+        body: >
+          {
+            "pipeline": {
+              "description": "_description",
+              "processors": [
+                {
+                  "set" : {
+                    "field" : "field2",
+                    "value" : "_value"
+                  }
+                }
+              ]
+            },
+            "docs": [
+              {
+                "_source": {
+                  "foo": "bar"
+                }
+              }
+            ]
+          }
+  - length: { docs: 1 }
+
+---
+"Test simulate with provided pipeline definition with on_failure block":
+  - do:
+      ingest.simulate:
+        body: >
+          {
+            "pipeline": {
+              "description": "_description",
+              "processors": [
+                {
+                  "rename" : {
+                    "field" : "does_not_exist",
+                    "target_field" : "field2",
+                    "on_failure" : [
+                      {
+                        "set" : {
+                          "field" : "field2",
+                          "value" : "_value"
+                        }
+                      }
+                    ]
+                  }
+                }
+              ]
+            },
+            "docs": [
+              {
+                "_index": "index",
+                "_id": "id",
+                "_source": {
+                  "foo": "bar"
+                }
+              }
+            ]
+          }
+  - length: { docs: 1 }
+  - match: { docs.0.doc._source.foo: "bar" }
+  - match: { docs.0.doc._source.field2: "_value" }
+  - length: { docs.0.doc._ingest: 1 }
+  - is_true: docs.0.doc._ingest.timestamp
+
+---
+"Test simulate with no provided pipeline or pipeline_id":
+  - do:
+      catch: bad_request
+      ingest.simulate:
+        body: >
+          {
+            "docs": [
+              {
+                "_index": "index",
+                "_id": "id",
+                "_source": {
+                  "foo": "bar"
+                }
+              }
+            ]
+          }
+  - is_false: error.root_cause.0.processor_type
+  - is_false: error.root_cause.0.processor_tag
+  - match: { error.root_cause.0.property_name: "pipeline" }
+  - match: { error.reason: "[pipeline] required property is missing" }
+
+---
+"Test simulate with invalid processor config":
+  - do:
+      catch: bad_request
+      ingest.simulate:
+        body: >
+          {
+            "pipeline": {
+              "description": "_description",
+              "processors": [
+                {
+                  "set" : {
+                    "field" : "field2"
+                  }
+                }
+              ]
+            },
+            "docs": [
+              {
+                "_index": "index",
+                "_id": "id",
+                "_source": {
+                  "foo": "bar"
+                }
+              }
+            ]
+          }
+  - match: { error.root_cause.0.type: "parse_exception" }
+  - match: { error.root_cause.0.reason: "[value] required property is missing" }
+  - match: { error.root_cause.0.processor_type: "set" }
+  - match: { error.root_cause.0.property_name: "value" }
+  - is_false: error.root_cause.0.processor_tag
+
+---
+"Test simulate with verbose flag":
+  - do:
+      ingest.simulate:
+        verbose: true
+        body: >
+          {
+            "pipeline": {
+              "description": "_description",
+              "processors": [
+                {
+                  "set" : {
+                    "tag" : "processor[set]-0",
+                    "field" : "field2.value",
+                    "value" : "_value"
+                  }
+                },
+                {
+                  "set" : {
+                    "field" : "field3",
+                    "value" : "third_val"
+                  }
+                },
+                {
+                  "uppercase" : {
+                    "field" : "field2.value"
+                  }
+                },
+                {
+                  "lowercase" : {
+                    "field" : "foo.bar.0.item"
+                  }
+                }
+              ]
+            },
+            "docs": [
+              {
+                "_index": "index",
+                "_id": "id",
+                "_source": {
+                  "foo": {
+                    "bar" : [ {"item": "HELLO"} ]
+                  }
+                }
+              }
+            ]
+          }
+  - length: { docs: 1 }
+  - length: { docs.0.processor_results: 4 }
+  - match: { docs.0.processor_results.0.tag: "processor[set]-0" }
+  - length: { docs.0.processor_results.0.doc._source: 2 }
+  - match: { docs.0.processor_results.0.doc._source.foo.bar.0.item: "HELLO" }
+  - match: { docs.0.processor_results.0.doc._source.field2.value: "_value" }
+  - length: { docs.0.processor_results.0.doc._ingest: 2 }
+  - is_true: docs.0.processor_results.0.doc._ingest.timestamp
+  - is_true: docs.0.processor_results.0.doc._ingest.pipeline
+  - length: { docs.0.processor_results.1.doc._source: 3 }
+  - match: { docs.0.processor_results.1.doc._source.foo.bar.0.item: "HELLO" }
+  - match: { docs.0.processor_results.1.doc._source.field2.value: "_value" }
+  - match: { docs.0.processor_results.1.doc._source.field3: "third_val" }
+  - length: { docs.0.processor_results.1.doc._ingest: 2 }
+  - is_true: docs.0.processor_results.1.doc._ingest.timestamp
+  - is_true: docs.0.processor_results.1.doc._ingest.pipeline
+  - length: { docs.0.processor_results.2.doc._source: 3 }
+  - match: { docs.0.processor_results.2.doc._source.foo.bar.0.item: "HELLO" }
+  - match: { docs.0.processor_results.2.doc._source.field2.value: "_VALUE" }
+  - match: { docs.0.processor_results.2.doc._source.field3: "third_val" }
+  - length: { docs.0.processor_results.2.doc._ingest: 2 }
+  - is_true: docs.0.processor_results.2.doc._ingest.timestamp
+  - is_true: docs.0.processor_results.2.doc._ingest.pipeline
+  - length: { docs.0.processor_results.3.doc._source: 3 }
+  - match: { docs.0.processor_results.3.doc._source.foo.bar.0.item: "hello" }
+  - match: { docs.0.processor_results.3.doc._source.field2.value: "_VALUE" }
+  - match: { docs.0.processor_results.3.doc._source.field3: "third_val" }
+  - length: { docs.0.processor_results.3.doc._ingest: 2 }
+  - is_true: docs.0.processor_results.3.doc._ingest.timestamp
+  - is_true: docs.0.processor_results.3.doc._ingest.pipeline
+
+---
+"Test simulate with exception thrown":
+  - do:
+      ingest.simulate:
+        body: >
+          {
+            "pipeline": {
+              "description": "_description",
+              "processors": [
+                {
+                  "uppercase" : {
+                    "field" : "foo"
+                  }
+                }
+              ]
+            },
+            "docs": [
+              {
+                "_index": "index",
+                "_id": "id",
+                "_source": {
+                  "not_foo": "bar"
+                }
+              },
+              {
+                "_index": "index",
+                "_id": "id2",
+                "_source": {
+                  "foo": "bar"
+                }
+              }
+            ]
+          }
+  - length: { docs: 2 }
+  - match: { docs.0.error.type: "illegal_argument_exception" }
+  - match: { docs.1.doc._source.foo: "BAR" }
+  - length: { docs.1.doc._ingest: 1 }
+  - is_true: docs.1.doc._ingest.timestamp
+
+---
+"Test verbose simulate with exception thrown":
+  - do:
+      ingest.simulate:
+        verbose: true
+        body: >
+          {
+            "pipeline": {
+              "description": "_description",
+              "processors": [
+                {
+                  "convert" : {
+                    "field" : "foo",
+                    "type" : "integer"
+                  }
+                },
+                {
+                  "uppercase" : {
+                    "field" : "bar"
+                  }
+                }
+              ]
+            },
+            "docs": [
+              {
+                "_index": "index",
+                "_id": "id",
+                "_source": {
+                  "foo": "bar",
+                  "bar": "hello"
+                }
+              },
+              {
+                "_index": "index",
+                "_id": "id2",
+                "_source": {
+                  "foo": "5",
+                  "bar": "hello"
+                }
+              }
+            ]
+          }
+  - length: { docs: 2 }
+  - length: { docs.0.processor_results: 1 }
+  - match: { docs.0.processor_results.0.error.type: "illegal_argument_exception" }
+  - length: { docs.1.processor_results: 2 }
+  - match: { docs.1.processor_results.0.doc._index: "index" }
+  - match: { docs.1.processor_results.0.doc._source.foo: 5 }
+  - match: { docs.1.processor_results.0.doc._source.bar: "hello" }
+  - length: { docs.1.processor_results.0.doc._ingest: 2 }
+  - is_true: docs.1.processor_results.0.doc._ingest.timestamp
+  - is_true: docs.1.processor_results.0.doc._ingest.pipeline
+  - match: { docs.1.processor_results.1.doc._source.foo: 5 }
+  - match: { docs.1.processor_results.1.doc._source.bar: "HELLO" }
+  - length: { docs.1.processor_results.1.doc._ingest: 2 }
+  - is_true: docs.1.processor_results.1.doc._ingest.timestamp
+  - is_true: docs.1.processor_results.1.doc._ingest.pipeline
+
+---
+"Test verbose simulate with error in pipeline":
+  - do:
+      ingest.put_pipeline:
+        id: "my_pipeline"
+        body:  >
+          {
+            "description": "_description",
+            "processors": [
+              {
+                "rename" : {
+                  "field" : "does_not_exist",
+                  "target_field" : "_value"
+                }
+              }
+            ]
+          }
+  - match: { acknowledged: true }
+
+  - do:
+      ingest.simulate:
+        verbose: true
+        body: >
+          {
+            "pipeline": {
+              "description": "_description",
+              "processors": [
+                {
+                  "pipeline" : {
+                    "name" : "my_pipeline"
+                  }
+                }
+              ]
+            },
+            "docs": [
+              {
+                "_index": "index",
+                "_id": "id",
+                "_source": {
+                  "foo": "bar",
+                  "bar": "hello"
+                }
+              }
+            ]
+          }
+  - length: { docs: 1 }
+  - length: { docs.0.processor_results: 2 }
+  - match: { docs.0.processor_results.0.processor_type: "pipeline" }
+  - match: { docs.0.processor_results.0.status: "success" }
+  - match: { docs.0.processor_results.1.processor_type: "rename" }
+  - match: { docs.0.processor_results.1.status: "error" }
+  - match: { docs.0.processor_results.1.error.root_cause.0.type: "illegal_argument_exception" }
+  - match: { docs.0.processor_results.1.error.root_cause.0.reason: "field [does_not_exist] doesn't exist" }
+  - match: { docs.0.processor_results.1.error.type: "illegal_argument_exception" }
+  - match: { docs.0.processor_results.1.error.reason: "field [does_not_exist] doesn't exist" }
+
+---
+"Test verbose simulate with on_failure":
+  - do:
+      ingest.simulate:
+        verbose: true
+        body: >
+          {
+            "pipeline" : {
+              "description": "_description",
+              "processors": [
+                {
+                  "set" : {
+                    "tag" : "setstatus-1",
+                    "field" : "status",
+                    "value" : 200
+                  }
+                },
+                {
+                  "rename" : {
+                    "tag" : "rename-1",
+                    "field" : "foofield",
+                    "target_field" : "field1",
+                    "on_failure" : [
+                      {
+                        "set" : {
+                          "tag" : "set on_failure rename",
+                          "field" : "foofield",
+                          "value" : "exists"
+                        }
+                      },
+                      {
+                        "rename" : {
+                          "field" : "foofield2",
+                          "target_field" : "field1",
+                          "on_failure" : [
+                            {
+                              "set" : {
+                                "field" : "foofield2",
+                                "value" : "ran"
+                              }
+                            }
+                          ]
+                        }
+                      }
+                    ]
+                  }
+                }
+              ]
+            },
+            "docs": [
+              {
+                "_index": "index",
+                "_id": "id",
+                "_source": {
+                  "field1": "123.42 400 <foo>"
+                }
+              }
+            ]
+          }
+  - length: { docs: 1 }
+  - length: { docs.0.processor_results: 5 }
+  - match: { docs.0.processor_results.0.tag: "setstatus-1" }
+  - match: { docs.0.processor_results.0.doc._source.field1: "123.42 400 <foo>" }
+  - match: { docs.0.processor_results.0.doc._source.status: 200 }
+  - match: { docs.0.processor_results.1.tag: "rename-1" }
+  - match: { docs.0.processor_results.1.error.type: "illegal_argument_exception" }
+  - match: { docs.0.processor_results.1.error.reason: "field [foofield] doesn't exist" }
+  - match: { docs.0.processor_results.2.tag: "set on_failure rename" }
+  - is_false: docs.0.processor_results.3.tag
+  - is_false: docs.0.processor_results.4.tag
+  - match: { docs.0.processor_results.4.doc._source.foofield: "exists" }
+  - match: { docs.0.processor_results.4.doc._source.foofield2: "ran" }
+  - match: { docs.0.processor_results.4.doc._source.field1: "123.42 400 <foo>" }
+  - match: { docs.0.processor_results.4.doc._source.status: 200 }
+
+---
+"Test verbose simulate with ignore_failure and thrown exception":
+  - do:
+      ingest.simulate:
+        verbose: true
+        body: >
+          {
+            "pipeline" : {
+              "description": "_description",
+              "processors": [
+                {
+                  "set" : {
+                    "tag" : "setstatus-1",
+                    "field" : "status",
+                    "value" : 200
+                  }
+                },
+                {
+                  "rename" : {
+                    "tag" : "rename-1",
+                    "field" : "foofield",
+                    "target_field" : "field1",
+                    "ignore_failure": true,
+                    "on_failure" : [
+                      {
+                        "set" : {
+                          "tag" : "set on_failure rename",
+                          "field" : "foofield",
+                          "value" : "exists"
+                        }
+                      },
+                      {
+                        "rename" : {
+                          "field" : "foofield2",
+                          "target_field" : "field1",
+                          "on_failure" : [
+                            {
+                              "set" : {
+                                "field" : "foofield2",
+                                "value" : "ran"
+                              }
+                            }
+                          ]
+                        }
+                      }
+                    ]
+                  }
+                }
+              ]
+            },
+            "docs": [
+              {
+                "_index": "index",
+                "_id": "id",
+                "_source": {
+                  "field1": "123.42 400 <foo>"
+                }
+              }
+            ]
+          }
+  - length: { docs: 1 }
+  - length: { docs.0.processor_results: 2 }
+  - match: { docs.0.processor_results.0.tag: "setstatus-1" }
+  - match: { docs.0.processor_results.0.doc._source.field1: "123.42 400 <foo>" }
+  - match: { docs.0.processor_results.0.doc._source.status: 200 }
+  - match: { docs.0.processor_results.0.status: "success" }
+  - match: { docs.0.processor_results.0.processor_type: "set" }
+  - match: { docs.0.processor_results.1.tag: "rename-1" }
+  - match: { docs.0.processor_results.1.ignored_error.error.type: "illegal_argument_exception" }
+  - match: { docs.0.processor_results.1.ignored_error.error.reason: "field [foofield] doesn't exist" }
+  - match: { docs.0.processor_results.1.doc._source.field1: "123.42 400 <foo>" }
+  - match: { docs.0.processor_results.1.doc._source.status: 200 }
+  - match: { docs.0.processor_results.1.status: "error_ignored" }
+  - match: { docs.0.processor_results.1.processor_type: "rename" }
+
+---
+"Test verbose simulate with ignore_failure and no exception thrown":
+  - do:
+      ingest.simulate:
+        verbose: true
+        body: >
+          {
+            "pipeline" : {
+              "description": "_description",
+              "processors": [
+                {
+                  "set" : {
+                    "tag" : "setstatus-1",
+                    "field" : "status",
+                    "value" : 200
+                  }
+                },
+                {
+                  "rename" : {
+                    "tag" : "rename-1",
+                    "field" : "status",
+                    "target_field" : "new_status",
+                    "ignore_failure": true
+                  }
+                }
+              ]
+            },
+            "docs": [
+              {
+                "_index": "index",
+                "_id": "id",
+                "_source": {
+                  "field1": "123.42 400 <foo>"
+                }
+              }
+            ]
+          }
+  - length: { docs: 1 }
+  - length: { docs.0.processor_results: 2 }
+  - length: { docs.0.processor_results.0: 4 }
+  - match: { docs.0.processor_results.0.tag: "setstatus-1" }
+  - match: { docs.0.processor_results.0.status: "success" }
+  - match: { docs.0.processor_results.0.processor_type: "set" }
+  - match: { docs.0.processor_results.0.doc._source.field1: "123.42 400 <foo>" }
+  - match: { docs.0.processor_results.0.doc._source.status: 200 }
+  - length: { docs.0.processor_results.1: 4 }
+  - match: { docs.0.processor_results.1.tag: "rename-1" }
+  - match: { docs.0.processor_results.1.status: "success" }
+  - match: { docs.0.processor_results.1.processor_type: "rename" }
+  - match: { docs.0.processor_results.1.doc._source.new_status: 200 }
+
+---
+"Test verbose simulate with Pipeline Processor with Circular Pipelines":
+  - do:
+      ingest.put_pipeline:
+        id: "outer"
+        body:  >
+          {
+            "description" : "outer pipeline",
+            "processors" : [
+              {
+                "pipeline" : {
+                  "name": "inner"
+                }
+              }
+            ]
+          }
+  - match: { acknowledged: true }
+
+  - do:
+      ingest.put_pipeline:
+        id: "inner"
+        body:  >
+          {
+            "description" : "inner pipeline",
+            "processors" : [
+              {
+                "pipeline" : {
+                  "name": "outer"
+                }
+              }
+            ]
+          }
+  - match: { acknowledged: true }
+
+  - do:
+      ingest.simulate:
+        verbose: true
+        body: >
+          {
+            "pipeline": {
+              "processors" : [
+              {
+                "pipeline" : {
+                  "name": "outer"
+                }
+              }
+              ]
+            }
+            ,
+            "docs": [
+            {
+              "_index": "index",
+              "_id": "id",
+              "_source": {
+                "field1": "123.42 400 <foo>"
+              }
+            }
+            ]
+          }
+  - length: { docs: 1 }
+  - length: { docs.0.processor_results: 1 }
+  - match: { docs.0.processor_results.0.error.reason: "Cycle detected for pipeline: outer" }
+
+---
+"Test verbose simulate with Pipeline Processor with Multiple Pipelines":
+  - do:
+      ingest.put_pipeline:
+        id: "pipeline1"
+        body:  >
+          {
+            "processors": [
+            {
+              "set": {
+                "field": "pipeline1",
+                "value": true
+              }
+            },
+            {
+              "pipeline": {
+                "name": "pipeline2"
+              }
+            }
+            ]
+          }
+  - match: { acknowledged: true }
+
+  - do:
+      ingest.put_pipeline:
+        id: "pipeline2"
+        body:  >
+          {
+            "processors": [
+            {
+              "set": {
+                "field": "pipeline2",
+                "value": true
+              }
+            }
+            ]
+          }
+  - match: { acknowledged: true }
+
+  - do:
+      ingest.simulate:
+        verbose: true
+        body: >
+          {
+            "pipeline": {
+              "processors": [
+              {
+                "set": {
+                  "field": "pipeline0",
+                  "value": true,
+                  "description" : "first_set"
+                }
+              },
+              {
+                "pipeline": {
+                  "name": "pipeline1"
+                }
+              }
+              ]
+            },
+            "docs": [
+            {
+              "_index": "index",
+              "_id": "id",
+              "_source": {
+                "field1": "123.42 400 <foo>"
+              }
+            }
+            ]
+          }
+  - length: { docs: 1 }
+  - length: { docs.0.processor_results: 5 }
+  - match: { docs.0.processor_results.0.doc._source.pipeline0: true }
+  - match: { docs.0.processor_results.0.status: "success" }
+  - match: { docs.0.processor_results.0.processor_type: "set" }
+  - match: { docs.0.processor_results.0.description: "first_set" }
+  - is_false: docs.0.processor_results.0.doc._source.pipeline1
+  - is_false: docs.0.processor_results.0.doc._source.pipeline2
+  - match: { docs.0.processor_results.1.doc: null }
+  - match: { docs.0.processor_results.1.status: "success" }
+  - match: { docs.0.processor_results.1.processor_type: "pipeline" }
+  - match: { docs.0.processor_results.2.doc._source.pipeline0: true }
+  - match: { docs.0.processor_results.2.doc._source.pipeline1: true }
+  - is_false: docs.0.processor_results.2.doc._source.pipeline2
+  - match: { docs.0.processor_results.3.doc: null }
+  - match: { docs.0.processor_results.3.status: "success" }
+  - match: { docs.0.processor_results.3.processor_type: "pipeline" }
+  - match: { docs.0.processor_results.4.doc._source.pipeline0: true }
+  - match: { docs.0.processor_results.4.doc._source.pipeline1: true }
+  - match: { docs.0.processor_results.4.doc._source.pipeline2: true }
+
+---
+"Test verbose simulate with true conditional and on failure":
+  - do:
+      ingest.simulate:
+        verbose: true
+        body: >
+          {
+            "pipeline": {
+              "processors": [
+                {
+                  "rename": {
+                    "tag": "gunna_fail",
+                    "if": "true",
+                    "field": "foo1",
+                    "target_field": "fieldA",
+                    "on_failure": [
+                      {
+                        "set": {
+                          "field": "failed1",
+                          "value": "failed1",
+                          "tag": "failed1"
+                        }
+                      },
+                      {
+                        "rename": {
+                          "tag": "gunna_fail_again",
+                          "if": "true",
+                          "field": "foo2",
+                          "target_field": "fieldA",
+                          "on_failure": [
+                            {
+                              "set": {
+                                "field": "failed2",
+                                "value": "failed2",
+                                "tag": "failed2"
+                              }
+                            }
+                          ]
+                        }
+                      }
+                    ]
+                  }
+                }
+              ]
+            },
+            "docs": [
+              {
+                "_index": "index",
+                "_id": "id",
+                "_source": {
+                  "foo": "bar"
+                }
+              }
+            ]
+          }
+  - length: { docs: 1 }
+  - length: { docs.0.processor_results: 4 }
+  - match: { docs.0.processor_results.0.tag: "gunna_fail" }
+  - match: { docs.0.processor_results.0.error.reason: "field [foo1] doesn't exist" }
+  - match: { docs.0.processor_results.0.status: "error" }
+  - match: { docs.0.processor_results.0.processor_type: "rename" }
+  - match: { docs.0.processor_results.1.tag: "failed1" }
+  - match: { docs.0.processor_results.1.doc._source.failed1: "failed1" }
+  - match: { docs.0.processor_results.1.doc._ingest.on_failure_processor_tag: "gunna_fail" }
+  - match: { docs.0.processor_results.1.status: "success" }
+  - match: { docs.0.processor_results.1.processor_type: "set" }
+  - match: { docs.0.processor_results.2.tag: "gunna_fail_again" }
+  - match: { docs.0.processor_results.2.error.reason: "field [foo2] doesn't exist" }
+  - match: { docs.0.processor_results.2.status: "error" }
+  - match: { docs.0.processor_results.2.processor_type: "rename" }
+  - match: { docs.0.processor_results.3.tag: "failed2" }
+  - match: { docs.0.processor_results.3.doc._source.failed1: "failed1" }
+  - match: { docs.0.processor_results.3.doc._source.failed2: "failed2" }
+  - match: { docs.0.processor_results.3.doc._ingest.on_failure_processor_tag: "gunna_fail_again" }
+  - match: { docs.0.processor_results.3.status: "success" }
+  - match: { docs.0.processor_results.3.processor_type: "set" }
+
+
+---
+"Test simulate with pipeline with conditional and skipped and dropped":
+  - do:
+      ingest.simulate:
+        verbose: true
+        body: >
+          {
+            "pipeline": {
+              "description": "_description",
+              "processors": [
+                {
+                  "set" : {
+                    "description": "processor_description",
+                    "tag": "processor_tag",
+                    "field" : "field2",
+                    "value" : "_value"
+                  }
+                },
+                {
+                  "drop" : {
+                    "if": "false"
+                  }
+                },
+                {
+                  "drop" : {
+                     "if": "true"
+                   }
+                }
+              ]
+            },
+            "docs": [
+              {
+                "_index": "index",
+                "_id": "id",
+                "_source": {
+                  "foo": "bar"
+                }
+              }
+            ]
+          }
+  - length: { docs: 1 }
+  - length: { docs.0.processor_results: 3 }
+  - match: { docs.0.processor_results.0.doc._source.field2: "_value" }
+  - match: { docs.0.processor_results.0.description: "processor_description" }
+  - match: { docs.0.processor_results.0.tag: "processor_tag" }
+  - match: { docs.0.processor_results.0.status: "success" }
+  - match: { docs.0.processor_results.0.processor_type: "set" }
+  - match: { docs.0.processor_results.1.status: "skipped" }
+  - match: { docs.0.processor_results.1.processor_type: "drop" }
+  - match: { docs.0.processor_results.1.if.condition: "false" }
+  - match: { docs.0.processor_results.1.if.result: false }
+  - match: { docs.0.processor_results.2.status: "dropped" }
+  - match: { docs.0.processor_results.2.processor_type: "drop" }
+  - match: { docs.0.processor_results.2.if.condition: "true" }
+  - match: { docs.0.processor_results.2.if.result: true }
+---
+"Test simulate with provided pipeline that does not exist":
+  - do:
+      ingest.simulate:
+        verbose: true
+        body: >
+          {
+            "pipeline": {
+              "description": "_description",
+              "processors": [
+                {
+                  "pipeline": {
+                     "name": "____pipeline_doesnot_exist___"
+                  }
+                }
+              ]
+            },
+            "docs": [
+              {
+                "_source": {}
+              }
+            ]
+          }
+  - match: { docs.0.processor_results.0.status: "error" }
+  - match: { docs.0.processor_results.0.error.root_cause.0.type: "illegal_argument_exception" }
+  - match: { docs.0.processor_results.0.error.root_cause.0.reason: "Pipeline processor configured for non-existent pipeline [____pipeline_doesnot_exist___]" }

+ 86 - 14
server/src/main/java/org/elasticsearch/action/ingest/SimulatePipelineTransportAction.java

@@ -8,22 +8,47 @@
 
 package org.elasticsearch.action.ingest;
 
+import org.apache.logging.log4j.LogManager;
+import org.apache.logging.log4j.Logger;
 import org.elasticsearch.action.ActionListener;
+import org.elasticsearch.action.ActionListenerResponseHandler;
 import org.elasticsearch.action.support.ActionFilters;
 import org.elasticsearch.action.support.HandledTransportAction;
+import org.elasticsearch.cluster.node.DiscoveryNode;
+import org.elasticsearch.cluster.node.DiscoveryNodes;
+import org.elasticsearch.common.Randomness;
 import org.elasticsearch.common.inject.Inject;
+import org.elasticsearch.common.settings.Setting;
 import org.elasticsearch.common.xcontent.XContentHelper;
+import org.elasticsearch.core.TimeValue;
 import org.elasticsearch.ingest.IngestService;
 import org.elasticsearch.tasks.Task;
 import org.elasticsearch.threadpool.ThreadPool;
+import org.elasticsearch.transport.TransportRequestOptions;
 import org.elasticsearch.transport.TransportService;
 
+import java.util.Collection;
 import java.util.Map;
+import java.util.Random;
 
 public class SimulatePipelineTransportAction extends HandledTransportAction<SimulatePipelineRequest, SimulatePipelineResponse> {
-
+    private static final Logger logger = LogManager.getLogger(SimulatePipelineTransportAction.class);
+    /**
+     * This is the amount of time given as the timeout for transport requests to the ingest node.
+     */
+    public static final Setting<TimeValue> INGEST_NODE_TRANSPORT_ACTION_TIMEOUT = Setting.timeSetting(
+        "ingest_node.transport_action_timeout",
+        TimeValue.timeValueSeconds(20),
+        TimeValue.timeValueMillis(1),
+        Setting.Property.NodeScope,
+        Setting.Property.Dynamic
+    );
     private final IngestService ingestService;
     private final SimulateExecutionService executionService;
+    private final TransportService transportService;
+    private volatile TimeValue ingestNodeTransportActionTimeout;
+    // ThreadLocal because our unit testing framework does not like sharing Randoms across threads
+    private final ThreadLocal<Random> random = ThreadLocal.withInitial(Randomness::get);
 
     @Inject
     public SimulatePipelineTransportAction(
@@ -35,30 +60,77 @@ public class SimulatePipelineTransportAction extends HandledTransportAction<Simu
         super(SimulatePipelineAction.NAME, transportService, actionFilters, SimulatePipelineRequest::new);
         this.ingestService = ingestService;
         this.executionService = new SimulateExecutionService(threadPool);
+        this.transportService = transportService;
+        this.ingestNodeTransportActionTimeout = INGEST_NODE_TRANSPORT_ACTION_TIMEOUT.get(ingestService.getClusterService().getSettings());
+        ingestService.getClusterService()
+            .getClusterSettings()
+            .addSettingsUpdateConsumer(
+                INGEST_NODE_TRANSPORT_ACTION_TIMEOUT,
+                newTimeout -> this.ingestNodeTransportActionTimeout = newTimeout
+            );
     }
 
     @Override
     protected void doExecute(Task task, SimulatePipelineRequest request, ActionListener<SimulatePipelineResponse> listener) {
         final Map<String, Object> source = XContentHelper.convertToMap(request.getSource(), false, request.getXContentType()).v2();
-
-        final SimulatePipelineRequest.Parsed simulateRequest;
+        DiscoveryNodes discoveryNodes = ingestService.getClusterService().state().nodes();
+        Map<String, DiscoveryNode> ingestNodes = discoveryNodes.getIngestNodes();
+        if (ingestNodes.isEmpty()) {
+            /*
+             * Some resources used by pipelines, such as the geoip database, only exist on ingest nodes. Since we only run pipelines on
+             * nodes with the ingest role, we ought to only simulate a pipeline on nodes with the ingest role.
+             */
+            listener.onFailure(
+                new IllegalStateException("There are no ingest nodes in this cluster, unable to forward request to an ingest node.")
+            );
+            return;
+        }
         try {
-            if (request.getId() != null) {
-                simulateRequest = SimulatePipelineRequest.parseWithPipelineId(
-                    request.getId(),
-                    source,
-                    request.isVerbose(),
-                    ingestService,
-                    request.getRestApiVersion()
-                );
+            if (discoveryNodes.getLocalNode().isIngestNode()) {
+                final SimulatePipelineRequest.Parsed simulateRequest;
+                if (request.getId() != null) {
+                    simulateRequest = SimulatePipelineRequest.parseWithPipelineId(
+                        request.getId(),
+                        source,
+                        request.isVerbose(),
+                        ingestService,
+                        request.getRestApiVersion()
+                    );
+                } else {
+                    simulateRequest = SimulatePipelineRequest.parse(
+                        source,
+                        request.isVerbose(),
+                        ingestService,
+                        request.getRestApiVersion()
+                    );
+                }
+                executionService.execute(simulateRequest, listener);
             } else {
-                simulateRequest = SimulatePipelineRequest.parse(source, request.isVerbose(), ingestService, request.getRestApiVersion());
+                DiscoveryNode ingestNode = getRandomIngestNode(ingestNodes.values());
+                logger.trace("forwarding request [{}] to ingest node [{}]", actionName, ingestNode);
+                ActionListenerResponseHandler<SimulatePipelineResponse> handler = new ActionListenerResponseHandler<>(
+                    listener,
+                    SimulatePipelineAction.INSTANCE.getResponseReader()
+                );
+                if (task == null) {
+                    transportService.sendRequest(ingestNode, actionName, request, handler);
+                } else {
+                    transportService.sendChildRequest(
+                        ingestNode,
+                        actionName,
+                        request,
+                        task,
+                        TransportRequestOptions.timeout(ingestNodeTransportActionTimeout),
+                        handler
+                    );
+                }
             }
         } catch (Exception e) {
             listener.onFailure(e);
-            return;
         }
+    }
 
-        executionService.execute(simulateRequest, listener);
+    private DiscoveryNode getRandomIngestNode(Collection<DiscoveryNode> ingestNodes) {
+        return ingestNodes.toArray(new DiscoveryNode[0])[random.get().nextInt(ingestNodes.size())];
     }
 }

+ 11 - 2
server/src/main/java/org/elasticsearch/action/ingest/SimulateProcessorResult.java

@@ -8,6 +8,7 @@
 package org.elasticsearch.action.ingest;
 
 import org.elasticsearch.ElasticsearchException;
+import org.elasticsearch.Version;
 import org.elasticsearch.common.io.stream.StreamInput;
 import org.elasticsearch.common.io.stream.StreamOutput;
 import org.elasticsearch.common.io.stream.Writeable;
@@ -159,7 +160,11 @@ public class SimulateProcessorResult implements Writeable, ToXContentObject {
      * Read from a stream.
      */
     SimulateProcessorResult(StreamInput in) throws IOException {
-        this.processorTag = in.readString();
+        if (in.getVersion().onOrAfter(Version.V_8_7_0)) {
+            this.processorTag = in.readOptionalString();
+        } else {
+            this.processorTag = in.readString();
+        }
         this.ingestDocument = in.readOptionalWriteable(WriteableIngestDocument::new);
         this.failure = in.readException();
         this.description = in.readOptionalString();
@@ -174,7 +179,11 @@ public class SimulateProcessorResult implements Writeable, ToXContentObject {
 
     @Override
     public void writeTo(StreamOutput out) throws IOException {
-        out.writeString(processorTag);
+        if (out.getVersion().onOrAfter(Version.V_8_7_0)) {
+            out.writeOptionalString(processorTag);
+        } else {
+            out.writeString(processorTag);
+        }
         out.writeOptionalWriteable(ingestDocument);
         out.writeException(failure);
         out.writeOptionalString(description);

+ 2 - 0
server/src/main/java/org/elasticsearch/common/settings/ClusterSettings.java

@@ -11,6 +11,7 @@ import org.apache.logging.log4j.LogManager;
 import org.elasticsearch.action.admin.cluster.configuration.TransportAddVotingConfigExclusionsAction;
 import org.elasticsearch.action.admin.indices.close.TransportCloseIndexAction;
 import org.elasticsearch.action.bulk.WriteAckDelay;
+import org.elasticsearch.action.ingest.SimulatePipelineTransportAction;
 import org.elasticsearch.action.search.TransportSearchAction;
 import org.elasticsearch.action.support.AutoCreateIndex;
 import org.elasticsearch.action.support.DestructiveOperations;
@@ -537,6 +538,7 @@ public final class ClusterSettings extends AbstractScopedSettings {
         HealthNodeTaskExecutor.ENABLED_SETTING,
         LocalHealthMonitor.POLL_INTERVAL_SETTING,
         TransportHealthNodeAction.HEALTH_NODE_TRANSPORT_ACTION_TIMEOUT,
+        SimulatePipelineTransportAction.INGEST_NODE_TRANSPORT_ACTION_TIMEOUT,
         WriteAckDelay.WRITE_ACK_DELAY_INTERVAL,
         WriteAckDelay.WRITE_ACK_DELAY_RANDOMNESS_BOUND,
         TcpTransport.isUntrustedRemoteClusterEnabled() ? RemoteClusterService.REMOTE_CLUSTER_AUTHORIZATION : null

+ 2 - 2
server/src/test/java/org/elasticsearch/action/ingest/SimulateProcessorResultTests.java

@@ -61,8 +61,8 @@ public class SimulateProcessorResultTests extends AbstractXContentTestCase<Simul
 
     static SimulateProcessorResult createTestInstance(boolean isSuccessful, boolean isIgnoredException, boolean hasCondition) {
         String type = randomAlphaOfLengthBetween(1, 10);
-        String processorTag = randomAlphaOfLengthBetween(1, 10);
-        String description = randomAlphaOfLengthBetween(1, 10);
+        String processorTag = randomBoolean() ? null : randomAlphaOfLengthBetween(1, 10);
+        String description = randomBoolean() ? null : randomAlphaOfLengthBetween(1, 10);
         Tuple<String, Boolean> conditionWithResult = hasCondition ? new Tuple<>(randomAlphaOfLengthBetween(1, 10), randomBoolean()) : null;
         SimulateProcessorResult simulateProcessorResult;
         if (isSuccessful) {