Browse Source

Add pipeline name to ingest metadata (#50467)

This commit adds the name of the current pipeline to ingest metadata.
This pipeline name is accessible under the following key: '_ingest.pipeline'.

Example usage in pipeline:
PUT /_ingest/pipeline/2
{
    "processors": [
        {
            "set": {
                "field": "pipeline_name",
                "value": "{{_ingest.pipeline}}"
            }
        }
    ]
}

Closes #42106
Martijn van Groningen 5 years ago
parent
commit
2b2935fd52

+ 8 - 4
docs/reference/ingest/apis/simulate-pipeline.asciidoc

@@ -350,7 +350,8 @@ The API returns the following response:
                      "foo": "bar"
                   },
                   "_ingest": {
-                     "timestamp": "2017-05-04T22:46:09.674Z"
+                     "timestamp": "2017-05-04T22:46:09.674Z",
+                     "pipeline": "_simulate_pipeline"
                   }
                }
             },
@@ -364,7 +365,8 @@ The API returns the following response:
                      "foo": "bar"
                   },
                   "_ingest": {
-                     "timestamp": "2017-05-04T22:46:09.675Z"
+                     "timestamp": "2017-05-04T22:46:09.675Z",
+                     "pipeline": "_simulate_pipeline"
                   }
                }
             }
@@ -381,7 +383,8 @@ The API returns the following response:
                      "foo": "rab"
                   },
                   "_ingest": {
-                     "timestamp": "2017-05-04T22:46:09.676Z"
+                     "timestamp": "2017-05-04T22:46:09.676Z",
+                     "pipeline": "_simulate_pipeline"
                   }
                }
             },
@@ -395,7 +398,8 @@ The API returns the following response:
                      "foo": "rab"
                   },
                   "_ingest": {
-                     "timestamp": "2017-05-04T22:46:09.677Z"
+                     "timestamp": "2017-05-04T22:46:09.677Z",
+                     "pipeline": "_simulate_pipeline"
                   }
                }
             }

+ 2 - 0
docs/reference/ingest/processors/pipeline.asciidoc

@@ -21,6 +21,8 @@ include::common-options.asciidoc[]
 --------------------------------------------------
 // NOTCONSOLE
 
+The name of the current pipeline can be accessed from the `_ingest.pipeline` ingest metadata key.
+
 An example of using this processor for nesting pipelines would be:
 
 Define an inner pipeline:

+ 78 - 0
modules/ingest-common/src/test/resources/rest-api-spec/test/ingest/210_pipeline_processor.yml

@@ -202,3 +202,81 @@ teardown:
           }
   - match: { error.root_cause.0.type: "illegal_state_exception" }
   - match: { error.root_cause.0.reason: "Pipeline processor configured for non-existent pipeline [legal-department]" }
+
+---
+"Test _ingest.pipeline metadata":
+  - do:
+      ingest.put_pipeline:
+        id: "pipeline1"
+        body:  >
+          {
+            "processors" : [
+              {
+                "append" : {
+                  "field": "pipelines",
+                  "value": "{{_ingest.pipeline}}"
+                }
+              },
+              {
+                "pipeline" : {
+                  "name": "another_pipeline"
+                }
+              }
+            ]
+          }
+  - match: { acknowledged: true }
+
+  - do:
+      ingest.put_pipeline:
+        id: "another_pipeline"
+        body:  >
+          {
+            "processors" : [
+              {
+                "append" : {
+                  "field": "pipelines",
+                  "value": "{{_ingest.pipeline}}"
+                }
+              },
+              {
+                "pipeline" : {
+                  "name": "another_pipeline2"
+                }
+              }
+            ]
+          }
+  - match: { acknowledged: true }
+
+  - do:
+      ingest.put_pipeline:
+        id: "another_pipeline2"
+        body:  >
+          {
+            "processors" : [
+              {
+                "append" : {
+                  "field": "pipelines",
+                  "value": "{{_ingest.pipeline}}"
+                }
+              }
+            ]
+          }
+  - match: { acknowledged: true }
+
+  - do:
+      index:
+        index: test
+        id: 1
+        pipeline: "pipeline1"
+        body: >
+          {
+          }
+
+  - do:
+      get:
+        index: test
+        id: 1
+  - length: { _source.pipelines: 3 }
+  - match: { _source.pipelines.0: "pipeline1" }
+  - match: { _source.pipelines.1: "another_pipeline" }
+  - match: { _source.pipelines.2: "another_pipeline2" }

+ 12 - 6
modules/ingest-common/src/test/resources/rest-api-spec/test/ingest/90_simulate.yml

@@ -284,26 +284,30 @@ teardown:
   - length: { docs.0.processor_results.0.doc._source: 2 }
   - match: { docs.0.processor_results.0.doc._source.foo.bar.0.item: "HELLO" }
   - match: { docs.0.processor_results.0.doc._source.field2.value: "_value" }
-  - length: { docs.0.processor_results.0.doc._ingest: 1 }
+  - length: { docs.0.processor_results.0.doc._ingest: 2 }
   - is_true: docs.0.processor_results.0.doc._ingest.timestamp
+  - is_true: docs.0.processor_results.0.doc._ingest.pipeline
   - length: { docs.0.processor_results.1.doc._source: 3 }
   - match: { docs.0.processor_results.1.doc._source.foo.bar.0.item: "HELLO" }
   - match: { docs.0.processor_results.1.doc._source.field2.value: "_value" }
   - match: { docs.0.processor_results.1.doc._source.field3: "third_val" }
-  - length: { docs.0.processor_results.1.doc._ingest: 1 }
+  - length: { docs.0.processor_results.1.doc._ingest: 2 }
   - is_true: docs.0.processor_results.1.doc._ingest.timestamp
+  - is_true: docs.0.processor_results.1.doc._ingest.pipeline
   - length: { docs.0.processor_results.2.doc._source: 3 }
   - match: { docs.0.processor_results.2.doc._source.foo.bar.0.item: "HELLO" }
   - match: { docs.0.processor_results.2.doc._source.field2.value: "_VALUE" }
   - match: { docs.0.processor_results.2.doc._source.field3: "third_val" }
-  - length: { docs.0.processor_results.2.doc._ingest: 1 }
+  - length: { docs.0.processor_results.2.doc._ingest: 2 }
   - is_true: docs.0.processor_results.2.doc._ingest.timestamp
+  - is_true: docs.0.processor_results.2.doc._ingest.pipeline
   - length: { docs.0.processor_results.3.doc._source: 3 }
   - match: { docs.0.processor_results.3.doc._source.foo.bar.0.item: "hello" }
   - match: { docs.0.processor_results.3.doc._source.field2.value: "_VALUE" }
   - match: { docs.0.processor_results.3.doc._source.field3: "third_val" }
-  - length: { docs.0.processor_results.3.doc._ingest: 1 }
+  - length: { docs.0.processor_results.3.doc._ingest: 2 }
   - is_true: docs.0.processor_results.3.doc._ingest.timestamp
+  - is_true: docs.0.processor_results.3.doc._ingest.pipeline
 
 ---
 "Test simulate with exception thrown":
@@ -393,12 +397,14 @@ teardown:
   - match: { docs.1.processor_results.0.doc._index: "index" }
   - match: { docs.1.processor_results.0.doc._source.foo: 5 }
   - match: { docs.1.processor_results.0.doc._source.bar: "hello" }
-  - length: { docs.1.processor_results.0.doc._ingest: 1 }
+  - length: { docs.1.processor_results.0.doc._ingest: 2 }
   - is_true: docs.1.processor_results.0.doc._ingest.timestamp
+  - is_true: docs.1.processor_results.0.doc._ingest.pipeline
   - match: { docs.1.processor_results.1.doc._source.foo: 5 }
   - match: { docs.1.processor_results.1.doc._source.bar: "HELLO" }
-  - length: { docs.1.processor_results.1.doc._ingest: 1 }
+  - length: { docs.1.processor_results.1.doc._ingest: 2 }
   - is_true: docs.1.processor_results.1.doc._ingest.timestamp
+  - is_true: docs.1.processor_results.1.doc._ingest.pipeline
 
 ---
 "Test verbose simulate with on_failure":

+ 6 - 0
server/src/main/java/org/elasticsearch/ingest/IngestDocument.java

@@ -646,8 +646,14 @@ public final class IngestDocument {
      */
     public void executePipeline(Pipeline pipeline, BiConsumer<IngestDocument, Exception> handler) {
         if (executedPipelines.add(pipeline.getId())) {
+            Object previousPipeline = ingestMetadata.put("pipeline", pipeline.getId());
             pipeline.execute(this, (result, e) -> {
                 executedPipelines.remove(pipeline.getId());
+                if (previousPipeline != null) {
+                    ingestMetadata.put("pipeline", previousPipeline);
+                } else {
+                    ingestMetadata.remove("pipeline");
+                }
                 handler.accept(result, e);
             });
         } else {

+ 1 - 1
server/src/main/java/org/elasticsearch/ingest/PipelineProcessor.java

@@ -31,7 +31,7 @@ public class PipelineProcessor extends AbstractProcessor {
     private final TemplateScript.Factory pipelineTemplate;
     private final IngestService ingestService;
 
-    private PipelineProcessor(String tag, TemplateScript.Factory pipelineTemplate, IngestService ingestService) {
+    PipelineProcessor(String tag, TemplateScript.Factory pipelineTemplate, IngestService ingestService) {
         super(tag);
         this.pipelineTemplate = pipelineTemplate;
         this.ingestService = ingestService;

+ 27 - 29
server/src/test/java/org/elasticsearch/action/ingest/SimulateExecutionServiceTests.java

@@ -91,23 +91,17 @@ public class SimulateExecutionServiceTests extends ESTestCase {
         assertThat(actualItemResponse, instanceOf(SimulateDocumentVerboseResult.class));
         SimulateDocumentVerboseResult simulateDocumentVerboseResult = (SimulateDocumentVerboseResult) actualItemResponse;
         assertThat(simulateDocumentVerboseResult.getProcessorResults().size(), equalTo(2));
-        assertThat(simulateDocumentVerboseResult.getProcessorResults().get(0).getProcessorTag(), equalTo("test-id"));
-        IngestDocument firstProcessorIngestDocument = simulateDocumentVerboseResult.getProcessorResults().get(0).getIngestDocument();
-        assertThat(firstProcessorIngestDocument, not(sameInstance(this.ingestDocument)));
-        assertIngestDocument(firstProcessorIngestDocument, this.ingestDocument);
-        assertThat(firstProcessorIngestDocument.getSourceAndMetadata(), not(sameInstance(this.ingestDocument.getSourceAndMetadata())));
 
+        assertThat(simulateDocumentVerboseResult.getProcessorResults().get(0).getProcessorTag(), equalTo("test-id"));
+        assertVerboseResult(simulateDocumentVerboseResult.getProcessorResults().get(0), pipeline.getId(), ingestDocument);
         assertThat(simulateDocumentVerboseResult.getProcessorResults().get(0).getFailure(), nullValue());
+
         assertThat(simulateDocumentVerboseResult.getProcessorResults().get(1).getProcessorTag(), equalTo("test-id"));
-        IngestDocument secondProcessorIngestDocument = simulateDocumentVerboseResult.getProcessorResults().get(1).getIngestDocument();
-        assertThat(secondProcessorIngestDocument, not(sameInstance(this.ingestDocument)));
-        assertIngestDocument(secondProcessorIngestDocument, this.ingestDocument);
-        assertThat(secondProcessorIngestDocument.getSourceAndMetadata(), not(sameInstance(this.ingestDocument.getSourceAndMetadata())));
-        assertThat(secondProcessorIngestDocument.getSourceAndMetadata(),
-            not(sameInstance(firstProcessorIngestDocument.getSourceAndMetadata())));
+        assertVerboseResult(simulateDocumentVerboseResult.getProcessorResults().get(1), pipeline.getId(), ingestDocument);
+        assertThat(simulateDocumentVerboseResult.getProcessorResults().get(1).getIngestDocument().getSourceAndMetadata(),
+            not(sameInstance(simulateDocumentVerboseResult.getProcessorResults().get(0).getIngestDocument().getSourceAndMetadata())));
         assertThat(simulateDocumentVerboseResult.getProcessorResults().get(1).getFailure(), nullValue());
     }
-
     public void testExecuteItem() throws Exception {
         TestProcessor processor = new TestProcessor("processor_0", "mock", ingestDocument -> {});
         Pipeline pipeline = new Pipeline("_id", "_description", version, new CompoundProcessor(processor, processor));
@@ -147,10 +141,7 @@ public class SimulateExecutionServiceTests extends ESTestCase {
         assertThat(simulateDocumentVerboseResult.getProcessorResults().size(), equalTo(2));
         assertThat(simulateDocumentVerboseResult.getProcessorResults().get(0).getProcessorTag(), equalTo("processor_0"));
         assertThat(simulateDocumentVerboseResult.getProcessorResults().get(0).getFailure(), nullValue());
-        assertThat(simulateDocumentVerboseResult.getProcessorResults().get(0).getIngestDocument(), not(sameInstance(ingestDocument)));
-        assertIngestDocument(simulateDocumentVerboseResult.getProcessorResults().get(0).getIngestDocument(), ingestDocument);
-        assertThat(simulateDocumentVerboseResult.getProcessorResults().get(0).getIngestDocument().getSourceAndMetadata(),
-                not(sameInstance(ingestDocument.getSourceAndMetadata())));
+        assertVerboseResult(simulateDocumentVerboseResult.getProcessorResults().get(0), pipeline.getId(), ingestDocument);
         assertThat(simulateDocumentVerboseResult.getProcessorResults().get(1).getProcessorTag(), equalTo("processor_1"));
         assertThat(simulateDocumentVerboseResult.getProcessorResults().get(1).getIngestDocument(), nullValue());
         assertThat(simulateDocumentVerboseResult.getProcessorResults().get(1).getFailure(), instanceOf(RuntimeException.class));
@@ -191,14 +182,12 @@ public class SimulateExecutionServiceTests extends ESTestCase {
         metadata.put(CompoundProcessor.ON_FAILURE_PROCESSOR_TYPE_FIELD, "mock");
         metadata.put(CompoundProcessor.ON_FAILURE_PROCESSOR_TAG_FIELD, "processor_0");
         metadata.put(CompoundProcessor.ON_FAILURE_MESSAGE_FIELD, "processor failed");
-        assertIngestDocument(simulateDocumentVerboseResult.getProcessorResults().get(1).getIngestDocument(),
-                ingestDocumentWithOnFailureMetadata);
-
+        assertVerboseResult(simulateDocumentVerboseResult.getProcessorResults().get(1), pipeline.getId(),
+            ingestDocumentWithOnFailureMetadata);
         assertThat(simulateDocumentVerboseResult.getProcessorResults().get(1).getFailure(), nullValue());
 
         assertThat(simulateDocumentVerboseResult.getProcessorResults().get(2).getProcessorTag(), equalTo("processor_2"));
-        assertThat(simulateDocumentVerboseResult.getProcessorResults().get(2).getIngestDocument(), not(sameInstance(ingestDocument)));
-        assertIngestDocument(simulateDocumentVerboseResult.getProcessorResults().get(2).getIngestDocument(), ingestDocument);
+        assertVerboseResult(simulateDocumentVerboseResult.getProcessorResults().get(2), pipeline.getId(), ingestDocument);
         assertThat(simulateDocumentVerboseResult.getProcessorResults().get(2).getFailure(), nullValue());
     }
 
@@ -221,10 +210,7 @@ public class SimulateExecutionServiceTests extends ESTestCase {
         assertThat(simulateDocumentVerboseResult.getProcessorResults().size(), equalTo(1));
         assertThat(simulateDocumentVerboseResult.getProcessorResults().get(0).getProcessorTag(), equalTo("processor_0"));
         assertThat(simulateDocumentVerboseResult.getProcessorResults().get(0).getFailure(), sameInstance(exception));
-        assertThat(simulateDocumentVerboseResult.getProcessorResults().get(0).getIngestDocument(), not(sameInstance(ingestDocument)));
-        assertIngestDocument(simulateDocumentVerboseResult.getProcessorResults().get(0).getIngestDocument(), ingestDocument);
-        assertThat(simulateDocumentVerboseResult.getProcessorResults().get(0).getIngestDocument().getSourceAndMetadata(),
-                not(sameInstance(ingestDocument.getSourceAndMetadata())));
+        assertVerboseResult(simulateDocumentVerboseResult.getProcessorResults().get(0), pipeline.getId(), ingestDocument);
     }
 
     public void testExecuteVerboseItemWithoutExceptionAndWithIgnoreFailure() throws Exception {
@@ -245,10 +231,7 @@ public class SimulateExecutionServiceTests extends ESTestCase {
         assertThat(simulateDocumentVerboseResult.getProcessorResults().size(), equalTo(1));
         assertThat(simulateDocumentVerboseResult.getProcessorResults().get(0).getProcessorTag(), equalTo("processor_0"));
         assertThat(simulateDocumentVerboseResult.getProcessorResults().get(0).getFailure(), nullValue());
-        assertThat(simulateDocumentVerboseResult.getProcessorResults().get(0).getIngestDocument(), not(sameInstance(ingestDocument)));
-        assertIngestDocument(simulateDocumentVerboseResult.getProcessorResults().get(0).getIngestDocument(), ingestDocument);
-        assertThat(simulateDocumentVerboseResult.getProcessorResults().get(0).getIngestDocument().getSourceAndMetadata(),
-                not(sameInstance(ingestDocument.getSourceAndMetadata())));
+        assertVerboseResult(simulateDocumentVerboseResult.getProcessorResults().get(0), pipeline.getId(), ingestDocument);
     }
 
     public void testExecuteItemWithFailure() throws Exception {
@@ -392,4 +375,19 @@ public class SimulateExecutionServiceTests extends ESTestCase {
         }
     }
 
+    private static void assertVerboseResult(SimulateProcessorResult result,
+                                            String expectedPipelineId,
+                                            IngestDocument expectedIngestDocument) {
+        IngestDocument simulateVerboseIngestDocument = result.getIngestDocument();
+        // Remove and compare pipeline key. It is always in the verbose result,
+        // since that is a snapshot of how the ingest doc looks during pipeline execution, but not in the final ingestDocument.
+        // The key gets added and removed during pipeline execution.
+        String actualPipelineId = (String) simulateVerboseIngestDocument.getIngestMetadata().remove("pipeline");
+        assertThat(actualPipelineId, equalTo(expectedPipelineId));
+
+        assertThat(simulateVerboseIngestDocument, not(sameInstance(expectedIngestDocument)));
+        assertIngestDocument(simulateVerboseIngestDocument, expectedIngestDocument);
+        assertThat(simulateVerboseIngestDocument.getSourceAndMetadata(), not(sameInstance(expectedIngestDocument.getSourceAndMetadata())));
+    }
+
 }

+ 2 - 1
server/src/test/java/org/elasticsearch/ingest/CompoundProcessorTests.java

@@ -285,11 +285,12 @@ public class CompoundProcessorTests extends ESTestCase {
     public void testFailureProcessorIsInvokedOnFailure() {
         TestProcessor onFailureProcessor = new TestProcessor(null, "on_failure", ingestDocument -> {
             Map<String, Object> ingestMetadata = ingestDocument.getIngestMetadata();
-            assertThat(ingestMetadata.entrySet(), hasSize(4));
+            assertThat(ingestMetadata.entrySet(), hasSize(5));
             assertThat(ingestMetadata.get(CompoundProcessor.ON_FAILURE_MESSAGE_FIELD), equalTo("failure!"));
             assertThat(ingestMetadata.get(CompoundProcessor.ON_FAILURE_PROCESSOR_TYPE_FIELD), equalTo("test-processor"));
             assertThat(ingestMetadata.get(CompoundProcessor.ON_FAILURE_PROCESSOR_TAG_FIELD), nullValue());
             assertThat(ingestMetadata.get(CompoundProcessor.ON_FAILURE_PIPELINE_FIELD), equalTo("2"));
+            assertThat(ingestMetadata.get("pipeline"), equalTo("1"));
         });
 
         Pipeline pipeline2 = new Pipeline("2", null, null, new CompoundProcessor(new TestProcessor(new RuntimeException("failure!"))));

+ 57 - 0
server/src/test/java/org/elasticsearch/ingest/PipelineProcessorTests.java

@@ -20,17 +20,22 @@ package org.elasticsearch.ingest;
 
 import org.elasticsearch.ElasticsearchException;
 import org.elasticsearch.script.ScriptService;
+import org.elasticsearch.script.TemplateScript;
 import org.elasticsearch.test.ESTestCase;
 
+import java.util.ArrayList;
 import java.util.Arrays;
 import java.util.Collections;
 import java.util.HashMap;
+import java.util.List;
 import java.util.Map;
 import java.util.concurrent.CompletableFuture;
 import java.util.concurrent.TimeUnit;
 import java.util.function.LongSupplier;
 
 import static org.hamcrest.CoreMatchers.equalTo;
+import static org.hamcrest.CoreMatchers.notNullValue;
+import static org.hamcrest.Matchers.nullValue;
 import static org.mockito.Mockito.mock;
 import static org.mockito.Mockito.when;
 
@@ -205,6 +210,58 @@ public class PipelineProcessorTests extends ESTestCase {
         assertThat(pipeline3Stats.getIngestFailedCount(), equalTo(1L));
     }
 
+    public void testIngestPipelineMetadata() {
+        IngestService ingestService = createIngestService();
+
+        final int numPipelines = 16;
+        Pipeline firstPipeline = null;
+        for (int i = 0; i < numPipelines; i++) {
+            String pipelineId = Integer.toString(i);
+            List<Processor> processors = new ArrayList<>();
+            processors.add(new AbstractProcessor(null) {
+                @Override
+                public IngestDocument execute(final IngestDocument ingestDocument) throws Exception {
+                    ingestDocument.appendFieldValue("pipelines", ingestDocument.getIngestMetadata().get("pipeline"));
+                    return ingestDocument;
+                }
+
+                @Override
+                public String getType() {
+                    return null;
+                }
+
+            });
+            if (i < (numPipelines - 1)) {
+                TemplateScript.Factory pipelineName = new TestTemplateService.MockTemplateScript.Factory(Integer.toString(i + 1));
+                processors.add(new PipelineProcessor(null, pipelineName, ingestService));
+            }
+
+
+            Pipeline pipeline = new Pipeline(pipelineId, null, null, new CompoundProcessor(false, processors, List.of()));
+            when(ingestService.getPipeline(pipelineId)).thenReturn(pipeline);
+            if (firstPipeline == null) {
+                firstPipeline = pipeline;
+            }
+        }
+
+        IngestDocument testIngestDocument = RandomDocumentPicks.randomIngestDocument(random(), new HashMap<>());
+        IngestDocument[] docHolder = new IngestDocument[1];
+        Exception[] errorHolder = new Exception[1];
+        testIngestDocument.executePipeline(firstPipeline, (doc, e) -> {
+            docHolder[0] = doc;
+            errorHolder[0] = e;
+        });
+        assertThat(docHolder[0], notNullValue());
+        assertThat(errorHolder[0], nullValue());
+
+        IngestDocument ingestDocument = docHolder[0];
+        List<?> pipelines = ingestDocument.getFieldValue("pipelines", List.class);
+        assertThat(pipelines.size(), equalTo(numPipelines));
+        for (int i = 0; i < numPipelines; i++) {
+            assertThat(pipelines.get(i), equalTo(Integer.toString(i)));
+        }
+    }
+
     static IngestService createIngestService() {
         IngestService ingestService = mock(IngestService.class);
         ScriptService scriptService = mock(ScriptService.class);

+ 5 - 0
server/src/test/java/org/elasticsearch/ingest/TrackingResultProcessorTests.java

@@ -220,6 +220,7 @@ public class TrackingResultProcessorTests extends ESTestCase {
         trackingProcessor.execute(ingestDocument, (result, e) -> {});
 
         SimulateProcessorResult expectedResult = new SimulateProcessorResult(actualProcessor.getTag(), ingestDocument);
+        expectedResult.getIngestDocument().getIngestMetadata().put("pipeline", pipelineId);
 
         verify(ingestService,  Mockito.atLeast(1)).getPipeline(pipelineId);
         assertThat(resultList.size(), equalTo(3));
@@ -287,6 +288,7 @@ public class TrackingResultProcessorTests extends ESTestCase {
 
 
         SimulateProcessorResult expectedResult = new SimulateProcessorResult(actualProcessor.getTag(), ingestDocument);
+        expectedResult.getIngestDocument().getIngestMetadata().put("pipeline", pipelineId1);
 
         verify(ingestService, Mockito.atLeast(1)).getPipeline(pipelineId1);
         verify(ingestService, Mockito.atLeast(1)).getPipeline(pipelineId2);
@@ -355,6 +357,7 @@ public class TrackingResultProcessorTests extends ESTestCase {
 
 
         SimulateProcessorResult expectedResult = new SimulateProcessorResult(actualProcessor.getTag(), ingestDocument);
+        expectedResult.getIngestDocument().getIngestMetadata().put("pipeline", pipelineId1);
 
         verify(ingestService, Mockito.atLeast(1)).getPipeline(pipelineId1);
         verify(ingestService, Mockito.never()).getPipeline(pipelineId2);
@@ -406,6 +409,7 @@ public class TrackingResultProcessorTests extends ESTestCase {
         trackingProcessor.execute(ingestDocument, (result, e) -> {});
 
         SimulateProcessorResult expectedResult = new SimulateProcessorResult(actualProcessor.getTag(), ingestDocument);
+        expectedResult.getIngestDocument().getIngestMetadata().put("pipeline", pipelineId);
 
         verify(ingestService, Mockito.atLeast(2)).getPipeline(pipelineId);
         assertThat(resultList.size(), equalTo(4));
@@ -482,6 +486,7 @@ public class TrackingResultProcessorTests extends ESTestCase {
         trackingProcessor.execute(ingestDocument, (result, e) -> {});
 
         SimulateProcessorResult expectedResult = new SimulateProcessorResult(actualProcessor.getTag(), ingestDocument);
+        expectedResult.getIngestDocument().getIngestMetadata().put("pipeline", pipelineId);
 
         verify(ingestService,  Mockito.atLeast(2)).getPipeline(pipelineId);
         assertThat(resultList.size(), equalTo(2));