Browse Source

Add a `terminate` ingest processor (#114157) (#114343)

This processor simply causes any remaining processors in the pipeline
to be skipped. It will normally be executed conditionally using the
`if` option. (If this pipeline is being called from another pipeline,
the calling pipeline is *not* terminated.)

For example, this:

```
POST /_ingest/pipeline/_simulate
{
  "pipeline":
  {
    "description": "Appends just 'before' to the steps field if the number field
 is present, or both 'before' and 'after' if not",
    "processors": [
      {
        "append": {
          "field": "steps",
          "value": "before"
        }
      },
      {
        "terminate": {
          "if": "ctx.error != null"
        }
      },
      {
        "append": {
          "field": "steps",
          "value": "after"
        }
      }
    ]
  },
  "docs": [
    {
      "_index": "index",
      "_id": "doc1",
      "_source": {
        "name": "okay",
        "steps": []
      }
    },
    {
      "_index": "index",
      "_id": "doc2",
      "_source": {
        "name": "bad",
        "error": "oh no",
        "steps": []
      }
    }
  ]
}
```

returns something like this:

```
{
  "docs": [
    {
      "doc": {
        "_index": "index",
        "_version": "-3",
        "_id": "doc1",
        "_source": {
          "name": "okay",
          "steps": [
            "before",
            "after"
          ]
        },
        "_ingest": {
          "timestamp": "2024-10-04T16:25:20.448881Z"
        }
      }
    },
    {
      "doc": {
        "_index": "index",
        "_version": "-3",
        "_id": "doc2",
        "_source": {
          "name": "bad",
          "error": "oh no",
          "steps": [
            "before"
          ]
        },
        "_ingest": {
          "timestamp": "2024-10-04T16:25:20.448932Z"
        }
      }
    }
  ]
}
```
Pete Gillin 1 year ago
parent
commit
6ec7a3439d

+ 6 - 0
docs/changelog/114157.yaml

@@ -0,0 +1,6 @@
+pr: 114157
+summary: Add a `terminate` ingest processor
+area: Ingest Node
+type: feature
+issues:
+ - 110218

+ 30 - 0
docs/reference/ingest/processors/terminate.asciidoc

@@ -0,0 +1,30 @@
+[[terminate-processor]]
+=== Terminate processor
+
+++++
+<titleabbrev>Terminate</titleabbrev>
+++++
+
+Terminates the current ingest pipeline, causing no further processors to be run.
+This will normally be executed conditionally, using the `if` option.
+
+If this pipeline is being called from another pipeline, the calling pipeline is *not* terminated.
+
+[[terminate-options]]
+.Terminate Options
+[options="header"]
+|======
+| Name        | Required | Default | Description
+include::common-options.asciidoc[]
+|======
+
+[source,js]
+--------------------------------------------------
+{
+  "description" : "terminates the current pipeline if the error field is present",
+  "terminate": {
+    "if": "ctx.error != null"
+  }
+}
+--------------------------------------------------
+// NOTCONSOLE

+ 1 - 0
modules/ingest-common/src/main/java/org/elasticsearch/ingest/common/IngestCommonPlugin.java

@@ -72,6 +72,7 @@ public class IngestCommonPlugin extends Plugin implements ActionPlugin, IngestPl
             entry(SetProcessor.TYPE, new SetProcessor.Factory(parameters.scriptService)),
             entry(SortProcessor.TYPE, new SortProcessor.Factory()),
             entry(SplitProcessor.TYPE, new SplitProcessor.Factory()),
+            entry(TerminateProcessor.TYPE, new TerminateProcessor.Factory()),
             entry(TrimProcessor.TYPE, new TrimProcessor.Factory()),
             entry(URLDecodeProcessor.TYPE, new URLDecodeProcessor.Factory()),
             entry(UppercaseProcessor.TYPE, new UppercaseProcessor.Factory()),

+ 53 - 0
modules/ingest-common/src/main/java/org/elasticsearch/ingest/common/TerminateProcessor.java

@@ -0,0 +1,53 @@
+/*
+ * Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
+ * or more contributor license agreements. Licensed under the "Elastic License
+ * 2.0", the "GNU Affero General Public License v3.0 only", and the "Server Side
+ * Public License v 1"; you may not use this file except in compliance with, at
+ * your election, the "Elastic License 2.0", the "GNU Affero General Public
+ * License v3.0 only", or the "Server Side Public License, v 1".
+ */
+
+package org.elasticsearch.ingest.common;
+
+import org.elasticsearch.ingest.AbstractProcessor;
+import org.elasticsearch.ingest.IngestDocument;
+import org.elasticsearch.ingest.Processor;
+
+import java.util.Map;
+
+/**
+ * A {@link Processor} which simply prevents subsequent processors in the pipeline from running (without failing, like {@link FailProcessor}
+ * does). This will normally be run conditionally, using the {@code if} option.
+ */
+public class TerminateProcessor extends AbstractProcessor {
+
+    static final String TYPE = "terminate";
+
+    TerminateProcessor(String tag, String description) {
+        super(tag, description);
+    }
+
+    @Override
+    public IngestDocument execute(IngestDocument ingestDocument) {
+        ingestDocument.terminate();
+        return ingestDocument;
+    }
+
+    @Override
+    public String getType() {
+        return TYPE;
+    }
+
+    public static final class Factory implements Processor.Factory {
+
+        @Override
+        public Processor create(
+            Map<String, Processor.Factory> processorFactories,
+            String tag,
+            String description,
+            Map<String, Object> config
+        ) {
+            return new TerminateProcessor(tag, description);
+        }
+    }
+}

+ 70 - 0
modules/ingest-common/src/test/java/org/elasticsearch/ingest/common/TerminateProcessorTests.java

@@ -0,0 +1,70 @@
+/*
+ * Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
+ * or more contributor license agreements. Licensed under the "Elastic License
+ * 2.0", the "GNU Affero General Public License v3.0 only", and the "Server Side
+ * Public License v 1"; you may not use this file except in compliance with, at
+ * your election, the "Elastic License 2.0", the "GNU Affero General Public
+ * License v3.0 only", or the "Server Side Public License, v 1".
+ */
+
+package org.elasticsearch.ingest.common;
+
+import org.elasticsearch.ingest.CompoundProcessor;
+import org.elasticsearch.ingest.IngestDocument;
+import org.elasticsearch.ingest.Pipeline;
+import org.elasticsearch.ingest.TestTemplateService;
+import org.elasticsearch.ingest.ValueSource;
+import org.elasticsearch.test.ESTestCase;
+
+import java.util.Map;
+
+import static org.elasticsearch.ingest.RandomDocumentPicks.randomIngestDocument;
+import static org.hamcrest.Matchers.is;
+import static org.hamcrest.Matchers.nullValue;
+
+public class TerminateProcessorTests extends ESTestCase {
+
+    public void testTerminateInPipeline() throws Exception {
+        Pipeline pipeline = new Pipeline(
+            "my-pipeline",
+            null,
+            null,
+            null,
+            new CompoundProcessor(
+                new SetProcessor(
+                    "before-set",
+                    "Sets before field to true",
+                    new TestTemplateService.MockTemplateScript.Factory("before"),
+                    ValueSource.wrap(true, TestTemplateService.instance()),
+                    null
+                ),
+                new TerminateProcessor("terminate", "terminates the pipeline"),
+                new SetProcessor(
+                    "after-set",
+                    "Sets after field to true",
+                    new TestTemplateService.MockTemplateScript.Factory("after"),
+                    ValueSource.wrap(true, TestTemplateService.instance()),
+                    null
+                )
+            )
+        );
+        IngestDocument input = randomIngestDocument(random(), Map.of("foo", "bar"));
+        PipelineOutput output = new PipelineOutput();
+
+        pipeline.execute(input, output::set);
+
+        assertThat(output.exception, nullValue());
+        // We expect the before-set processor to have run, but not the after-set one:
+        assertThat(output.document.getSource(), is(Map.of("foo", "bar", "before", true)));
+    }
+
+    private static class PipelineOutput {
+        IngestDocument document;
+        Exception exception;
+
+        void set(IngestDocument document, Exception exception) {
+            this.document = document;
+            this.exception = exception;
+        }
+    }
+}

+ 138 - 0
modules/ingest-common/src/yamlRestTest/resources/rest-api-spec/test/ingest/330_terminate_processor.yml

@@ -0,0 +1,138 @@
+---
+setup:
+  - do:
+      ingest.put_pipeline:
+        id: "test-pipeline"
+        body: >
+          {
+            "description": "Appends just 'before' to the steps field if the number field is less than 50, or both 'before' and 'after' if not",
+            "processors": [
+              {
+                "append": {
+                  "field": "steps",
+                  "value": "before"
+                }
+              },
+              {
+                "terminate": {
+                  "if": "ctx.number < 50"
+                }
+              },
+              {
+                "append": {
+                  "field": "steps",
+                  "value": "after"
+                }
+              }
+            ]
+          }
+  - do:
+      ingest.put_pipeline:
+        id: "test-final-pipeline"
+        body: >
+          {
+            "description": "Appends 'final' to the steps field",
+            "processors": [
+              {
+                "append": {
+                  "field": "steps",
+                  "value": "final"
+                }
+              }
+            ]
+          }
+  - do:
+      ingest.put_pipeline:
+        id: "test-outer-pipeline"
+        body: >
+          {
+            "description": "Runs test-pipeline and then append 'outer' to the steps field",
+            "processors": [
+              {
+                "pipeline": {
+                  "name": "test-pipeline"
+                }
+              },
+              {
+                "append": {
+                  "field": "steps",
+                  "value": "outer"
+                }
+              }
+            ]
+          }
+  - do:
+      indices.create:
+        index: "test-index-with-default-and-final-pipelines"
+        body:
+          settings:
+            index:
+              default_pipeline: "test-pipeline"
+              final_pipeline: "test-final-pipeline"
+  - do:
+      indices.create:
+        index: "test-vanilla-index"
+
+---
+teardown:
+  - do:
+      indices.delete:
+        index: "test-index-with-default-and-final-pipelines"
+        ignore_unavailable: true
+  - do:
+      indices.delete:
+        index: "test-vanilla-index"
+        ignore_unavailable: true
+  - do:
+      ingest.delete_pipeline:
+        id: "test-pipeline"
+        ignore: 404
+  - do:
+      ingest.delete_pipeline:
+        id: "test-outer-pipeline"
+        ignore: 404
+
+---
+"Test pipeline including conditional terminate pipeline":
+
+  - do:
+      bulk:
+        refresh: true
+        body:
+          - '{ "index": {"_index": "test-index-with-default-and-final-pipelines" } }'
+          - '{ "comment": "should terminate", "number": 40, "steps": [] }'
+          - '{ "index": {"_index": "test-index-with-default-and-final-pipelines" } }'
+          - '{ "comment": "should continue to end", "number": 60, "steps": [] }'
+
+  - do:
+      search:
+        rest_total_hits_as_int: true
+        index: "test-index-with-default-and-final-pipelines"
+        body:
+          sort: "number"
+  - match: { hits.total: 2 }
+  - match: { hits.hits.0._source.number: 40 }
+  - match: { hits.hits.1._source.number: 60 }
+  - match: { hits.hits.0._source.steps: ["before", "final"] }
+  - match: { hits.hits.1._source.steps: ["before", "after", "final"] }
+
+---
+"Test pipeline with terminate invoked from an outer pipeline":
+
+  - do:
+      bulk:
+        refresh: true
+        pipeline: "test-outer-pipeline"
+        body:
+          - '{ "index": {"_index": "test-vanilla-index" } }'
+          - '{ "comment": "should terminate inner pipeline but not outer", "number": 40, "steps": [] }'
+
+  - do:
+      search:
+        rest_total_hits_as_int: true
+        index: "test-vanilla-index"
+        body:
+          sort: "number"
+  - match: { hits.total: 1 }
+  - match: { hits.hits.0._source.number: 40 }
+  - match: { hits.hits.0._source.steps: ["before", "outer"] }

+ 4 - 3
server/src/main/java/org/elasticsearch/ingest/CompoundProcessor.java

@@ -148,7 +148,7 @@ public class CompoundProcessor implements Processor {
 
     void innerExecute(int currentProcessor, IngestDocument ingestDocument, final BiConsumer<IngestDocument, Exception> handler) {
         assert currentProcessor <= processorsWithMetrics.size();
-        if (currentProcessor == processorsWithMetrics.size() || ingestDocument.isReroute()) {
+        if (currentProcessor == processorsWithMetrics.size() || ingestDocument.isReroute() || ingestDocument.isTerminate()) {
             handler.accept(ingestDocument, null);
             return;
         }
@@ -159,7 +159,8 @@ public class CompoundProcessor implements Processor {
         // iteratively execute any sync processors
         while (currentProcessor < processorsWithMetrics.size()
             && processorsWithMetrics.get(currentProcessor).v1().isAsync() == false
-            && ingestDocument.isReroute() == false) {
+            && ingestDocument.isReroute() == false
+            && ingestDocument.isTerminate() == false) {
             processorWithMetric = processorsWithMetrics.get(currentProcessor);
             processor = processorWithMetric.v1();
             metric = processorWithMetric.v2();
@@ -185,7 +186,7 @@ public class CompoundProcessor implements Processor {
         }
 
         assert currentProcessor <= processorsWithMetrics.size();
-        if (currentProcessor == processorsWithMetrics.size() || ingestDocument.isReroute()) {
+        if (currentProcessor == processorsWithMetrics.size() || ingestDocument.isReroute() || ingestDocument.isTerminate()) {
             handler.accept(ingestDocument, null);
             return;
         }

+ 22 - 0
server/src/main/java/org/elasticsearch/ingest/IngestDocument.java

@@ -82,6 +82,7 @@ public final class IngestDocument {
 
     private boolean doNoSelfReferencesCheck = false;
     private boolean reroute = false;
+    private boolean terminate = false;
 
     public IngestDocument(String index, String id, long version, String routing, VersionType versionType, Map<String, Object> source) {
         this.ctxMap = new IngestCtxMap(index, id, version, routing, versionType, ZonedDateTime.now(ZoneOffset.UTC), source);
@@ -935,6 +936,27 @@ public final class IngestDocument {
         reroute = false;
     }
 
+    /**
+     * Sets the terminate flag to true, to indicate that no further processors in the current pipeline should be run for this document.
+     */
+    public void terminate() {
+        terminate = true;
+    }
+
+    /**
+     * Returns whether the {@link #terminate()} flag was set.
+     */
+    boolean isTerminate() {
+        return terminate;
+    }
+
+    /**
+     * Resets the {@link #terminate()} flag.
+     */
+    void resetTerminate() {
+        terminate = false;
+    }
+
     public enum Metadata {
         INDEX(IndexFieldMapper.NAME),
         TYPE("_type"),

+ 3 - 0
server/src/main/java/org/elasticsearch/ingest/Pipeline.java

@@ -133,6 +133,9 @@ public final class Pipeline {
             if (e != null) {
                 metrics.ingestFailed();
             }
+            // Reset the terminate status now that pipeline execution is complete (if this was executed as part of another pipeline, the
+            // outer pipeline should continue):
+            ingestDocument.resetTerminate();
             handler.accept(result, e);
         });
     }