Browse Source

INGEST: Allow Repeated Invocation of Pipeline (#33419)

* Allows repeated, non-recursive invocation
of the same pipeline
Armin Braun 7 years ago
parent
commit
ef1066d7f8

+ 16 - 0
modules/ingest-common/src/test/java/org/elasticsearch/ingest/common/PipelineProcessorTests.java

@@ -113,4 +113,20 @@ public class PipelineProcessorTests extends ESTestCase {
             "Recursive invocation of pipeline [inner] detected.", e.getRootCause().getMessage()
         );
     }
+
+    public void testAllowsRepeatedPipelineInvocations() throws Exception {
+        String innerPipelineId = "inner";
+        IngestService ingestService = mock(IngestService.class);
+        IngestDocument testIngestDocument = RandomDocumentPicks.randomIngestDocument(random(), new HashMap<>());
+        Map<String, Object> outerConfig = new HashMap<>();
+        outerConfig.put("pipeline", innerPipelineId);
+        PipelineProcessor.Factory factory = new PipelineProcessor.Factory(ingestService);
+        Pipeline inner = new Pipeline(
+            innerPipelineId, null, null, new CompoundProcessor()
+        );
+        when(ingestService.getPipeline(innerPipelineId)).thenReturn(inner);
+        Processor outerProc = factory.create(Collections.emptyMap(), null, outerConfig);
+        outerProc.execute(testIngestDocument);
+        outerProc.execute(testIngestDocument);
+    }
 }

+ 7 - 3
server/src/main/java/org/elasticsearch/ingest/IngestDocument.java

@@ -645,10 +645,14 @@ public final class IngestDocument {
      * @throws Exception On exception in pipeline execution
      */
     public IngestDocument executePipeline(Pipeline pipeline) throws Exception {
-        if (this.executedPipelines.add(pipeline) == false) {
-            throw new IllegalStateException("Recursive invocation of pipeline [" + pipeline.getId() + "] detected.");
+        try {
+            if (this.executedPipelines.add(pipeline) == false) {
+                throw new IllegalStateException("Recursive invocation of pipeline [" + pipeline.getId() + "] detected.");
+            }
+            return pipeline.execute(this);
+        } finally {
+            executedPipelines.remove(pipeline);
         }
-        return pipeline.execute(this);
     }
 
     @Override