Browse Source

Add templating support to pipeline processor. (#49030)

This commit adds templating support to the pipeline processor's `name` option.

Closes #39955
Martijn van Groningen 5 years ago
parent
commit
88aea2107d

+ 1 - 1
docs/reference/ingest/processors/pipeline.asciidoc

@@ -7,7 +7,7 @@ Executes another pipeline.
 [options="header"]
 |======
 | Name             | Required  | Default  | Description
-| `name`           | yes       | -        | The name of the pipeline to execute
+| `name`           | yes       | -        | The name of the pipeline to execute. Supports <<accessing-template-fields,template snippets>>.
 include::common-options.asciidoc[]
 |======
 

+ 6 - 0
modules/ingest-common/build.gradle

@@ -28,3 +28,9 @@ dependencies {
   compile project(':libs:elasticsearch-grok')
   compile project(':libs:elasticsearch-dissect')
 }
+
+testClusters.integTest {
+  // Needed in order to test ingest pipeline templating:
+  // (this is because the integTest node is not using default distribution, but only the minimal number of required modules)
+  module file(project(':modules:lang-mustache').tasks.bundlePlugin.archiveFile)
+}

+ 94 - 0
modules/ingest-common/src/test/resources/rest-api-spec/test/ingest/210_pipeline_processor.yml

@@ -108,3 +108,97 @@ teardown:
       body: {}
 - match: { error.root_cause.0.type: "ingest_processor_exception" }
 - match: { error.root_cause.0.reason: "java.lang.IllegalStateException: Cycle detected for pipeline: outer" }
+
+---
+"Test Pipeline Processor with templating":
+  - do:
+      ingest.put_pipeline:
+        id: "engineering-department"
+        body:  >
+          {
+            "processors" : [
+              {
+                "set" : {
+                  "field": "manager",
+                  "value": "john"
+                }
+              }
+            ]
+          }
+  - match: { acknowledged: true }
+
+  - do:
+      ingest.put_pipeline:
+        id: "sales-department"
+        body:  >
+          {
+            "processors" : [
+              {
+                "set" : {
+                  "field": "manager",
+                  "value": "jan"
+                }
+              }
+            ]
+          }
+  - match: { acknowledged: true }
+
+  - do:
+      ingest.put_pipeline:
+        id: "outer"
+        body:  >
+          {
+            "processors" : [
+              {
+                "pipeline" : {
+                  "name": "{{org}}-department"
+                }
+              }
+            ]
+          }
+  - match: { acknowledged: true }
+
+  - do:
+      index:
+        index: test
+        id: 1
+        pipeline: "outer"
+        body: >
+          {
+            "org": "engineering"
+          }
+
+  - do:
+      get:
+        index: test
+        id: 1
+  - match: { _source.manager: "john" }
+
+  - do:
+      index:
+        index: test
+        id: 2
+        pipeline: "outer"
+        body: >
+          {
+            "org": "sales"
+          }
+
+  - do:
+      get:
+        index: test
+        id: 2
+  - match: { _source.manager: "jan" }
+
+  - do:
+      catch: /illegal_state_exception/
+      index:
+        index: test
+        id: 3
+        pipeline: "outer"
+        body: >
+          {
+            "org": "legal"
+          }
+  - match: { error.root_cause.0.type: "ingest_processor_exception" }
+  - match: { error.root_cause.0.reason: "java.lang.IllegalStateException: Pipeline processor configured for non-existent pipeline [legal-department]" }

+ 2 - 2
server/src/main/java/org/elasticsearch/ingest/IngestService.java

@@ -460,7 +460,7 @@ public class IngestService implements ClusterStateApplier {
     }
 
     //package private for testing
-    static String getProcessorName(Processor processor){
+    static String getProcessorName(Processor processor) {
         // conditionals are implemented as wrappers around the real processor, so get the real processor for the correct type for the name
         if(processor instanceof ConditionalProcessor){
             processor = ((ConditionalProcessor) processor).getInnerProcessor();
@@ -469,7 +469,7 @@ public class IngestService implements ClusterStateApplier {
         sb.append(processor.getType());
 
         if(processor instanceof PipelineProcessor){
-            String pipelineName = ((PipelineProcessor) processor).getPipelineName();
+            String pipelineName = ((PipelineProcessor) processor).getPipelineTemplate().newInstance(Map.of()).execute();
             sb.append(":");
             sb.append(pipelineName);
         }

+ 13 - 10
server/src/main/java/org/elasticsearch/ingest/PipelineProcessor.java

@@ -19,6 +19,8 @@
 
 package org.elasticsearch.ingest;
 
+import org.elasticsearch.script.TemplateScript;
+
 import java.util.Map;
 import java.util.function.BiConsumer;
 
@@ -26,18 +28,18 @@ public class PipelineProcessor extends AbstractProcessor {
 
     public static final String TYPE = "pipeline";
 
-    private final String pipelineName;
-
+    private final TemplateScript.Factory pipelineTemplate;
     private final IngestService ingestService;
 
-    private PipelineProcessor(String tag, String pipelineName, IngestService ingestService) {
+    private PipelineProcessor(String tag, TemplateScript.Factory pipelineTemplate, IngestService ingestService) {
         super(tag);
-        this.pipelineName = pipelineName;
+        this.pipelineTemplate = pipelineTemplate;
         this.ingestService = ingestService;
     }
 
     @Override
     public void execute(IngestDocument ingestDocument, BiConsumer<IngestDocument, Exception> handler) {
+        String pipelineName = ingestDocument.renderTemplate(this.pipelineTemplate);
         Pipeline pipeline = ingestService.getPipeline(pipelineName);
         if (pipeline != null) {
             ingestDocument.executePipeline(pipeline, handler);
@@ -52,7 +54,8 @@ public class PipelineProcessor extends AbstractProcessor {
         throw new UnsupportedOperationException("this method should not get executed");
     }
 
-    Pipeline getPipeline(){
+    Pipeline getPipeline(IngestDocument ingestDocument) {
+        String pipelineName = ingestDocument.renderTemplate(this.pipelineTemplate);
         return ingestService.getPipeline(pipelineName);
     }
 
@@ -61,8 +64,8 @@ public class PipelineProcessor extends AbstractProcessor {
         return TYPE;
     }
 
-    String getPipelineName() {
-        return pipelineName;
+    TemplateScript.Factory getPipelineTemplate() {
+        return pipelineTemplate;
     }
 
     public static final class Factory implements Processor.Factory {
@@ -76,9 +79,9 @@ public class PipelineProcessor extends AbstractProcessor {
         @Override
         public PipelineProcessor create(Map<String, Processor.Factory> registry, String processorTag,
             Map<String, Object> config) throws Exception {
-            String pipeline =
-                ConfigurationUtils.readStringProperty(TYPE, processorTag, config, "name");
-            return new PipelineProcessor(processorTag, pipeline, ingestService);
+            TemplateScript.Factory pipelineTemplate =
+                ConfigurationUtils.readTemplateProperty(TYPE, processorTag, config, "name", ingestService.getScriptService());
+            return new PipelineProcessor(processorTag, pipelineTemplate, ingestService);
         }
     }
 }

+ 2 - 2
server/src/main/java/org/elasticsearch/ingest/TrackingResultProcessor.java

@@ -45,10 +45,10 @@ public final class TrackingResultProcessor implements Processor {
     public void execute(IngestDocument ingestDocument, BiConsumer<IngestDocument, Exception> handler) {
         if (actualProcessor instanceof PipelineProcessor) {
             PipelineProcessor pipelineProcessor = ((PipelineProcessor) actualProcessor);
-            Pipeline pipeline = pipelineProcessor.getPipeline();
+            Pipeline pipeline = pipelineProcessor.getPipeline(ingestDocument);
             //runtime check for cycles against a copy of the document. This is needed to properly handle conditionals around pipelines
             IngestDocument ingestDocumentCopy = new IngestDocument(ingestDocument);
-            ingestDocumentCopy.executePipeline(pipelineProcessor.getPipeline(), (result, e) -> {
+            ingestDocumentCopy.executePipeline(pipelineProcessor.getPipeline(ingestDocument), (result, e) -> {
                 // do nothing, let the tracking processors throw the exception while recording the path up to the failure
                 if (e instanceof ElasticsearchException) {
                     ElasticsearchException elasticsearchException = (ElasticsearchException) e;

+ 1 - 1
server/src/test/java/org/elasticsearch/ingest/IngestServiceTests.java

@@ -1115,7 +1115,7 @@ public class IngestServiceTests extends ESTestCase {
 
         PipelineProcessor pipelineProcessor = mock(PipelineProcessor.class);
         String pipelineName = randomAlphaOfLength(10);
-        when(pipelineProcessor.getPipelineName()).thenReturn(pipelineName);
+        when(pipelineProcessor.getPipelineTemplate()).thenReturn(new TestTemplateService.MockTemplateScript.Factory(pipelineName));
         name = PipelineProcessor.TYPE;
         when(pipelineProcessor.getType()).thenReturn(name);
         assertThat(IngestService.getProcessorName(pipelineProcessor), equalTo(name + ":" + pipelineName));

+ 13 - 5
server/src/test/java/org/elasticsearch/ingest/PipelineProcessorTests.java

@@ -19,6 +19,7 @@
 package org.elasticsearch.ingest;
 
 import org.elasticsearch.ElasticsearchException;
+import org.elasticsearch.script.ScriptService;
 import org.elasticsearch.test.ESTestCase;
 
 import java.util.Arrays;
@@ -37,7 +38,7 @@ public class PipelineProcessorTests extends ESTestCase {
 
     public void testExecutesPipeline() throws Exception {
         String pipelineId = "pipeline";
-        IngestService ingestService = mock(IngestService.class);
+        IngestService ingestService = createIngestService();
         CompletableFuture<IngestDocument> invoked = new CompletableFuture<>();
         IngestDocument testIngestDocument = RandomDocumentPicks.randomIngestDocument(random(), new HashMap<>());
         Pipeline pipeline = new Pipeline(
@@ -69,7 +70,7 @@ public class PipelineProcessorTests extends ESTestCase {
     }
 
     public void testThrowsOnMissingPipeline() throws Exception {
-        IngestService ingestService = mock(IngestService.class);
+        IngestService ingestService = createIngestService();
         IngestDocument testIngestDocument = RandomDocumentPicks.randomIngestDocument(random(), new HashMap<>());
         PipelineProcessor.Factory factory = new PipelineProcessor.Factory(ingestService);
         Map<String, Object> config = new HashMap<>();
@@ -85,7 +86,7 @@ public class PipelineProcessorTests extends ESTestCase {
     public void testThrowsOnRecursivePipelineInvocations() throws Exception {
         String innerPipelineId = "inner";
         String outerPipelineId = "outer";
-        IngestService ingestService = mock(IngestService.class);
+        IngestService ingestService = createIngestService();
         IngestDocument testIngestDocument = RandomDocumentPicks.randomIngestDocument(random(), new HashMap<>());
         Map<String, Object> outerConfig = new HashMap<>();
         outerConfig.put("name", innerPipelineId);
@@ -113,7 +114,7 @@ public class PipelineProcessorTests extends ESTestCase {
 
     public void testAllowsRepeatedPipelineInvocations() throws Exception {
         String innerPipelineId = "inner";
-        IngestService ingestService = mock(IngestService.class);
+        IngestService ingestService = createIngestService();
         IngestDocument testIngestDocument = RandomDocumentPicks.randomIngestDocument(random(), new HashMap<>());
         Map<String, Object> outerConfig = new HashMap<>();
         outerConfig.put("name", innerPipelineId);
@@ -131,7 +132,7 @@ public class PipelineProcessorTests extends ESTestCase {
         String pipeline1Id = "pipeline1";
         String pipeline2Id = "pipeline2";
         String pipeline3Id = "pipeline3";
-        IngestService ingestService = mock(IngestService.class);
+        IngestService ingestService = createIngestService();
         PipelineProcessor.Factory factory = new PipelineProcessor.Factory(ingestService);
 
         Map<String, Object> pipeline1ProcessorConfig = new HashMap<>();
@@ -203,4 +204,11 @@ public class PipelineProcessorTests extends ESTestCase {
         assertThat(pipeline2Stats.getIngestFailedCount(), equalTo(0L));
         assertThat(pipeline3Stats.getIngestFailedCount(), equalTo(1L));
     }
+
+    static IngestService createIngestService() {
+        IngestService ingestService = mock(IngestService.class);
+        ScriptService scriptService = mock(ScriptService.class);
+        when(ingestService.getScriptService()).thenReturn(scriptService);
+        return ingestService;
+    }
 }

+ 7 - 7
server/src/test/java/org/elasticsearch/ingest/TrackingResultProcessorTests.java

@@ -40,6 +40,7 @@ import java.util.Map;
 import static org.elasticsearch.ingest.CompoundProcessor.ON_FAILURE_MESSAGE_FIELD;
 import static org.elasticsearch.ingest.CompoundProcessor.ON_FAILURE_PROCESSOR_TAG_FIELD;
 import static org.elasticsearch.ingest.CompoundProcessor.ON_FAILURE_PROCESSOR_TYPE_FIELD;
+import static org.elasticsearch.ingest.PipelineProcessorTests.createIngestService;
 import static org.elasticsearch.ingest.TrackingResultProcessor.decorate;
 import static org.hamcrest.CoreMatchers.equalTo;
 import static org.hamcrest.CoreMatchers.not;
@@ -47,7 +48,6 @@ import static org.hamcrest.Matchers.containsString;
 import static org.hamcrest.Matchers.instanceOf;
 import static org.hamcrest.Matchers.nullValue;
 import static org.hamcrest.Matchers.sameInstance;
-import static org.mockito.Mockito.mock;
 import static org.mockito.Mockito.verify;
 import static org.mockito.Mockito.when;
 
@@ -195,7 +195,7 @@ public class TrackingResultProcessorTests extends ESTestCase {
 
     public void testActualPipelineProcessor() throws Exception {
         String pipelineId = "pipeline1";
-        IngestService ingestService = mock(IngestService.class);
+        IngestService ingestService = createIngestService();
         Map<String, Object> pipelineConfig = new HashMap<>();
         pipelineConfig.put("name", pipelineId);
         PipelineProcessor.Factory factory = new PipelineProcessor.Factory(ingestService);
@@ -240,7 +240,7 @@ public class TrackingResultProcessorTests extends ESTestCase {
     public void testActualPipelineProcessorWithTrueConditional() throws Exception {
         String pipelineId1 = "pipeline1";
         String pipelineId2 = "pipeline2";
-        IngestService ingestService = mock(IngestService.class);
+        IngestService ingestService = createIngestService();
         Map<String, Object> pipelineConfig0 = new HashMap<>();
         pipelineConfig0.put("name", pipelineId1);
         Map<String, Object> pipelineConfig1 = new HashMap<>();
@@ -308,7 +308,7 @@ public class TrackingResultProcessorTests extends ESTestCase {
     public void testActualPipelineProcessorWithFalseConditional() throws Exception {
         String pipelineId1 = "pipeline1";
         String pipelineId2 = "pipeline2";
-        IngestService ingestService = mock(IngestService.class);
+        IngestService ingestService = createIngestService();
         Map<String, Object> pipelineConfig0 = new HashMap<>();
         pipelineConfig0.put("name", pipelineId1);
         Map<String, Object> pipelineConfig1 = new HashMap<>();
@@ -377,7 +377,7 @@ public class TrackingResultProcessorTests extends ESTestCase {
         RuntimeException exception = new RuntimeException("processor failed");
 
         String pipelineId = "pipeline1";
-        IngestService ingestService = mock(IngestService.class);
+        IngestService ingestService = createIngestService();
         Map<String, Object> pipelineConfig = new HashMap<>();
         pipelineConfig.put("name", pipelineId);
         PipelineProcessor.Factory factory = new PipelineProcessor.Factory(ingestService);
@@ -430,7 +430,7 @@ public class TrackingResultProcessorTests extends ESTestCase {
     public void testActualPipelineProcessorWithCycle() throws Exception {
         String pipelineId1 = "pipeline1";
         String pipelineId2 = "pipeline2";
-        IngestService ingestService = mock(IngestService.class);
+        IngestService ingestService = createIngestService();
         Map<String, Object> pipelineConfig0 = new HashMap<>();
         pipelineConfig0.put("name", pipelineId1);
         Map<String, Object> pipelineConfig1 = new HashMap<>();
@@ -462,7 +462,7 @@ public class TrackingResultProcessorTests extends ESTestCase {
 
     public void testActualPipelineProcessorRepeatedInvocation() throws Exception {
         String pipelineId = "pipeline1";
-        IngestService ingestService = mock(IngestService.class);
+        IngestService ingestService = createIngestService();
         Map<String, Object> pipelineConfig = new HashMap<>();
         pipelineConfig.put("name", pipelineId);
         PipelineProcessor.Factory factory = new PipelineProcessor.Factory(ingestService);