
Add reroute processor (#76511)

Felix Barnsteiner committed 2 years ago · parent commit 11b598a519

+ 5 - 0
docs/changelog/76511.yaml

@@ -0,0 +1,5 @@
+pr: 76511
+summary: Add `reroute` processor
+area: Ingest Node
+type: enhancement
+issues: []

+ 1 - 0
docs/reference/ingest/processors.asciidoc

@@ -64,6 +64,7 @@ include::processors/redact.asciidoc[]
 include::processors/registered-domain.asciidoc[]
 include::processors/remove.asciidoc[]
 include::processors/rename.asciidoc[]
+include::processors/reroute.asciidoc[]
 include::processors/script.asciidoc[]
 include::processors/set.asciidoc[]
 include::processors/set-security-user.asciidoc[]

+ 94 - 0
docs/reference/ingest/processors/reroute.asciidoc

@@ -0,0 +1,94 @@
+[[reroute-processor]]
+=== Reroute processor
+++++
+<titleabbrev>Reroute</titleabbrev>
+++++
+
+experimental::[]
+
+The `reroute` processor allows routing a document to another target index or data stream.
+It has two main modes:
+
+When the `destination` option is set, the target is specified explicitly and the `dataset` and `namespace` options can't be set.
+
+When the `destination` option is not set, this processor operates in data stream mode.
+Note that in this mode, the `reroute` processor can only be used on data streams that follow the {fleet-guide}/data-streams.html#data-streams-naming-scheme[data stream naming scheme].
+Trying to use this processor on a data stream with a non-compliant name will raise an exception.
+
+The name of a data stream consists of three parts: `<type>-<dataset>-<namespace>`.
+See the {fleet-guide}/data-streams.html#data-streams-naming-scheme[data stream naming scheme] documentation for more details.
+
+This processor can use either static values or field references from the document to determine the `dataset` and `namespace` components of the new target.
+See <<reroute-options>> for more details.
+
+NOTE: It's not possible to change the `type` of the data stream with the `reroute` processor.
+
+After a `reroute` processor has been executed, all the other processors of the current pipeline are skipped, including the final pipeline.
+If the current pipeline is executed in the context of a <<pipeline-processor>>, the calling pipeline will be skipped, too.
+This means that at most one `reroute` processor is ever executed within a pipeline,
+allowing you to define mutually exclusive routing conditions,
+similar to an if, else if, else if, … construct.
+
+The reroute processor ensures that the `data_stream.<type|dataset|namespace>` fields are set according to the new target.
+If the document contains an `event.dataset` value, it will be updated to reflect the same value as `data_stream.dataset`.
+
+Note that the client needs to have permissions for the final target.
+Otherwise, the document will be rejected with a security exception which looks like this:
+
+[source,js]
+--------------------------------------------------
+{"type":"security_exception","reason":"action [indices:admin/auto_create] is unauthorized for API key id [8-dt9H8BqGblnY2uSI--] of user [elastic/fleet-server] on indices [logs-foo-default], this action is granted by the index privileges [auto_configure,create_index,manage,all]"}
+--------------------------------------------------
+// NOTCONSOLE
+
+[[reroute-options]]
+.Reroute options
+[options="header"]
+|======
+| Name          | Required  | Default                      | Description
+| `destination` | no        | -                            | A static value for the target. Can't be set when the `dataset` or `namespace` option is set.
+| `dataset`     | no        | `{{data_stream.dataset}}`   a| Field references or a static value for the dataset part of the data stream name. In addition to the criteria for <<indices-create-api-path-params, index names>>, cannot contain `-` and must be no longer than 100 characters. Example values are `nginx.access` and `nginx.error`.
+
+Supports field references with a mustache-like syntax (denoted as `{{double}}` or `{{{triple}}}` curly braces). When resolving field references, the processor replaces invalid characters with `_`. Uses the `<dataset>` part of the index name as a fallback if all field references resolve to a `null`, missing, or non-string value.
+| `namespace`   | no        | `{{data_stream.namespace}}` a| Field references or a static value for the namespace part of the data stream name. See the criteria for <<indices-create-api-path-params, index names>> for allowed characters. Must be no longer than 100 characters.
+
+Supports field references with a mustache-like syntax (denoted as `{{double}}` or `{{{triple}}}` curly braces). When resolving field references, the processor replaces invalid characters with `_`. Uses the `<namespace>` part of the index name as a fallback if all field references resolve to a `null`, missing, or non-string value.
+include::common-options.asciidoc[]
+|======
+
+The `if` option can be used to define the condition in which the document should be rerouted to a new target.
+
+[source,js]
+--------------------------------------------------
+{
+  "reroute": {
+    "tag": "nginx",
+    "if" : "ctx?.log?.file?.path?.contains('nginx')",
+    "dataset": "nginx"
+  }
+}
+--------------------------------------------------
+// NOTCONSOLE
+
+The `dataset` and `namespace` options can contain either a single value or a list of values that are used as fallbacks.
+If a field reference evaluates to `null` or is not present in the document, the next value or field reference in the list is used.
+If a field reference evaluates to a non-`String` value, the processor fails.
+
+In the following example, the processor would first try to resolve the value for the `service.name` field to determine the value for `dataset`.
+If that field resolves to `null`, is missing, or is a non-string value, it would try the next element in the list.
+In this case, this is the static value `"generic"`.
+The `namespace` option is configured with just a single static value.
+
+[source,js]
+--------------------------------------------------
+{
+  "reroute": {
+    "dataset": [
+        "{{service.name}}",
+        "generic"
+    ],
+    "namespace": "default"
+  }
+}
+--------------------------------------------------
+// NOTCONSOLE
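The resolution rules documented above (try each entry in order, sanitize resolved field values, fall back to the current target) can be sketched outside the ingest framework. This is a simplified illustration, not the processor's API: `resolveDataset` and the flat `Map` document are hypothetical, triple-brace references are not handled, and real documents use nested field paths.

```java
import java.util.List;
import java.util.Locale;
import java.util.Map;

public class RerouteSketch {
    // Mirrors the documented sanitization for dataset values: lowercase,
    // truncate to 100 characters, replace disallowed characters with "_".
    static String sanitizeDataset(String s) {
        if (s == null) return null;
        s = s.toLowerCase(Locale.ROOT);
        s = s.substring(0, Math.min(s.length(), 100));
        return s.replaceAll("[\\\\/*?\"<>| ,#:-]", "_");
    }

    // Tries each entry in order: "{{field}}" entries are looked up in the
    // document, static entries are used as-is; if nothing resolves, falls
    // back to the dataset parsed from the current target (_index).
    static String resolveDataset(List<String> entries, Map<String, Object> doc, String fallback) {
        for (String entry : entries) {
            if (entry.startsWith("{{") && entry.endsWith("}}")) {
                Object value = doc.get(entry.substring(2, entry.length() - 2));
                if (value instanceof String str) {
                    return sanitizeDataset(str);
                }
            } else {
                return entry; // a static value always resolves
            }
        }
        return fallback;
    }

    public static void main(String[] args) {
        Map<String, Object> doc = Map.of("service.name", "Opbeans-Java");
        // field reference resolves; "-" is disallowed in a dataset -> "_"
        System.out.println(resolveDataset(List.of("{{service.name}}", "generic"), doc, "generic"));
        // missing field: the next entry, a static value, is used
        System.out.println(resolveDataset(List.of("{{missing}}", "generic"), doc, "generic"));
    }
}
```

Running this prints `opbeans_java` followed by `generic`, matching the fallback-list example above.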

+ 1 - 0
modules/ingest-common/src/main/java/org/elasticsearch/ingest/common/IngestCommonPlugin.java

@@ -79,6 +79,7 @@ public class IngestCommonPlugin extends Plugin implements ActionPlugin, IngestPl
             entry(RegisteredDomainProcessor.TYPE, new RegisteredDomainProcessor.Factory()),
             entry(RemoveProcessor.TYPE, new RemoveProcessor.Factory(parameters.scriptService)),
             entry(RenameProcessor.TYPE, new RenameProcessor.Factory(parameters.scriptService)),
+            entry(RerouteProcessor.TYPE, new RerouteProcessor.Factory()),
             entry(ScriptProcessor.TYPE, new ScriptProcessor.Factory(parameters.scriptService)),
             entry(SetProcessor.TYPE, new SetProcessor.Factory(parameters.scriptService)),
             entry(SortProcessor.TYPE, new SortProcessor.Factory()),

+ 262 - 0
modules/ingest-common/src/main/java/org/elasticsearch/ingest/common/RerouteProcessor.java

@@ -0,0 +1,262 @@
+/*
+ * Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
+ * or more contributor license agreements. Licensed under the Elastic License
+ * 2.0 and the Server Side Public License, v 1; you may not use this file except
+ * in compliance with, at your election, the Elastic License 2.0 or the Server
+ * Side Public License, v 1.
+ */
+
+package org.elasticsearch.ingest.common;
+
+import org.elasticsearch.core.Nullable;
+import org.elasticsearch.ingest.AbstractProcessor;
+import org.elasticsearch.ingest.ConfigurationUtils;
+import org.elasticsearch.ingest.IngestDocument;
+import org.elasticsearch.ingest.Processor;
+
+import java.util.List;
+import java.util.Locale;
+import java.util.Map;
+import java.util.Objects;
+import java.util.function.Function;
+import java.util.regex.Pattern;
+
+import static org.elasticsearch.core.Strings.format;
+import static org.elasticsearch.ingest.ConfigurationUtils.newConfigurationException;
+import static org.elasticsearch.ingest.common.RerouteProcessor.DataStreamValueSource.DATASET_VALUE_SOURCE;
+import static org.elasticsearch.ingest.common.RerouteProcessor.DataStreamValueSource.NAMESPACE_VALUE_SOURCE;
+
+public final class RerouteProcessor extends AbstractProcessor {
+
+    public static final String TYPE = "reroute";
+
+    private static final String NAMING_SCHEME_ERROR_MESSAGE =
+        "invalid data stream name: [%s]; must follow naming scheme <type>-<dataset>-<namespace>";
+
+    private static final String DATA_STREAM_PREFIX = "data_stream.";
+    private static final String DATA_STREAM_TYPE = DATA_STREAM_PREFIX + "type";
+    private static final String DATA_STREAM_DATASET = DATA_STREAM_PREFIX + "dataset";
+    private static final String DATA_STREAM_NAMESPACE = DATA_STREAM_PREFIX + "namespace";
+    private static final String EVENT_DATASET = "event.dataset";
+    private final List<DataStreamValueSource> dataset;
+    private final List<DataStreamValueSource> namespace;
+    private final String destination;
+
+    RerouteProcessor(
+        String tag,
+        String description,
+        List<DataStreamValueSource> dataset,
+        List<DataStreamValueSource> namespace,
+        String destination
+    ) {
+        super(tag, description);
+        if (dataset.isEmpty()) {
+            this.dataset = List.of(DATASET_VALUE_SOURCE);
+        } else {
+            this.dataset = dataset;
+        }
+        if (namespace.isEmpty()) {
+            this.namespace = List.of(NAMESPACE_VALUE_SOURCE);
+        } else {
+            this.namespace = namespace;
+        }
+        this.destination = destination;
+    }
+
+    @Override
+    public IngestDocument execute(IngestDocument ingestDocument) throws Exception {
+        if (destination != null) {
+            ingestDocument.reroute(destination);
+            return ingestDocument;
+        }
+        final String indexName = ingestDocument.getFieldValue(IngestDocument.Metadata.INDEX.getFieldName(), String.class);
+        final String type;
+        final String currentDataset;
+        final String currentNamespace;
+
+        // parse out the <type>-<dataset>-<namespace> components from _index
+        int indexOfFirstDash = indexName.indexOf('-');
+        if (indexOfFirstDash < 0) {
+            throw new IllegalArgumentException(format(NAMING_SCHEME_ERROR_MESSAGE, indexName));
+        }
+        int indexOfSecondDash = indexName.indexOf('-', indexOfFirstDash + 1);
+        if (indexOfSecondDash < 0) {
+            throw new IllegalArgumentException(format(NAMING_SCHEME_ERROR_MESSAGE, indexName));
+        }
+        type = parseDataStreamType(indexName, indexOfFirstDash);
+        currentDataset = parseDataStreamDataset(indexName, indexOfFirstDash, indexOfSecondDash);
+        currentNamespace = parseDataStreamNamespace(indexName, indexOfSecondDash);
+
+        String dataset = determineDataStreamField(ingestDocument, this.dataset, currentDataset);
+        String namespace = determineDataStreamField(ingestDocument, this.namespace, currentNamespace);
+        String newTarget = type + "-" + dataset + "-" + namespace;
+        ingestDocument.reroute(newTarget);
+        ingestDocument.setFieldValue(DATA_STREAM_TYPE, type);
+        ingestDocument.setFieldValue(DATA_STREAM_DATASET, dataset);
+        ingestDocument.setFieldValue(DATA_STREAM_NAMESPACE, namespace);
+        if (ingestDocument.hasField(EVENT_DATASET)) {
+            // ECS specifies that "event.dataset should have the same value as data_stream.dataset"
+            // not eagerly set event.dataset but only if the doc contains it already to ensure it's consistent with data_stream.dataset
+            ingestDocument.setFieldValue(EVENT_DATASET, dataset);
+        }
+        return ingestDocument;
+    }
+
+    private static String parseDataStreamType(String dataStreamName, int indexOfFirstDash) {
+        return dataStreamName.substring(0, indexOfFirstDash);
+    }
+
+    private static String parseDataStreamDataset(String dataStreamName, int indexOfFirstDash, int indexOfSecondDash) {
+        return dataStreamName.substring(indexOfFirstDash + 1, indexOfSecondDash);
+    }
+
+    private static String parseDataStreamNamespace(String dataStreamName, int indexOfSecondDash) {
+        return dataStreamName.substring(indexOfSecondDash + 1);
+    }
+
+    private String determineDataStreamField(
+        IngestDocument ingestDocument,
+        List<DataStreamValueSource> valueSources,
+        String fallbackFromCurrentTarget
+    ) {
+        // first try to get value from the configured dataset/namespace field references
+        // if this contains a static value rather than a field reference, this is guaranteed to return
+        for (DataStreamValueSource value : valueSources) {
+            String result = value.resolve(ingestDocument);
+            if (result != null) {
+                return result;
+            }
+        }
+        // use the dataset/namespace value we parsed out from the current target (_index) as a fallback
+        return fallbackFromCurrentTarget;
+    }
+
+    @Override
+    public String getType() {
+        return TYPE;
+    }
+
+    List<DataStreamValueSource> getDataStreamDataset() {
+        return dataset;
+    }
+
+    List<DataStreamValueSource> getDataStreamNamespace() {
+        return namespace;
+    }
+
+    String getDestination() {
+        return destination;
+    }
+
+    public static final class Factory implements Processor.Factory {
+
+        @Override
+        public RerouteProcessor create(
+            Map<String, Processor.Factory> processorFactories,
+            String tag,
+            String description,
+            Map<String, Object> config
+        ) throws Exception {
+            List<DataStreamValueSource> dataset;
+            try {
+                dataset = ConfigurationUtils.readOptionalListOrString(TYPE, tag, config, "dataset")
+                    .stream()
+                    .map(DataStreamValueSource::dataset)
+                    .toList();
+            } catch (IllegalArgumentException e) {
+                throw newConfigurationException(TYPE, tag, "dataset", e.getMessage());
+            }
+            List<DataStreamValueSource> namespace;
+            try {
+                namespace = ConfigurationUtils.readOptionalListOrString(TYPE, tag, config, "namespace")
+                    .stream()
+                    .map(DataStreamValueSource::namespace)
+                    .toList();
+            } catch (IllegalArgumentException e) {
+                throw newConfigurationException(TYPE, tag, "namespace", e.getMessage());
+            }
+
+            String destination = ConfigurationUtils.readOptionalStringProperty(TYPE, tag, config, "destination");
+            if (destination != null && (dataset.isEmpty() == false || namespace.isEmpty() == false)) {
+                throw newConfigurationException(TYPE, tag, "destination", "can only be set if dataset and namespace are not set");
+            }
+
+            return new RerouteProcessor(tag, description, dataset, namespace, destination);
+        }
+    }
+
+    /**
+     * Contains either a {{field reference}} or a static value for a dataset or a namespace field
+     */
+    static final class DataStreamValueSource {
+
+        private static final int MAX_LENGTH = 100;
+        private static final String REPLACEMENT = "_";
+        private static final Pattern DISALLOWED_IN_DATASET = Pattern.compile("[\\\\/*?\"<>| ,#:-]");
+        private static final Pattern DISALLOWED_IN_NAMESPACE = Pattern.compile("[\\\\/*?\"<>| ,#:]");
+        static final DataStreamValueSource DATASET_VALUE_SOURCE = dataset("{{" + DATA_STREAM_DATASET + "}}");
+        static final DataStreamValueSource NAMESPACE_VALUE_SOURCE = namespace("{{" + DATA_STREAM_NAMESPACE + "}}");
+
+        private final String value;
+        private final String fieldReference;
+        private final Function<String, String> sanitizer;
+
+        public static DataStreamValueSource dataset(String dataset) {
+            return new DataStreamValueSource(dataset, ds -> sanitizeDataStreamField(ds, DISALLOWED_IN_DATASET));
+        }
+
+        public static DataStreamValueSource namespace(String namespace) {
+            return new DataStreamValueSource(namespace, nsp -> sanitizeDataStreamField(nsp, DISALLOWED_IN_NAMESPACE));
+        }
+
+        private static String sanitizeDataStreamField(String s, Pattern disallowedInDataset) {
+            if (s == null) {
+                return null;
+            }
+            s = s.toLowerCase(Locale.ROOT);
+            s = s.substring(0, Math.min(s.length(), MAX_LENGTH));
+            return disallowedInDataset.matcher(s).replaceAll(REPLACEMENT);
+        }
+
+        private DataStreamValueSource(String value, Function<String, String> sanitizer) {
+            this.sanitizer = sanitizer;
+            this.value = value;
+            if (value.contains("{{") || value.contains("}}")) {
+                if (value.startsWith("{{") == false || value.endsWith("}}") == false) {
+                    throw new IllegalArgumentException("'" + value + "' is not a valid field reference");
+                }
+                String fieldReference = value.substring(2, value.length() - 2);
+                // field references may have two or three curly braces
+                if (fieldReference.startsWith("{") && fieldReference.endsWith("}")) {
+                    fieldReference = fieldReference.substring(1, fieldReference.length() - 1);
+                }
+                // only a single field reference is allowed
+                // so something like this is disallowed: {{foo}}-{{bar}}
+                if (fieldReference.contains("{") || fieldReference.contains("}")) {
+                    throw new IllegalArgumentException("'" + value + "' is not a valid field reference");
+                }
+                this.fieldReference = fieldReference;
+            } else {
+                this.fieldReference = null;
+                if (Objects.equals(sanitizer.apply(value), value) == false) {
+                    throw new IllegalArgumentException("'" + value + "' contains disallowed characters");
+                }
+            }
+        }
+
+        /**
+         * Resolves the field reference from the provided ingest document or returns the static value if this value source doesn't represent
+         * a field reference.
+         * @param ingestDocument
+         * @return the resolved field reference or static value
+         */
+        @Nullable
+        public String resolve(IngestDocument ingestDocument) {
+            if (fieldReference != null) {
+                return sanitizer.apply(ingestDocument.getFieldValue(fieldReference, String.class, true));
+            } else {
+                return value;
+            }
+        }
+    }
+}
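The `_index` parsing in `execute` above splits on the first two dashes only, so any further dashes stay in the namespace. A standalone sketch of that split, with a hypothetical `DataStreamName` record for illustration:

```java
public class NamingSchemeSketch {
    record DataStreamName(String type, String dataset, String namespace) {}

    // Splits a data stream name on its first two dashes, as the processor
    // does; fewer than two dashes violates <type>-<dataset>-<namespace>.
    static DataStreamName parse(String indexName) {
        int first = indexName.indexOf('-');
        int second = first < 0 ? -1 : indexName.indexOf('-', first + 1);
        if (second < 0) {
            throw new IllegalArgumentException(
                "invalid data stream name: [" + indexName + "]; must follow naming scheme <type>-<dataset>-<namespace>"
            );
        }
        return new DataStreamName(
            indexName.substring(0, first),
            indexName.substring(first + 1, second),
            indexName.substring(second + 1)
        );
    }

    public static void main(String[] args) {
        // dashes after the second one belong to the namespace
        DataStreamName name = parse("logs-nginx.access-prod-eu");
        System.out.println(name.type() + " / " + name.dataset() + " / " + name.namespace());
    }
}
```

This prints `logs / nginx.access / prod-eu`.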

+ 78 - 0
modules/ingest-common/src/test/java/org/elasticsearch/ingest/common/RerouteProcessorFactoryTests.java

@@ -0,0 +1,78 @@
+/*
+ * Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
+ * or more contributor license agreements. Licensed under the Elastic License
+ * 2.0 and the Server Side Public License, v 1; you may not use this file except
+ * in compliance with, at your election, the Elastic License 2.0 or the Server
+ * Side Public License, v 1.
+ */
+
+package org.elasticsearch.ingest.common;
+
+import org.elasticsearch.ElasticsearchParseException;
+import org.elasticsearch.ingest.common.RerouteProcessor.DataStreamValueSource;
+import org.elasticsearch.test.ESTestCase;
+
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+
+import static org.hamcrest.Matchers.equalTo;
+
+public class RerouteProcessorFactoryTests extends ESTestCase {
+
+    public void testDefaults() throws Exception {
+        RerouteProcessor processor = create(null, null);
+        assertThat(processor.getDataStreamDataset(), equalTo(List.of(DataStreamValueSource.DATASET_VALUE_SOURCE)));
+        assertThat(processor.getDataStreamNamespace(), equalTo(List.of(DataStreamValueSource.NAMESPACE_VALUE_SOURCE)));
+    }
+
+    public void testInvalidDataset() throws Exception {
+        ElasticsearchParseException e = expectThrows(ElasticsearchParseException.class, () -> create("my-service", null));
+        assertThat(e.getMessage(), equalTo("[dataset] 'my-service' contains disallowed characters"));
+    }
+
+    public void testInvalidNamespace() throws Exception {
+        ElasticsearchParseException e = expectThrows(ElasticsearchParseException.class, () -> create("generic", "foo:bar"));
+        assertThat(e.getMessage(), equalTo("[namespace] 'foo:bar' contains disallowed characters"));
+    }
+
+    public void testDestinationSuccess() throws Exception {
+        RerouteProcessor processor = create(Map.of("destination", "foo"));
+        assertThat(processor.getDestination(), equalTo("foo"));
+    }
+
+    public void testDestinationAndDataset() {
+        ElasticsearchParseException e = expectThrows(
+            ElasticsearchParseException.class,
+            () -> create(Map.of("destination", "foo", "dataset", "bar"))
+        );
+        assertThat(e.getMessage(), equalTo("[destination] can only be set if dataset and namespace are not set"));
+    }
+
+    public void testFieldReference() throws Exception {
+        create("{{foo}}", "{{{bar}}}");
+    }
+
+    public void testInvalidFieldReference() throws Exception {
+        ElasticsearchParseException e = expectThrows(ElasticsearchParseException.class, () -> create("{{foo}}-{{bar}}", "foo"));
+        assertThat(e.getMessage(), equalTo("[dataset] '{{foo}}-{{bar}}' is not a valid field reference"));
+
+        e = expectThrows(ElasticsearchParseException.class, () -> create("{{{{foo}}}}", "foo"));
+        assertThat(e.getMessage(), equalTo("[dataset] '{{{{foo}}}}' is not a valid field reference"));
+    }
+
+    private static RerouteProcessor create(String dataset, String namespace) throws Exception {
+        Map<String, Object> config = new HashMap<>();
+        if (dataset != null) {
+            config.put("dataset", dataset);
+        }
+        if (namespace != null) {
+            config.put("namespace", namespace);
+        }
+        return create(config);
+    }
+
+    private static RerouteProcessor create(Map<String, Object> config) throws Exception {
+        return new RerouteProcessor.Factory().create(null, null, null, new HashMap<>(config));
+    }
+}
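The mutual-exclusion rule that `testDestinationAndDataset` exercises can be sketched independently of `ConfigurationUtils`; `validate` below is a hypothetical helper that reproduces the factory's check, not part of the plugin:

```java
import java.util.List;

public class ConfigValidationSketch {
    // Mirrors the factory's rule: destination may only be set when neither
    // dataset nor namespace is configured.
    static void validate(String destination, List<String> dataset, List<String> namespace) {
        if (destination != null && (dataset.isEmpty() == false || namespace.isEmpty() == false)) {
            throw new IllegalArgumentException("[destination] can only be set if dataset and namespace are not set");
        }
    }

    public static void main(String[] args) {
        validate("foo", List.of(), List.of()); // ok: destination alone
        try {
            validate("foo", List.of("bar"), List.of()); // conflicting options
        } catch (IllegalArgumentException e) {
            System.out.println(e.getMessage());
        }
    }
}
```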

+ 303 - 0
modules/ingest-common/src/test/java/org/elasticsearch/ingest/common/RerouteProcessorTests.java

@@ -0,0 +1,303 @@
+/*
+ * Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
+ * or more contributor license agreements. Licensed under the Elastic License
+ * 2.0 and the Server Side Public License, v 1; you may not use this file except
+ * in compliance with, at your election, the Elastic License 2.0 or the Server
+ * Side Public License, v 1.
+ */
+
+package org.elasticsearch.ingest.common;
+
+import org.elasticsearch.ingest.CompoundProcessor;
+import org.elasticsearch.ingest.IngestDocument;
+import org.elasticsearch.ingest.Processor;
+import org.elasticsearch.ingest.RandomDocumentPicks;
+import org.elasticsearch.ingest.TestProcessor;
+import org.elasticsearch.ingest.WrappingProcessor;
+import org.elasticsearch.test.ESTestCase;
+
+import java.util.List;
+import java.util.Map;
+
+import static org.hamcrest.Matchers.equalTo;
+
+public class RerouteProcessorTests extends ESTestCase {
+
+    public void testDefaults() throws Exception {
+        IngestDocument ingestDocument = createIngestDocument("logs-generic-default");
+
+        RerouteProcessor processor = createRerouteProcessor(List.of(), List.of());
+        processor.execute(ingestDocument);
+        assertDataSetFields(ingestDocument, "logs", "generic", "default");
+    }
+
+    public void testEventDataset() throws Exception {
+        IngestDocument ingestDocument = createIngestDocument("logs-generic-default");
+        ingestDocument.setFieldValue("event.dataset", "foo");
+
+        RerouteProcessor processor = createRerouteProcessor(List.of("{{event.dataset}}"), List.of());
+        processor.execute(ingestDocument);
+        assertDataSetFields(ingestDocument, "logs", "foo", "default");
+        assertThat(ingestDocument.getFieldValue("event.dataset", String.class), equalTo("foo"));
+    }
+
+    public void testNoDataset() throws Exception {
+        IngestDocument ingestDocument = createIngestDocument("logs-generic-default");
+        ingestDocument.setFieldValue("ds", "foo");
+
+        RerouteProcessor processor = createRerouteProcessor(List.of("{{ds}}"), List.of());
+        processor.execute(ingestDocument);
+        assertDataSetFields(ingestDocument, "logs", "foo", "default");
+        assertFalse(ingestDocument.hasField("event.dataset"));
+    }
+
+    public void testSkipFirstProcessor() throws Exception {
+        IngestDocument ingestDocument = createIngestDocument("logs-generic-default");
+
+        RerouteProcessor skippedProcessor = createRerouteProcessor(List.of("skip"), List.of());
+        RerouteProcessor executedProcessor = createRerouteProcessor(List.of("executed"), List.of());
+        CompoundProcessor processor = new CompoundProcessor(new SkipProcessor(skippedProcessor), executedProcessor);
+        processor.execute(ingestDocument);
+        assertDataSetFields(ingestDocument, "logs", "executed", "default");
+    }
+
+    public void testSkipLastProcessor() throws Exception {
+        IngestDocument ingestDocument = createIngestDocument("logs-generic-default");
+
+        RerouteProcessor executedProcessor = createRerouteProcessor(List.of("executed"), List.of());
+        RerouteProcessor skippedProcessor = createRerouteProcessor(List.of("skip"), List.of());
+        CompoundProcessor processor = new CompoundProcessor(executedProcessor, skippedProcessor);
+        processor.execute(ingestDocument);
+        assertDataSetFields(ingestDocument, "logs", "executed", "default");
+    }
+
+    public void testDataStreamFieldsFromDocument() throws Exception {
+        IngestDocument ingestDocument = createIngestDocument("logs-generic-default");
+        ingestDocument.setFieldValue("data_stream.dataset", "foo");
+        ingestDocument.setFieldValue("data_stream.namespace", "bar");
+
+        RerouteProcessor processor = createRerouteProcessor(List.of(), List.of());
+        processor.execute(ingestDocument);
+        assertDataSetFields(ingestDocument, "logs", "foo", "bar");
+    }
+
+    public void testInvalidDataStreamFieldsFromDocument() throws Exception {
+        IngestDocument ingestDocument = createIngestDocument("logs-generic-default");
+        ingestDocument.setFieldValue("data_stream.dataset", "foo-bar");
+        ingestDocument.setFieldValue("data_stream.namespace", "baz#qux");
+
+        RerouteProcessor processor = createRerouteProcessor(List.of(), List.of());
+        processor.execute(ingestDocument);
+        assertDataSetFields(ingestDocument, "logs", "foo_bar", "baz_qux");
+    }
+
+    public void testDestination() throws Exception {
+        IngestDocument ingestDocument = createIngestDocument("logs-generic-default");
+
+        RerouteProcessor processor = createRerouteProcessor("foo");
+        processor.execute(ingestDocument);
+        assertFalse(ingestDocument.hasField("data_stream"));
+        assertThat(ingestDocument.getFieldValue("_index", String.class), equalTo("foo"));
+    }
+
+    public void testFieldReference() throws Exception {
+        IngestDocument ingestDocument = createIngestDocument("logs-generic-default");
+        ingestDocument.setFieldValue("service.name", "opbeans-java");
+        ingestDocument.setFieldValue("service.environment", "dev");
+
+        RerouteProcessor processor = createRerouteProcessor(List.of("{{service.name}}"), List.of("{{service.environment}}"));
+        processor.execute(ingestDocument);
+        assertDataSetFields(ingestDocument, "logs", "opbeans_java", "dev");
+    }
+
+    public void testRerouteToCurrentTarget() throws Exception {
+        IngestDocument ingestDocument = createIngestDocument("logs-generic-default");
+
+        RerouteProcessor reroute = createRerouteProcessor(List.of("generic"), List.of("default"));
+        CompoundProcessor processor = new CompoundProcessor(
+            reroute,
+            new TestProcessor(doc -> doc.setFieldValue("pipeline_is_continued", true))
+        );
+        processor.execute(ingestDocument);
+        assertDataSetFields(ingestDocument, "logs", "generic", "default");
+        assertFalse(ingestDocument.hasField("pipeline_is_continued"));
+    }
+
+    public void testFieldReferenceWithMissingReroutesToCurrentTarget() throws Exception {
+        IngestDocument ingestDocument = createIngestDocument("logs-generic-default");
+
+        RerouteProcessor reroute = createRerouteProcessor(List.of("{{service.name}}"), List.of("{{service.environment}}"));
+        CompoundProcessor processor = new CompoundProcessor(
+            reroute,
+            new TestProcessor(doc -> doc.setFieldValue("pipeline_is_continued", true))
+        );
+        processor.execute(ingestDocument);
+        assertThat(ingestDocument.getFieldValue("_index", String.class), equalTo("logs-generic-default"));
+        assertDataSetFields(ingestDocument, "logs", "generic", "default");
+        assertFalse(ingestDocument.hasField("pipeline_is_continued"));
+    }
+
+    public void testDataStreamFieldReference() throws Exception {
+        IngestDocument ingestDocument = createIngestDocument("logs-generic-default");
+        ingestDocument.setFieldValue("data_stream.dataset", "dataset_from_doc");
+        ingestDocument.setFieldValue("data_stream.namespace", "namespace_from_doc");
+
+        RerouteProcessor processor = createRerouteProcessor(
+            List.of("{{{data_stream.dataset}}}", "fallback"),
+            List.of("{{data_stream.namespace}}", "fallback")
+        );
+        processor.execute(ingestDocument);
+        assertDataSetFields(ingestDocument, "logs", "dataset_from_doc", "namespace_from_doc");
+    }
+
+    public void testDatasetFieldReferenceMissingValue() throws Exception {
+        IngestDocument ingestDocument = createIngestDocument("logs-generic-default");
+
+        RerouteProcessor processor = createRerouteProcessor(
+            List.of("{{data_stream.dataset}}", "fallback"),
+            List.of("{{data_stream.namespace}}", "fallback")
+        );
+        processor.execute(ingestDocument);
+        assertDataSetFields(ingestDocument, "logs", "fallback", "fallback");
+    }
+
+    public void testDatasetFieldReference() throws Exception {
+        IngestDocument ingestDocument = createIngestDocument("logs-generic-default");
+        ingestDocument.setFieldValue("data_stream.dataset", "generic");
+        ingestDocument.setFieldValue("data_stream.namespace", "default");
+
+        RerouteProcessor processor = createRerouteProcessor(
+            List.of("{{data_stream.dataset}}", "fallback"),
+            List.of("{{{data_stream.namespace}}}", "fallback")
+        );
+        processor.execute(ingestDocument);
+        assertDataSetFields(ingestDocument, "logs", "generic", "default");
+    }
+
+    public void testFallbackToValuesFrom_index() throws Exception {
+        IngestDocument ingestDocument = createIngestDocument("logs-generic-default");
+        ingestDocument.setFieldValue("data_stream.dataset", "foo");
+        ingestDocument.setFieldValue("data_stream.namespace", "bar");
+
+        RerouteProcessor processor = createRerouteProcessor(List.of("{{foo}}"), List.of("{{bar}}"));
+        processor.execute(ingestDocument);
+        assertDataSetFields(ingestDocument, "logs", "generic", "default");
+    }
+
+    public void testInvalidDataStreamName() throws Exception {
+        {
+            IngestDocument ingestDocument = createIngestDocument("foo");
+            RerouteProcessor processor = createRerouteProcessor(List.of(), List.of());
+            IllegalArgumentException e = expectThrows(IllegalArgumentException.class, () -> processor.execute(ingestDocument));
+            assertThat(e.getMessage(), equalTo("invalid data stream name: [foo]; must follow naming scheme <type>-<dataset>-<namespace>"));
+        }
+
+        {
+            // an explicit destination, however, does not have to follow the data stream naming scheme
+            IngestDocument ingestDocument = createIngestDocument("foo");
+            RerouteProcessor processor = createRerouteProcessor("bar");
+            processor.execute(ingestDocument);
+            assertThat(ingestDocument.getFieldValue("_index", String.class), equalTo("bar"));
+        }
+    }
+
+    public void testRouteOnNonStringFieldFails() {
+        IngestDocument ingestDocument = createIngestDocument("logs-generic-default");
+        ingestDocument.setFieldValue("numeric_field", 42);
+        RerouteProcessor processor = createRerouteProcessor(List.of("{{numeric_field}}"), List.of());
+        IllegalArgumentException e = expectThrows(IllegalArgumentException.class, () -> processor.execute(ingestDocument));
+        assertThat(e.getMessage(), equalTo("field [numeric_field] of type [java.lang.Integer] cannot be cast to [java.lang.String]"));
+    }
+
+    public void testDatasetSanitization() {
+        assertDatasetSanitization("\\/*?\"<>| ,#:-", "_____________");
+        assertDatasetSanitization("foo*bar", "foo_bar");
+    }
+
+    public void testNamespaceSanitization() {
+        assertNamespaceSanitization("\\/*?\"<>| ,#:-", "____________-");
+        assertNamespaceSanitization("foo*bar", "foo_bar");
+    }
+
+    private static void assertDatasetSanitization(String dataset, String sanitizedDataset) {
+        assertThat(
+            RerouteProcessor.DataStreamValueSource.dataset("{{foo}}")
+                .resolve(RandomDocumentPicks.randomIngestDocument(random(), Map.of("foo", dataset))),
+            equalTo(sanitizedDataset)
+        );
+    }
+
+    private static void assertNamespaceSanitization(String namespace, String sanitizedNamespace) {
+        assertThat(
+            RerouteProcessor.DataStreamValueSource.namespace("{{foo}}")
+                .resolve(RandomDocumentPicks.randomIngestDocument(random(), Map.of("foo", namespace))),
+            equalTo(sanitizedNamespace)
+        );
+    }
+
+    private RerouteProcessor createRerouteProcessor(List<String> dataset, List<String> namespace) {
+        return new RerouteProcessor(
+            null,
+            null,
+            dataset.stream().map(RerouteProcessor.DataStreamValueSource::dataset).toList(),
+            namespace.stream().map(RerouteProcessor.DataStreamValueSource::namespace).toList(),
+            null
+        );
+    }
+
+    private RerouteProcessor createRerouteProcessor(String destination) {
+        return new RerouteProcessor(null, null, List.of(), List.of(), destination);
+    }
+
+    private void assertDataSetFields(IngestDocument ingestDocument, String type, String dataset, String namespace) {
+        assertThat(ingestDocument.getFieldValue("data_stream.type", String.class), equalTo(type));
+        assertThat(ingestDocument.getFieldValue("data_stream.dataset", String.class), equalTo(dataset));
+        assertThat(ingestDocument.getFieldValue("data_stream.namespace", String.class), equalTo(namespace));
+        assertThat(ingestDocument.getFieldValue("_index", String.class), equalTo(type + "-" + dataset + "-" + namespace));
+        if (ingestDocument.hasField("event.dataset")) {
+            assertThat(
+                ingestDocument.getFieldValue("event.dataset", String.class),
+                equalTo(ingestDocument.getFieldValue("data_stream.dataset", String.class))
+            );
+        }
+    }
+
+    private static IngestDocument createIngestDocument(String dataStream) {
+        IngestDocument ingestDocument = RandomDocumentPicks.randomIngestDocument(random());
+        ingestDocument.setFieldValue("_index", dataStream);
+        return ingestDocument;
+    }
+
+    private static class SkipProcessor implements WrappingProcessor {
+        private final Processor processor;
+
+        SkipProcessor(Processor processor) {
+            this.processor = processor;
+        }
+
+        @Override
+        public IngestDocument execute(IngestDocument ingestDocument) throws Exception {
+            return ingestDocument;
+        }
+
+        @Override
+        public Processor getInnerProcessor() {
+            return processor;
+        }
+
+        @Override
+        public String getType() {
+            return "skip";
+        }
+
+        @Override
+        public String getTag() {
+            return null;
+        }
+
+        @Override
+        public String getDescription() {
+            return null;
+        }
+    }
+}
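
The fallback behavior exercised by `testDatasetFieldReferenceMissingValue` and `testFallbackToValuesFrom_index` above can be sketched in isolation. This is a hypothetical, simplified mock of the resolution logic (the names `RerouteSketch`, `resolve`, and the regex are illustrative, not the actual `RerouteProcessor` implementation): each candidate is either a literal value or a `{{field}}`/`{{{field}}}` reference, and the first candidate that resolves to a string wins.

```java
import java.util.List;
import java.util.Map;
import java.util.regex.Matcher;
import java.util.regex.Pattern;

// Hypothetical sketch, not the actual Elasticsearch implementation:
// resolve each candidate in order; a {{field}} reference is looked up in the
// document, a plain literal is returned as-is, and a missing field falls
// through to the next candidate.
public class RerouteSketch {
    // matches {{field}} and {{{field}}} style mustache references
    private static final Pattern FIELD_REF = Pattern.compile("\\{\\{\\{?([^{}]+)}}}?");

    static String resolve(List<String> candidates, Map<String, Object> doc) {
        for (String candidate : candidates) {
            Matcher m = FIELD_REF.matcher(candidate);
            if (m.matches()) {
                Object value = doc.get(m.group(1)); // field reference
                if (value instanceof String s) {
                    return s;
                }
                // field missing: try the next candidate
            } else {
                return candidate; // literal value wins immediately
            }
        }
        return null;
    }

    public static void main(String[] args) {
        // field present: value comes from the document
        System.out.println(resolve(List.of("{{data_stream.dataset}}", "fallback"),
                Map.of("data_stream.dataset", "nginx")));
        // field missing: the literal fallback is used
        System.out.println(resolve(List.of("{{data_stream.dataset}}", "fallback"),
                Map.of()));
    }
}
```

In the real processor the lookup goes through `IngestDocument` and sanitization is applied; this sketch only illustrates the candidate-list ordering the tests assert.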

+ 140 - 0
modules/ingest-common/src/yamlRestTest/resources/rest-api-spec/test/ingest/310_reroute_processor.yml

@@ -0,0 +1,140 @@
+---
+teardown:
+  - do:
+      ingest.delete_pipeline:
+        id: "pipeline-with-two-data-stream-processors"
+        ignore: 404
+  - do:
+      ingest.delete_pipeline:
+        id: "logs-router-default"
+        ignore: 404
+  - do:
+      ingest.delete_pipeline:
+        id: "logs-nginx-default"
+        ignore: 404
+  - do:
+      indices.delete_index_template:
+        name: logs-router
+        ignore: 404
+  - do:
+      indices.delete_index_template:
+        name: logs-nginx
+        ignore: 404
+
+---
+"Test first matching router terminates pipeline":
+  - do:
+      ingest.put_pipeline:
+        id: "pipeline-with-two-data-stream-processors"
+        body:  >
+          {
+            "processors": [
+              {
+                "reroute" : {
+                  "dataset" : "first"
+                }
+              },
+              {
+                "reroute" : {
+                  "dataset" : "second"
+                }
+              }
+            ]
+          }
+  - match: { acknowledged: true }
+
+  - do:
+      index:
+        index: logs-generic-default
+        id: "1"
+        pipeline: "pipeline-with-two-data-stream-processors"
+        body: {
+          foo: "bar"
+        }
+
+  - do:
+      get:
+        index: logs-first-default
+        id: "1"
+  - match: { _source.foo: "bar" }
+---
+"Test two stage routing":
+  - skip:
+      features: allowed_warnings
+  - do:
+      ingest.put_pipeline:
+        id: "logs-router"
+        body:  >
+          {
+            "processors": [
+              {
+                "reroute" : {
+                  "tag": "nginx",
+                  "if" : "ctx?.log?.file?.path?.contains('nginx')",
+                  "dataset": "nginx"
+                }
+              }
+            ]
+          }
+  - match: { acknowledged: true }
+  - do:
+      allowed_warnings:
+        - "index template [logs-router] has index patterns [logs-router-*] matching patterns from existing older templates [global] with patterns (global => [*]); this template [logs-router] will take precedence during new index creation"
+      indices.put_index_template:
+        name: logs-router
+        body:
+          index_patterns: [ "logs-router-*" ]
+          template:
+            settings:
+              index.default_pipeline: "logs-router"
+  - do:
+      ingest.put_pipeline:
+        id: "logs-nginx"
+        body:  >
+          {
+            "processors": [
+              {
+                "reroute": {
+                  "tag": "nginx.access",
+                  "if": "ctx?.log?.file?.path?.contains('access')",
+                  "dataset": "nginx.access"
+                }
+              },
+              {
+                "reroute": {
+                  "tag": "nginx.error",
+                  "if": "ctx?.log?.file?.path?.contains('error')",
+                  "dataset": "nginx.error"
+                }
+              }
+            ]
+          }
+  - match: { acknowledged: true }
+  - do:
+      allowed_warnings:
+        - "index template [logs-nginx] has index patterns [logs-nginx-*] matching patterns from existing older templates [global] with patterns (global => [*]); this template [logs-nginx] will take precedence during new index creation"
+      indices.put_index_template:
+        name: logs-nginx
+        body:
+          index_patterns: [ "logs-nginx-*" ]
+          template:
+            settings:
+              index.default_pipeline: "logs-nginx"
+
+  - do:
+      index:
+        index: logs-nginx-default
+        id: "example-log"
+        op_type: create
+        body:
+          "@timestamp": "2022-04-13"
+          message: "this is an error log"
+          log:
+            file:
+              path: "nginx-error.log"
+
+  - do:
+      get:
+        index: logs-nginx.error-default
+        id: "example-log"
+  - match: { _source.message: "this is an error log" }

+ 21 - 0
server/src/main/java/org/elasticsearch/ingest/ConfigurationUtils.java

@@ -318,6 +318,27 @@ public final class ConfigurationUtils {
         return readList(processorType, processorTag, propertyName, value);
     }
 
+    /**
+     * Returns and removes the specified property from the specified configuration map.
+     *
+     * If the property is absent, an empty list is returned. A string value is wrapped in a
+     * singleton list. If the property value isn't of type list or string an
+     * {@link ElasticsearchParseException} is thrown.
+     */
+    public static List<String> readOptionalListOrString(
+        String processorType,
+        String processorTag,
+        Map<String, Object> configuration,
+        String propertyName
+    ) {
+        Object value = configuration.remove(propertyName);
+        if (value == null) {
+            return List.of();
+        }
+        if (value instanceof String) {
+            return List.of(readString(processorType, processorTag, propertyName, value));
+        }
+        return readList(processorType, processorTag, propertyName, value);
+    }
+
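
A standalone sketch of the three accepted shapes this accessor handles (absent, single string, list). This is a simplified mock for illustration, assuming the behavior described in the javadoc above; the class and error message are hypothetical, not the actual `ConfigurationUtils` code:

```java
import java.util.HashMap;
import java.util.List;
import java.util.Map;

// Hypothetical mock of readOptionalListOrString: the property is consumed
// from the config map, an absent value yields an empty list, a string is
// wrapped in a singleton list, and a list is returned as-is.
public class ReadOptionalListOrStringSketch {
    @SuppressWarnings("unchecked")
    static List<String> readOptionalListOrString(Map<String, Object> config, String property) {
        Object value = config.remove(property); // property is removed from the map
        if (value == null) {
            return List.of(); // absent -> empty list
        }
        if (value instanceof String s) {
            return List.of(s); // "a" -> ["a"]
        }
        if (value instanceof List<?> list) {
            return (List<String>) list; // already a list
        }
        throw new IllegalArgumentException("[" + property + "] must be a string or a list");
    }

    public static void main(String[] args) {
        Map<String, Object> config = new HashMap<>(Map.of("dataset", "nginx"));
        System.out.println(readOptionalListOrString(config, "dataset"));   // singleton list
        System.out.println(readOptionalListOrString(config, "namespace")); // empty list
    }
}
```

This shape lets the `reroute` processor accept both `"dataset": "nginx"` and `"dataset": ["{{my_field}}", "fallback"]` in pipeline definitions.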
     /**
      * Returns and removes the specified property of type list from the specified configuration map.
      *