Переглянути джерело

Add support for _meta field to ingest pipelines (#75905)

We are adding a _meta field to many of our REST APIs so that users can attach whatever metadata they
want. The data in this field will not be used by Elasticsearch. This commit add the _meta field to ingest
pipelines.
Keith Massey 4 роки тому
батько
коміт
498684a696

+ 46 - 0
docs/reference/ingest/apis/put-pipeline.asciidoc

@@ -76,7 +76,53 @@ Processors run sequentially in the order specified.
 `version`::
 (Optional, integer)
 Version number used by external systems to track ingest pipelines.
+
 +
 This parameter is intended for external systems only. {es} does not use or
 validate pipeline version numbers.
+
+`_meta`::
+(Optional, object)
+Optional metadata about the ingest pipeline. May have any contents. This
+map is not automatically generated by {es}.
 // end::pipeline-object[]
+
+[[put-pipeline-api-example]]
+==== {api-examples-title}
+
+[[pipeline-metadata]]
+===== Pipeline metadata
+
+You can use the `_meta` parameter to add arbitrary metadata to a pipeline.
+This user-defined object is stored in the cluster state,
+so keeping it short is preferable.
+
+The `_meta` parameter is optional and not automatically generated or used by {es}.
+
+To unset `_meta`, replace the pipeline without specifying one.
+
+[source,console]
+--------------------------------------------------
+PUT /_ingest/pipeline/my-pipeline-id
+{
+  "description" : "My optional pipeline description",
+  "processors" : [
+    {
+      "set" : {
+        "description" : "My optional processor description",
+        "field": "my-keyword-field",
+        "value": "foo"
+      }
+    }
+  ],
+  "_meta": {
+    "reason": "set my-keyword-field to foo",
+    "serialization": {
+      "class": "MyPipeline",
+      "id": 10
+    }
+  }
+}
+--------------------------------------------------
+
+To check the `_meta`, use the <<get-pipeline-api,get pipeline>> API.

+ 9 - 0
server/src/main/java/org/elasticsearch/action/ingest/PutPipelineTransportAction.java

@@ -8,6 +8,7 @@
 
 package org.elasticsearch.action.ingest;
 
+import org.elasticsearch.Version;
 import org.elasticsearch.action.ActionListener;
 import org.elasticsearch.action.admin.cluster.node.info.NodeInfo;
 import org.elasticsearch.action.admin.cluster.node.info.NodesInfoRequest;
@@ -22,8 +23,10 @@ import org.elasticsearch.cluster.block.ClusterBlockLevel;
 import org.elasticsearch.cluster.metadata.IndexNameExpressionResolver;
 import org.elasticsearch.cluster.node.DiscoveryNode;
 import org.elasticsearch.common.inject.Inject;
+import org.elasticsearch.common.xcontent.XContentHelper;
 import org.elasticsearch.ingest.IngestInfo;
 import org.elasticsearch.ingest.IngestService;
+import org.elasticsearch.ingest.Pipeline;
 import org.elasticsearch.tasks.Task;
 import org.elasticsearch.threadpool.ThreadPool;
 import org.elasticsearch.transport.TransportService;
@@ -55,6 +58,12 @@ public class PutPipelineTransportAction extends AcknowledgedTransportMasterNodeA
     @Override
     protected void masterOperation(Task task, PutPipelineRequest request, ClusterState state, ActionListener<AcknowledgedResponse> listener)
             throws Exception {
+        if (state.getNodes().getMinNodeVersion().before(Version.V_8_0_0)) {
+            Map<String, Object> pipelineConfig = XContentHelper.convertToMap(request.getSource(), false, request.getXContentType()).v2();
+            if (pipelineConfig.containsKey(Pipeline.META_KEY)) {
+                throw new IllegalStateException("pipelines with _meta field require minimum node version of " + Version.V_8_0_0);
+            }
+        }
         NodesInfoRequest nodesInfoRequest = new NodesInfoRequest();
         nodesInfoRequest.clear()
             .addMetric(NodesInfoRequest.Metric.INGEST.metricName());

+ 1 - 1
server/src/main/java/org/elasticsearch/action/ingest/SimulateExecutionService.java

@@ -38,7 +38,7 @@ class SimulateExecutionService {
             List<SimulateProcessorResult> processorResultList = new CopyOnWriteArrayList<>();
             CompoundProcessor verbosePipelineProcessor = decorate(pipeline.getCompoundProcessor(), null, processorResultList);
             Pipeline verbosePipeline = new Pipeline(pipeline.getId(), pipeline.getDescription(), pipeline.getVersion(),
-                verbosePipelineProcessor);
+                pipeline.getMetadata(), verbosePipelineProcessor);
             ingestDocument.executePipeline(verbosePipeline, (result, e) -> {
                 handler.accept(new SimulateDocumentVerboseResult(processorResultList), e);
             });

+ 1 - 1
server/src/main/java/org/elasticsearch/ingest/IngestService.java

@@ -849,7 +849,7 @@ public class IngestService implements ClusterStateApplier, ReportingService<Inge
             }
         };
         String description = "this is a place holder pipeline, because pipeline with id [" +  id + "] could not be loaded";
-        return new Pipeline(id, description, null, new CompoundProcessor(failureProcessor));
+        return new Pipeline(id, description, null, null, new CompoundProcessor(failureProcessor));
     }
 
     static class PipelineHolder {

+ 16 - 5
server/src/main/java/org/elasticsearch/ingest/Pipeline.java

@@ -29,25 +29,30 @@ public final class Pipeline {
     public static final String PROCESSORS_KEY = "processors";
     public static final String VERSION_KEY = "version";
     public static final String ON_FAILURE_KEY = "on_failure";
+    public static final String META_KEY = "_meta";
 
     private final String id;
     @Nullable
     private final String description;
     @Nullable
     private final Integer version;
+    @Nullable
+    private final Map<String, Object> metadata;
     private final CompoundProcessor compoundProcessor;
     private final IngestMetric metrics;
     private final LongSupplier relativeTimeProvider;
 
-    public Pipeline(String id, @Nullable String description, @Nullable Integer version, CompoundProcessor compoundProcessor) {
-        this(id, description, version, compoundProcessor, System::nanoTime);
+    public Pipeline(String id, @Nullable String description, @Nullable Integer version,
+                    @Nullable Map<String, Object> metadata, CompoundProcessor compoundProcessor) {
+        this(id, description, version, metadata, compoundProcessor, System::nanoTime);
     }
 
     //package private for testing
-    Pipeline(String id, @Nullable String description, @Nullable Integer version, CompoundProcessor compoundProcessor,
-             LongSupplier relativeTimeProvider) {
+    Pipeline(String id, @Nullable String description, @Nullable Integer version, @Nullable Map<String, Object> metadata,
+             CompoundProcessor compoundProcessor, LongSupplier relativeTimeProvider) {
         this.id = id;
         this.description = description;
+        this.metadata = metadata;
         this.compoundProcessor = compoundProcessor;
         this.version = version;
         this.metrics = new IngestMetric();
@@ -58,6 +63,7 @@ public final class Pipeline {
         Map<String, Processor.Factory> processorFactories, ScriptService scriptService) throws Exception {
         String description = ConfigurationUtils.readOptionalStringProperty(null, null, config, DESCRIPTION_KEY);
         Integer version = ConfigurationUtils.readIntProperty(null, null, config, VERSION_KEY, null);
+        Map<String, Object> metadata = ConfigurationUtils.readOptionalMap(null, null, config, META_KEY);
         List<Map<String, Object>> processorConfigs = ConfigurationUtils.readList(null, null, config, PROCESSORS_KEY);
         List<Processor> processors = ConfigurationUtils.readProcessorConfigs(processorConfigs, scriptService, processorFactories);
         List<Map<String, Object>> onFailureProcessorConfigs =
@@ -73,7 +79,7 @@ public final class Pipeline {
         }
         CompoundProcessor compoundProcessor = new CompoundProcessor(false, Collections.unmodifiableList(processors),
                 Collections.unmodifiableList(onFailureProcessors));
-        return new Pipeline(id, description, version, compoundProcessor);
+        return new Pipeline(id, description, version, metadata, compoundProcessor);
     }
 
     /**
@@ -120,6 +126,11 @@ public final class Pipeline {
         return version;
     }
 
+    @Nullable
+    public Map<String, Object> getMetadata() {
+        return metadata;
+    }
+
     /**
      * Get the underlying {@link CompoundProcessor} containing the Pipeline's processors
      */

+ 1 - 1
server/src/main/java/org/elasticsearch/ingest/PipelineConfiguration.java

@@ -10,13 +10,13 @@ package org.elasticsearch.ingest;
 
 import org.elasticsearch.cluster.AbstractDiffable;
 import org.elasticsearch.cluster.Diff;
-import org.elasticsearch.common.xcontent.ParseField;
 import org.elasticsearch.common.Strings;
 import org.elasticsearch.common.bytes.BytesReference;
 import org.elasticsearch.common.io.stream.StreamInput;
 import org.elasticsearch.common.io.stream.StreamOutput;
 import org.elasticsearch.common.xcontent.ContextParser;
 import org.elasticsearch.common.xcontent.ObjectParser;
+import org.elasticsearch.common.xcontent.ParseField;
 import org.elasticsearch.common.xcontent.ToXContentObject;
 import org.elasticsearch.common.xcontent.XContentBuilder;
 import org.elasticsearch.common.xcontent.XContentHelper;

+ 1 - 1
server/src/main/java/org/elasticsearch/ingest/TrackingResultProcessor.java

@@ -83,7 +83,7 @@ public final class TrackingResultProcessor implements Processor {
                     processorResultList.add(new SimulateProcessorResult(actualProcessor.getType(), actualProcessor.getTag(),
                         actualProcessor.getDescription(), conditionalWithResult));
                     Pipeline verbosePipeline = new Pipeline(pipeline.getId(), pipeline.getDescription(), pipeline.getVersion(),
-                        verbosePipelineProcessor);
+                        pipeline.getMetadata(), verbosePipelineProcessor);
                     ingestDocument.executePipeline(verbosePipeline, handler);
                 }
             });

+ 12 - 12
server/src/test/java/org/elasticsearch/action/ingest/SimulateExecutionServiceTests.java

@@ -66,7 +66,7 @@ public class SimulateExecutionServiceTests extends ESTestCase {
 
     public void testExecuteVerboseItem() throws Exception {
         TestProcessor processor = new TestProcessor("test-id", "mock", null, ingestDocument -> {});
-        Pipeline pipeline = new Pipeline("_id", "_description", version, new CompoundProcessor(processor, processor));
+        Pipeline pipeline = new Pipeline("_id", "_description", version, null, new CompoundProcessor(processor, processor));
 
         CountDownLatch latch = new CountDownLatch(1);
         AtomicReference<SimulateDocumentResult> holder = new AtomicReference<>();
@@ -93,7 +93,7 @@ public class SimulateExecutionServiceTests extends ESTestCase {
     }
     public void testExecuteItem() throws Exception {
         TestProcessor processor = new TestProcessor("processor_0", "mock", null, ingestDocument -> {});
-        Pipeline pipeline = new Pipeline("_id", "_description", version, new CompoundProcessor(processor, processor));
+        Pipeline pipeline = new Pipeline("_id", "_description", version, null, new CompoundProcessor(processor, processor));
         CountDownLatch latch = new CountDownLatch(1);
         AtomicReference<SimulateDocumentResult> holder = new AtomicReference<>();
         executionService.executeDocument(pipeline, ingestDocument, false, (r, e) -> {
@@ -113,7 +113,7 @@ public class SimulateExecutionServiceTests extends ESTestCase {
         TestProcessor processor1 = new TestProcessor("processor_0", "mock", null, ingestDocument -> {});
         TestProcessor processor2 = new TestProcessor("processor_1", "mock", null, new RuntimeException("processor failed"));
         TestProcessor processor3 = new TestProcessor("processor_2", "mock", null, ingestDocument -> {});
-        Pipeline pipeline = new Pipeline("_id", "_description", version, new CompoundProcessor(processor1, processor2, processor3));
+        Pipeline pipeline = new Pipeline("_id", "_description", version, null, new CompoundProcessor(processor1, processor2, processor3));
         CountDownLatch latch = new CountDownLatch(1);
         AtomicReference<SimulateDocumentResult> holder = new AtomicReference<>();
         executionService.executeDocument(pipeline, ingestDocument, true, (r, e) -> {
@@ -142,8 +142,8 @@ public class SimulateExecutionServiceTests extends ESTestCase {
         TestProcessor processor1 = new TestProcessor("processor_0", "mock", null, new RuntimeException("processor failed"));
         TestProcessor processor2 = new TestProcessor("processor_1", "mock", null, ingestDocument -> {});
         TestProcessor processor3 = new TestProcessor("processor_2", "mock", null, ingestDocument -> {});
-        Pipeline pipeline = new Pipeline("_id", "_description", version,
-                new CompoundProcessor(new CompoundProcessor(false, Collections.singletonList(processor1),
+        Pipeline pipeline = new Pipeline("_id", "_description", version, null,
+            new CompoundProcessor(new CompoundProcessor(false, Collections.singletonList(processor1),
                                 Collections.singletonList(processor2)), processor3));
         CountDownLatch latch = new CountDownLatch(1);
         AtomicReference<SimulateDocumentResult> holder = new AtomicReference<>();
@@ -184,7 +184,7 @@ public class SimulateExecutionServiceTests extends ESTestCase {
         RuntimeException exception = new RuntimeException("processor failed");
         TestProcessor testProcessor = new TestProcessor("processor_0", "mock", null, exception);
         CompoundProcessor processor = new CompoundProcessor(true, Collections.singletonList(testProcessor), Collections.emptyList());
-        Pipeline pipeline = new Pipeline("_id", "_description", version, new CompoundProcessor(processor));
+        Pipeline pipeline = new Pipeline("_id", "_description", version, null, new CompoundProcessor(processor));
         CountDownLatch latch = new CountDownLatch(1);
         AtomicReference<SimulateDocumentResult> holder = new AtomicReference<>();
         executionService.executeDocument(pipeline, ingestDocument, true, (r, e) -> {
@@ -205,7 +205,7 @@ public class SimulateExecutionServiceTests extends ESTestCase {
     public void testExecuteVerboseItemWithoutExceptionAndWithIgnoreFailure() throws Exception {
         TestProcessor testProcessor = new TestProcessor("processor_0", "mock", null, ingestDocument -> { });
         CompoundProcessor processor = new CompoundProcessor(true, Collections.singletonList(testProcessor), Collections.emptyList());
-        Pipeline pipeline = new Pipeline("_id", "_description", version, new CompoundProcessor(processor));
+        Pipeline pipeline = new Pipeline("_id", "_description", version, null, new CompoundProcessor(processor));
         CountDownLatch latch = new CountDownLatch(1);
         AtomicReference<SimulateDocumentResult> holder = new AtomicReference<>();
         executionService.executeDocument(pipeline, ingestDocument, true, (r, e) -> {
@@ -225,7 +225,7 @@ public class SimulateExecutionServiceTests extends ESTestCase {
 
     public void testExecuteItemWithFailure() throws Exception {
         TestProcessor processor = new TestProcessor(ingestDocument -> { throw new RuntimeException("processor failed"); });
-        Pipeline pipeline = new Pipeline("_id", "_description", version, new CompoundProcessor(processor, processor));
+        Pipeline pipeline = new Pipeline("_id", "_description", version, null, new CompoundProcessor(processor, processor));
         CountDownLatch latch = new CountDownLatch(1);
         AtomicReference<SimulateDocumentResult> holder = new AtomicReference<>();
         executionService.executeDocument(pipeline, ingestDocument, false, (r, e) -> {
@@ -247,7 +247,7 @@ public class SimulateExecutionServiceTests extends ESTestCase {
     public void testDropDocument() throws Exception {
         TestProcessor processor1 = new TestProcessor(ingestDocument -> ingestDocument.setFieldValue("field", "value"));
         Processor processor2 = new DropProcessor.Factory().create(Map.of(), null, null, Map.of());
-        Pipeline pipeline = new Pipeline("_id", "_description", version, new CompoundProcessor(processor1, processor2));
+        Pipeline pipeline = new Pipeline("_id", "_description", version, null, new CompoundProcessor(processor1, processor2));
 
         CountDownLatch latch = new CountDownLatch(1);
         AtomicReference<SimulateDocumentResult> holder = new AtomicReference<>();
@@ -267,7 +267,7 @@ public class SimulateExecutionServiceTests extends ESTestCase {
     public void testDropDocumentVerbose() throws Exception {
         TestProcessor processor1 = new TestProcessor(ingestDocument -> ingestDocument.setFieldValue("field", "value"));
         Processor processor2 = new DropProcessor.Factory().create(Map.of(), null, null, Map.of());
-        Pipeline pipeline = new Pipeline("_id", "_description", version, new CompoundProcessor(processor1, processor2));
+        Pipeline pipeline = new Pipeline("_id", "_description", version, null, new CompoundProcessor(processor1, processor2));
 
         CountDownLatch latch = new CountDownLatch(1);
         AtomicReference<SimulateDocumentResult> holder = new AtomicReference<>();
@@ -291,7 +291,7 @@ public class SimulateExecutionServiceTests extends ESTestCase {
         TestProcessor processor1 = new TestProcessor(ingestDocument -> ingestDocument.setFieldValue("field1", "value"));
         Processor processor2 = new DropProcessor.Factory().create(Map.of(), null, null, Map.of());
         TestProcessor processor3 = new TestProcessor(ingestDocument -> ingestDocument.setFieldValue("field2", "value"));
-        Pipeline pipeline = new Pipeline("_id", "_description", version, new CompoundProcessor(processor1, processor2, processor3));
+        Pipeline pipeline = new Pipeline("_id", "_description", version, null, new CompoundProcessor(processor1, processor2, processor3));
 
         CountDownLatch latch = new CountDownLatch(1);
         AtomicReference<SimulateDocumentResult> holder = new AtomicReference<>();
@@ -338,7 +338,7 @@ public class SimulateExecutionServiceTests extends ESTestCase {
                 return "none-of-your-business";
             }
         };
-        Pipeline pipeline = new Pipeline("_id", "_description", version, new CompoundProcessor(processor1));
+        Pipeline pipeline = new Pipeline("_id", "_description", version, null, new CompoundProcessor(processor1));
         SimulatePipelineRequest.Parsed request = new SimulatePipelineRequest.Parsed(pipeline, documents, false);
 
         AtomicReference<SimulatePipelineResponse> responseHolder = new AtomicReference<>();

+ 1 - 1
server/src/test/java/org/elasticsearch/action/ingest/SimulatePipelineRequestParsingTests.java

@@ -54,7 +54,7 @@ public class SimulatePipelineRequestParsingTests extends ESTestCase {
         TestProcessor processor = new TestProcessor(ingestDocument -> {
         });
         CompoundProcessor pipelineCompoundProcessor = new CompoundProcessor(processor);
-        Pipeline pipeline = new Pipeline(SIMULATED_PIPELINE_ID, null, null, pipelineCompoundProcessor);
+        Pipeline pipeline = new Pipeline(SIMULATED_PIPELINE_ID, null, null, null, pipelineCompoundProcessor);
         Map<String, Processor.Factory> registry =
             Collections.singletonMap("mock_processor", (factories, tag, description, config) -> processor);
         ingestService = mock(IngestService.class);

+ 5 - 4
server/src/test/java/org/elasticsearch/ingest/CompoundProcessorTests.java

@@ -287,8 +287,9 @@ public class CompoundProcessorTests extends ESTestCase {
             assertThat(ingestMetadata.get("pipeline"), equalTo("1"));
         });
 
-        Pipeline pipeline2 = new Pipeline("2", null, null, new CompoundProcessor(new TestProcessor(new RuntimeException("failure!"))));
-        Pipeline pipeline1 = new Pipeline("1", null, null, new CompoundProcessor(false,List.of(new AbstractProcessor(null, null) {
+        Pipeline pipeline2 = new Pipeline("2", null, null, null,
+            new CompoundProcessor(new TestProcessor(new RuntimeException("failure!"))));
+        Pipeline pipeline1 = new Pipeline("1", null, null, null, new CompoundProcessor(false,List.of(new AbstractProcessor(null, null) {
             @Override
             public void execute(IngestDocument ingestDocument, BiConsumer<IngestDocument, Exception> handler) {
                 ingestDocument.executePipeline(pipeline2, handler);
@@ -326,9 +327,9 @@ public class CompoundProcessorTests extends ESTestCase {
     }
 
     public void testNewCompoundProcessorExceptionPipelineOrigin() {
-        Pipeline pipeline2 = new Pipeline("2", null, null,
+        Pipeline pipeline2 = new Pipeline("2", null, null, null,
             new CompoundProcessor(new TestProcessor("my_tag", "my_type", null, new RuntimeException())));
-        Pipeline pipeline1 = new Pipeline("1", null, null, new CompoundProcessor(new AbstractProcessor(null, null) {
+        Pipeline pipeline1 = new Pipeline("1", null, null, null, new CompoundProcessor(new AbstractProcessor(null, null) {
             @Override
             public IngestDocument execute(IngestDocument ingestDocument) throws Exception {
                  throw new UnsupportedOperationException();

+ 13 - 0
server/src/test/java/org/elasticsearch/ingest/PipelineConfigurationTests.java

@@ -41,6 +41,19 @@ public class PipelineConfigurationTests extends AbstractXContentTestCase<Pipelin
         assertEquals("{}", serialized.getConfig().utf8ToString());
     }
 
+    public void testMetaSerialization() throws IOException {
+        String configJson = "{\"description\": \"blah\", \"_meta\" : {\"foo\": \"bar\"}}";
+        PipelineConfiguration configuration = new PipelineConfiguration("1",
+            new BytesArray(configJson.getBytes(StandardCharsets.UTF_8)), XContentType.JSON);
+        assertEquals(XContentType.JSON, configuration.getXContentType());
+        BytesStreamOutput out = new BytesStreamOutput();
+        configuration.writeTo(out);
+        StreamInput in = StreamInput.wrap(out.bytes().toBytesRef().bytes);
+        PipelineConfiguration serialized = PipelineConfiguration.readFrom(in);
+        assertEquals(XContentType.JSON, serialized.getXContentType());
+        assertEquals(configJson, serialized.getConfig().utf8ToString());
+    }
+
     public void testParser() throws IOException {
         ContextParser<Void, PipelineConfiguration> parser = PipelineConfiguration.getParser();
         XContentType xContentType = randomFrom(XContentType.values());

+ 53 - 1
server/src/test/java/org/elasticsearch/ingest/PipelineFactoryTests.java

@@ -9,6 +9,7 @@
 package org.elasticsearch.ingest;
 
 import org.elasticsearch.ElasticsearchParseException;
+import org.elasticsearch.core.Tuple;
 import org.elasticsearch.script.ScriptService;
 import org.elasticsearch.test.ESTestCase;
 
@@ -29,6 +30,7 @@ public class PipelineFactoryTests extends ESTestCase {
     private final Integer version = randomBoolean() ? randomInt() : null;
     private final String versionString = version != null ? Integer.toString(version) : null;
     private final ScriptService scriptService = mock(ScriptService.class);
+    private final Map<String, Object> metadata = randomMapOfMaps();
 
     public void testCreate() throws Exception {
         Map<String, Object> processorConfig0 = new HashMap<>();
@@ -37,6 +39,9 @@ public class PipelineFactoryTests extends ESTestCase {
         Map<String, Object> pipelineConfig = new HashMap<>();
         pipelineConfig.put(Pipeline.DESCRIPTION_KEY, "_description");
         pipelineConfig.put(Pipeline.VERSION_KEY, versionString);
+        if (metadata != null) {
+            pipelineConfig.put(Pipeline.META_KEY, metadata);
+        }
         pipelineConfig.put(Pipeline.PROCESSORS_KEY,
                 Arrays.asList(Collections.singletonMap("test", processorConfig0), Collections.singletonMap("test", processorConfig1)));
         Map<String, Processor.Factory> processorRegistry = Collections.singletonMap("test", new TestProcessor.Factory());
@@ -55,6 +60,9 @@ public class PipelineFactoryTests extends ESTestCase {
         Map<String, Object> pipelineConfig = new HashMap<>();
         pipelineConfig.put(Pipeline.DESCRIPTION_KEY, "_description");
         pipelineConfig.put(Pipeline.VERSION_KEY, versionString);
+        if (metadata != null) {
+            pipelineConfig.put(Pipeline.META_KEY, metadata);
+        }
         try {
             Pipeline.create("_id", pipelineConfig, Collections.emptyMap(), scriptService);
             fail("should fail, missing required [processors] field");
@@ -67,6 +75,9 @@ public class PipelineFactoryTests extends ESTestCase {
         Map<String, Object> pipelineConfig = new HashMap<>();
         pipelineConfig.put(Pipeline.DESCRIPTION_KEY, "_description");
         pipelineConfig.put(Pipeline.VERSION_KEY, versionString);
+        if (metadata != null) {
+            pipelineConfig.put(Pipeline.META_KEY, metadata);
+        }
         pipelineConfig.put(Pipeline.PROCESSORS_KEY, Collections.emptyList());
         Pipeline pipeline = Pipeline.create("_id", pipelineConfig, null, scriptService);
         assertThat(pipeline.getId(), equalTo("_id"));
@@ -80,6 +91,9 @@ public class PipelineFactoryTests extends ESTestCase {
         Map<String, Object> pipelineConfig = new HashMap<>();
         pipelineConfig.put(Pipeline.DESCRIPTION_KEY, "_description");
         pipelineConfig.put(Pipeline.VERSION_KEY, versionString);
+        if (metadata != null) {
+            pipelineConfig.put(Pipeline.META_KEY, metadata);
+        }
         pipelineConfig.put(Pipeline.PROCESSORS_KEY, Collections.singletonList(Collections.singletonMap("test", processorConfig)));
         pipelineConfig.put(Pipeline.ON_FAILURE_KEY, Collections.singletonList(Collections.singletonMap("test", processorConfig)));
         Map<String, Processor.Factory> processorRegistry = Collections.singletonMap("test", new TestProcessor.Factory());
@@ -98,6 +112,9 @@ public class PipelineFactoryTests extends ESTestCase {
         Map<String, Object> pipelineConfig = new HashMap<>();
         pipelineConfig.put(Pipeline.DESCRIPTION_KEY, "_description");
         pipelineConfig.put(Pipeline.VERSION_KEY, versionString);
+        if (metadata != null) {
+            pipelineConfig.put(Pipeline.META_KEY, metadata);
+        }
         pipelineConfig.put(Pipeline.PROCESSORS_KEY, Collections.singletonList(Collections.singletonMap("test", processorConfig)));
         pipelineConfig.put(Pipeline.ON_FAILURE_KEY, Collections.emptyList());
         Map<String, Processor.Factory> processorRegistry = Collections.singletonMap("test", new TestProcessor.Factory());
@@ -114,6 +131,9 @@ public class PipelineFactoryTests extends ESTestCase {
         Map<String, Object> pipelineConfig = new HashMap<>();
         pipelineConfig.put(Pipeline.DESCRIPTION_KEY, "_description");
         pipelineConfig.put(Pipeline.VERSION_KEY, versionString);
+        if (metadata != null) {
+            pipelineConfig.put(Pipeline.META_KEY, metadata);
+        }
         pipelineConfig.put(Pipeline.PROCESSORS_KEY, Collections.singletonList(Collections.singletonMap("test", processorConfig)));
         Map<String, Processor.Factory> processorRegistry = Collections.singletonMap("test", new TestProcessor.Factory());
         Exception e = expectThrows(
@@ -131,6 +151,9 @@ public class PipelineFactoryTests extends ESTestCase {
         Map<String, Object> pipelineConfig = new HashMap<>();
         pipelineConfig.put(Pipeline.DESCRIPTION_KEY, "_description");
         pipelineConfig.put(Pipeline.VERSION_KEY, versionString);
+        if (metadata != null) {
+            pipelineConfig.put(Pipeline.META_KEY, metadata);
+        }
         pipelineConfig.put(Pipeline.PROCESSORS_KEY,
                 Collections.singletonList(Collections.singletonMap("test", processorConfig)));
 
@@ -152,6 +175,9 @@ public class PipelineFactoryTests extends ESTestCase {
         Map<String, Object> pipelineConfig = new HashMap<>();
         pipelineConfig.put(Pipeline.DESCRIPTION_KEY, "_description");
         pipelineConfig.put(Pipeline.VERSION_KEY, versionString);
+        if (metadata != null) {
+            pipelineConfig.put(Pipeline.META_KEY, metadata);
+        }
         pipelineConfig.put(Pipeline.PROCESSORS_KEY, Collections.singletonList(Collections.singletonMap("test", processorConfig)));
         Map<String, Processor.Factory> processorRegistry = Collections.singletonMap("test", new TestProcessor.Factory());
         Exception e = expectThrows(
@@ -168,6 +194,9 @@ public class PipelineFactoryTests extends ESTestCase {
         Map<String, Object> pipelineConfig = new HashMap<>();
         pipelineConfig.put(Pipeline.DESCRIPTION_KEY, "_description");
         pipelineConfig.put(Pipeline.VERSION_KEY, versionString);
+        if (metadata != null) {
+            pipelineConfig.put(Pipeline.META_KEY, metadata);
+        }
         pipelineConfig.put(Pipeline.PROCESSORS_KEY, Collections.singletonList(Collections.singletonMap("test", processorConfig)));
         Map<String, Processor.Factory> processorRegistry = Collections.singletonMap("test", new TestProcessor.Factory());
         Pipeline pipeline = Pipeline.create("_id", pipelineConfig, processorRegistry, scriptService);
@@ -183,8 +212,31 @@ public class PipelineFactoryTests extends ESTestCase {
         CompoundProcessor processor1 = new CompoundProcessor(testProcessor, testProcessor);
         CompoundProcessor processor2 =
                 new CompoundProcessor(false, Collections.singletonList(testProcessor), Collections.singletonList(testProcessor));
-        Pipeline pipeline = new Pipeline("_id", "_description", version, new CompoundProcessor(processor1, processor2));
+        Pipeline pipeline = new Pipeline("_id", "_description", version, null, new CompoundProcessor(processor1, processor2));
         List<Processor> flattened = pipeline.flattenAllProcessors();
         assertThat(flattened.size(), equalTo(4));
     }
+
+    private Map<String, Object> randomMapOfMaps() {
+        if (randomBoolean()) {
+            return randomNonNullMapOfMaps(10);
+        } else {
+            return null;
+        }
+    }
+
+    private Map<String, Object> randomNonNullMapOfMaps(int maxDepth) {
+        return randomMap(0, randomIntBetween(1, 5), () -> randomNonNullMapOfMapsSupplier(maxDepth));
+    }
+
+    private Tuple<String, Object> randomNonNullMapOfMapsSupplier(int maxDepth) {
+        String key = randomAlphaOfLength(randomIntBetween(2, 15));
+        Object value;
+        if (maxDepth == 0 || randomBoolean()) {
+            value = randomAlphaOfLength(randomIntBetween(1, 10));
+        } else {
+            value = randomNonNullMapOfMaps(maxDepth - 1);
+        }
+        return Tuple.tuple(key, value);
+    }
 }

+ 8 - 8
server/src/test/java/org/elasticsearch/ingest/PipelineProcessorTests.java

@@ -36,7 +36,7 @@ public class PipelineProcessorTests extends ESTestCase {
         CompletableFuture<IngestDocument> invoked = new CompletableFuture<>();
         IngestDocument testIngestDocument = RandomDocumentPicks.randomIngestDocument(random(), new HashMap<>());
         Pipeline pipeline = new Pipeline(
-            pipelineId, null, null,
+            pipelineId, null, null, null,
             new CompoundProcessor(new Processor() {
                 @Override
                 public IngestDocument execute(final IngestDocument ingestDocument) throws Exception {
@@ -91,13 +91,13 @@ public class PipelineProcessorTests extends ESTestCase {
         outerConfig.put("name", innerPipelineId);
         PipelineProcessor.Factory factory = new PipelineProcessor.Factory(ingestService);
         Pipeline outer = new Pipeline(
-            outerPipelineId, null, null,
+            outerPipelineId, null, null, null,
             new CompoundProcessor(factory.create(Collections.emptyMap(), null, null, outerConfig))
         );
         Map<String, Object> innerConfig = new HashMap<>();
         innerConfig.put("name", outerPipelineId);
         Pipeline inner = new Pipeline(
-            innerPipelineId, null, null,
+            innerPipelineId, null, null, null,
             new CompoundProcessor(factory.create(Collections.emptyMap(), null, null, innerConfig))
         );
         when(ingestService.getPipeline(outerPipelineId)).thenReturn(outer);
@@ -119,7 +119,7 @@ public class PipelineProcessorTests extends ESTestCase {
         outerConfig.put("name", innerPipelineId);
         PipelineProcessor.Factory factory = new PipelineProcessor.Factory(ingestService);
         Pipeline inner = new Pipeline(
-            innerPipelineId, null, null, new CompoundProcessor()
+            innerPipelineId, null, null, null, new CompoundProcessor()
         );
         when(ingestService.getPipeline(innerPipelineId)).thenReturn(inner);
         Processor outerProc = factory.create(Collections.emptyMap(), null, null, outerConfig);
@@ -145,14 +145,14 @@ public class PipelineProcessorTests extends ESTestCase {
         LongSupplier relativeTimeProvider = mock(LongSupplier.class);
         when(relativeTimeProvider.getAsLong()).thenReturn(0L);
         Pipeline pipeline1 = new Pipeline(
-            pipeline1Id, null, null, new CompoundProcessor(pipeline1Processor), relativeTimeProvider
+            pipeline1Id, null, null, null, new CompoundProcessor(pipeline1Processor), relativeTimeProvider
         );
 
         String key1 = randomAlphaOfLength(10);
         relativeTimeProvider = mock(LongSupplier.class);
         when(relativeTimeProvider.getAsLong()).thenReturn(0L, TimeUnit.MILLISECONDS.toNanos(3));
         Pipeline pipeline2 = new Pipeline(
-            pipeline2Id, null, null, new CompoundProcessor(true,
+            pipeline2Id, null, null, null, new CompoundProcessor(true,
             Arrays.asList(
                 new TestProcessor(ingestDocument -> {
                     ingestDocument.setFieldValue(key1, randomInt());
@@ -164,7 +164,7 @@ public class PipelineProcessorTests extends ESTestCase {
         relativeTimeProvider = mock(LongSupplier.class);
         when(relativeTimeProvider.getAsLong()).thenReturn(0L, TimeUnit.MILLISECONDS.toNanos(2));
         Pipeline pipeline3 = new Pipeline(
-            pipeline3Id, null, null, new CompoundProcessor(
+            pipeline3Id, null, null, null, new CompoundProcessor(
             new TestProcessor(ingestDocument -> {
                 throw new RuntimeException("error");
             })), relativeTimeProvider
@@ -231,7 +231,7 @@ public class PipelineProcessorTests extends ESTestCase {
             }
 
 
-            Pipeline pipeline = new Pipeline(pipelineId, null, null, new CompoundProcessor(false, processors, List.of()));
+            Pipeline pipeline = new Pipeline(pipelineId, null, null, null, new CompoundProcessor(false, processors, List.of()));
             when(ingestService.getPipeline(pipelineId)).thenReturn(pipeline);
             if (firstPipeline == null) {
                 firstPipeline = pipeline;

+ 10 - 10
server/src/test/java/org/elasticsearch/ingest/TrackingResultProcessorTests.java

@@ -251,7 +251,7 @@ public class TrackingResultProcessorTests extends ESTestCase {
         String key3 = randomAlphaOfLength(10);
 
         Pipeline pipeline = new Pipeline(
-            pipelineId, null, null, new CompoundProcessor(
+            pipelineId, null, null, null, new CompoundProcessor(
             new TestProcessor(ingestDocument -> {ingestDocument.setFieldValue(key1, randomInt()); }),
             new TestProcessor(ingestDocument -> {ingestDocument.setFieldValue(key2, randomInt()); }),
             new TestProcessor(ingestDocument -> { ingestDocument.setFieldValue(key3, randomInt()); }))
@@ -313,7 +313,7 @@ public class TrackingResultProcessorTests extends ESTestCase {
         );
 
         Pipeline pipeline1 = new Pipeline(
-            pipelineId1, null, null, new CompoundProcessor(
+            pipelineId1, null, null, null, new CompoundProcessor(
             new TestProcessor(ingestDocument -> {ingestDocument.setFieldValue(key1, randomInt()); }),
             new ConditionalProcessor(
                 randomAlphaOfLength(10),
@@ -325,7 +325,7 @@ public class TrackingResultProcessorTests extends ESTestCase {
         );
 
         Pipeline pipeline2 = new Pipeline(
-            pipelineId2, null, null, new CompoundProcessor(
+            pipelineId2, null, null, null, new CompoundProcessor(
                 new TestProcessor(ingestDocument -> { ingestDocument.setFieldValue(key2, randomInt()); })));
 
         when(ingestService.getPipeline(pipelineId1)).thenReturn(pipeline1);
@@ -393,7 +393,7 @@ public class TrackingResultProcessorTests extends ESTestCase {
         );
 
         Pipeline pipeline1 = new Pipeline(
-            pipelineId1, null, null, new CompoundProcessor(
+            pipelineId1, null, null, null, new CompoundProcessor(
             new TestProcessor(ingestDocument -> {ingestDocument.setFieldValue(key1, randomInt()); }),
             new ConditionalProcessor(
                 randomAlphaOfLength(10),
@@ -405,7 +405,7 @@ public class TrackingResultProcessorTests extends ESTestCase {
         );
 
         Pipeline pipeline2 = new Pipeline(
-            pipelineId2, null, null, new CompoundProcessor(
+            pipelineId2, null, null, null, new CompoundProcessor(
             new TestProcessor(ingestDocument -> { ingestDocument.setFieldValue(key2, randomInt()); })));
 
         when(ingestService.getPipeline(pipelineId1)).thenReturn(pipeline1);
@@ -456,7 +456,7 @@ public class TrackingResultProcessorTests extends ESTestCase {
         String key3 = randomAlphaOfLength(10);
 
         Pipeline pipeline = new Pipeline(
-            pipelineId, null, null, new CompoundProcessor(
+            pipelineId, null, null, null, new CompoundProcessor(
             new TestProcessor(ingestDocument -> { ingestDocument.setFieldValue(key1, randomInt()); }),
             new CompoundProcessor(
                 false,
@@ -512,7 +512,7 @@ public class TrackingResultProcessorTests extends ESTestCase {
         IllegalStateException exception = new IllegalStateException("Not a pipeline cycle error");
 
         Pipeline pipeline = new Pipeline(
-            pipelineId, null, null, new CompoundProcessor(
+            pipelineId, null, null, null, new CompoundProcessor(
             new TestProcessor(ingestDocument -> ingestDocument.setFieldValue(key1, randomInt())),
             new TestProcessor(ingestDocument -> { throw exception; }))
         );
@@ -551,10 +551,10 @@ public class TrackingResultProcessorTests extends ESTestCase {
         PipelineProcessor.Factory factory = new PipelineProcessor.Factory(ingestService);
 
         Pipeline pipeline1 = new Pipeline(
-            pipelineId1, null, null, new CompoundProcessor(factory.create(Collections.emptyMap(), null, null, pipelineConfig2)));
+            pipelineId1, null, null, null, new CompoundProcessor(factory.create(Collections.emptyMap(), null, null, pipelineConfig2)));
 
         Pipeline pipeline2 = new Pipeline(
-            pipelineId2, null, null, new CompoundProcessor(factory.create(Collections.emptyMap(), null, null, pipelineConfig1)));
+            pipelineId2, null, null, null, new CompoundProcessor(factory.create(Collections.emptyMap(), null, null, pipelineConfig1)));
 
         when(ingestService.getPipeline(pipelineId1)).thenReturn(pipeline1);
         when(ingestService.getPipeline(pipelineId2)).thenReturn(pipeline2);
@@ -581,7 +581,7 @@ public class TrackingResultProcessorTests extends ESTestCase {
         String key1 = randomAlphaOfLength(10);
         PipelineProcessor pipelineProcessor = factory.create(Collections.emptyMap(), null, null, pipelineConfig);
         Pipeline pipeline = new Pipeline(
-            pipelineId, null, null, new CompoundProcessor(
+            pipelineId, null, null, null, new CompoundProcessor(
             new TestProcessor(ingestDocument -> { ingestDocument.setFieldValue(key1, randomInt()); }))
         );
         when(ingestService.getPipeline(pipelineId)).thenReturn(pipeline);