Explorar o código

Simulate should succeed if ignore_missing_pipeline (#108106)

PipelineProcessors with non-existing pipelines should succeed (as noop)
 if ignore_missing_pipeline=true. Currently, does not work when pipelines are
 simulated with verbose=true. In this case, an error is returned and no results
 are shown for subsequent processors. This change allows following processors
 to run, and changes the status from error to error_ignored.
Parker Timmins hai 1 ano
pai
achega
796b0deeec

+ 6 - 0
docs/changelog/108106.yaml

@@ -0,0 +1,6 @@
+pr: 108106
+summary: Simulate should succeed if `ignore_missing_pipeline`
+area: Ingest Node
+type: bug
+issues:
+ - 107314

+ 47 - 0
modules/ingest-common/src/yamlRestTest/resources/rest-api-spec/test/ingest/90_simulate.yml

@@ -981,3 +981,50 @@ teardown:
   - match: { docs.0.processor_results.0.status: "error" }
   - match: { docs.0.processor_results.0.error.root_cause.0.type: "illegal_argument_exception" }
   - match: { docs.0.processor_results.0.error.root_cause.0.reason: "Pipeline processor configured for non-existent pipeline [____pipeline_doesnot_exist___]" }
+
+---
+"Test verbose simulate with Pipeline Processor and ignore_missing_pipeline":
+  - do:
+      ingest.simulate:
+        verbose: true
+        body: >
+          {
+            "pipeline": {
+              "description": "outer pipeline",
+              "processors": [
+              {
+                "pipeline": {
+                  "name": "missing-inner",
+                  "ignore_missing_pipeline": true
+                }
+              },
+              {
+                "set": {
+                  "field": "outer-field",
+                  "value": true
+                }
+              }
+              ]
+            },
+            "docs": [
+            {
+              "_index": "index",
+              "_id": "id",
+              "_source": {
+                "field1": "123.42 400 <foo>"
+              }
+            }
+            ]
+          }
+  - length: { docs: 1 }
+  - length: { docs.0.processor_results: 2 }
+  - match: { docs.0.processor_results.0.doc._source.field1: "123.42 400 <foo>" }
+  - match: { docs.0.processor_results.0.status: "error_ignored" }
+  - match: { docs.0.processor_results.0.processor_type: "pipeline" }
+  - match: { docs.0.processor_results.0.ignored_error.error.type: "illegal_argument_exception" }
+  - match: { docs.0.processor_results.0.ignored_error.error.reason: "Pipeline processor configured for non-existent pipeline [missing-inner]" }
+  - match: { docs.0.processor_results.1.doc._source.outer-field: true }
+  - match: { docs.0.processor_results.1.status: "success" }
+  - match: { docs.0.processor_results.1.processor_type: "set" }
+
+

+ 4 - 0
server/src/main/java/org/elasticsearch/ingest/PipelineProcessor.java

@@ -34,6 +34,10 @@ public class PipelineProcessor extends AbstractProcessor {
         this.ingestService = ingestService;
     }
 
+    public boolean isIgnoreMissingPipeline() {
+        return ignoreMissingPipeline;
+    }
+
     @Override
     public void execute(IngestDocument ingestDocument, BiConsumer<IngestDocument, Exception> handler) {
         String pipelineName = ingestDocument.renderTemplate(this.pipelineTemplate);

+ 8 - 1
server/src/main/java/org/elasticsearch/ingest/TrackingResultProcessor.java

@@ -83,11 +83,18 @@ public final class TrackingResultProcessor implements Processor {
                         pipelineProcessor.getType(),
                         pipelineProcessor.getTag(),
                         pipelineProcessor.getDescription(),
+                        pipelineProcessor.isIgnoreMissingPipeline() ? ingestDocument : null,
                         e,
                         conditionalWithResult
                     )
                 );
-                throw e;
+
+                if (pipelineProcessor.isIgnoreMissingPipeline()) {
+                    handler.accept(ingestDocument, null);
+                    return;
+                } else {
+                    throw e;
+                }
             }
             if (performCycleCheck) {
                 ingestDocumentCopy.executePipeline(pipelineToCall, (result, e) -> {