|  | @@ -27,16 +27,25 @@ public class RerouteProcessorTests extends ESTestCase {
 | 
	
		
			
				|  |  |      public void testDefaults() throws Exception {
 | 
	
		
			
				|  |  |          IngestDocument ingestDocument = createIngestDocument("logs-generic-default");
 | 
	
		
			
				|  |  |  
 | 
	
		
			
				|  |  | -        RerouteProcessor processor = createRerouteProcessor(List.of(), List.of());
 | 
	
		
			
				|  |  | +        RerouteProcessor processor = createRerouteProcessor(List.of(), List.of(), List.of());
 | 
	
		
			
				|  |  |          processor.execute(ingestDocument);
 | 
	
		
			
				|  |  |          assertDataSetFields(ingestDocument, "logs", "generic", "default");
 | 
	
		
			
				|  |  |      }
 | 
	
		
			
				|  |  |  
 | 
	
		
			
				|  |  | +    public void testRouteOnType() throws Exception {
 | 
	
		
			
				|  |  | +        IngestDocument ingestDocument = createIngestDocument("logs-generic-default");
 | 
	
		
			
				|  |  | +        ingestDocument.setFieldValue("event.type", "foo");
 | 
	
		
			
				|  |  | +
 | 
	
		
			
				|  |  | +        RerouteProcessor processor = createRerouteProcessor(List.of("{{event.type}}"), List.of(), List.of());
 | 
	
		
			
				|  |  | +        processor.execute(ingestDocument);
 | 
	
		
			
				|  |  | +        assertDataSetFields(ingestDocument, "foo", "generic", "default");
 | 
	
		
			
				|  |  | +    }
 | 
	
		
			
				|  |  | +
 | 
	
		
			
				|  |  |      public void testEventDataset() throws Exception {
 | 
	
		
			
				|  |  |          IngestDocument ingestDocument = createIngestDocument("logs-generic-default");
 | 
	
		
			
				|  |  |          ingestDocument.setFieldValue("event.dataset", "foo");
 | 
	
		
			
				|  |  |  
 | 
	
		
			
				|  |  | -        RerouteProcessor processor = createRerouteProcessor(List.of("{{event.dataset }}"), List.of());
 | 
	
		
			
				|  |  | +        RerouteProcessor processor = createRerouteProcessor(List.of(), List.of("{{event.dataset }}"), List.of());
 | 
	
		
			
				|  |  |          processor.execute(ingestDocument);
 | 
	
		
			
				|  |  |          assertDataSetFields(ingestDocument, "logs", "foo", "default");
 | 
	
		
			
				|  |  |          assertThat(ingestDocument.getFieldValue("event.dataset", String.class), equalTo("foo"));
 | 
	
	
		
			
				|  | @@ -46,7 +55,7 @@ public class RerouteProcessorTests extends ESTestCase {
 | 
	
		
			
				|  |  |          IngestDocument ingestDocument = createIngestDocument("logs-generic-default");
 | 
	
		
			
				|  |  |          ingestDocument.getCtxMap().put("event.dataset", "foo");
 | 
	
		
			
				|  |  |  
 | 
	
		
			
				|  |  | -        RerouteProcessor processor = createRerouteProcessor(List.of("{{ event.dataset}}"), List.of());
 | 
	
		
			
				|  |  | +        RerouteProcessor processor = createRerouteProcessor(List.of(), List.of("{{ event.dataset}}"), List.of());
 | 
	
		
			
				|  |  |          processor.execute(ingestDocument);
 | 
	
		
			
				|  |  |          assertDataSetFields(ingestDocument, "logs", "foo", "default");
 | 
	
		
			
				|  |  |          assertThat(ingestDocument.getCtxMap().get("event.dataset"), equalTo("foo"));
 | 
	
	
		
			
				|  | @@ -57,7 +66,7 @@ public class RerouteProcessorTests extends ESTestCase {
 | 
	
		
			
				|  |  |          IngestDocument ingestDocument = createIngestDocument("logs-generic-default");
 | 
	
		
			
				|  |  |          ingestDocument.setFieldValue("ds", "foo");
 | 
	
		
			
				|  |  |  
 | 
	
		
			
				|  |  | -        RerouteProcessor processor = createRerouteProcessor(List.of("{{ ds }}"), List.of());
 | 
	
		
			
				|  |  | +        RerouteProcessor processor = createRerouteProcessor(List.of(), List.of("{{ ds }}"), List.of());
 | 
	
		
			
				|  |  |          processor.execute(ingestDocument);
 | 
	
		
			
				|  |  |          assertDataSetFields(ingestDocument, "logs", "foo", "default");
 | 
	
		
			
				|  |  |          assertFalse(ingestDocument.hasField("event.dataset"));
 | 
	
	
		
			
				|  | @@ -66,8 +75,8 @@ public class RerouteProcessorTests extends ESTestCase {
 | 
	
		
			
				|  |  |      public void testSkipFirstProcessor() throws Exception {
 | 
	
		
			
				|  |  |          IngestDocument ingestDocument = createIngestDocument("logs-generic-default");
 | 
	
		
			
				|  |  |  
 | 
	
		
			
				|  |  | -        RerouteProcessor skippedProcessor = createRerouteProcessor(List.of("skip"), List.of());
 | 
	
		
			
				|  |  | -        RerouteProcessor executedProcessor = createRerouteProcessor(List.of("executed"), List.of());
 | 
	
		
			
				|  |  | +        RerouteProcessor skippedProcessor = createRerouteProcessor(List.of(), List.of("skip"), List.of());
 | 
	
		
			
				|  |  | +        RerouteProcessor executedProcessor = createRerouteProcessor(List.of(), List.of("executed"), List.of());
 | 
	
		
			
				|  |  |          CompoundProcessor processor = new CompoundProcessor(new SkipProcessor(skippedProcessor), executedProcessor);
 | 
	
		
			
				|  |  |          processor.execute(ingestDocument);
 | 
	
		
			
				|  |  |          assertDataSetFields(ingestDocument, "logs", "executed", "default");
 | 
	
	
		
			
				|  | @@ -76,8 +85,8 @@ public class RerouteProcessorTests extends ESTestCase {
 | 
	
		
			
				|  |  |      public void testSkipLastProcessor() throws Exception {
 | 
	
		
			
				|  |  |          IngestDocument ingestDocument = createIngestDocument("logs-generic-default");
 | 
	
		
			
				|  |  |  
 | 
	
		
			
				|  |  | -        RerouteProcessor executedProcessor = createRerouteProcessor(List.of("executed"), List.of());
 | 
	
		
			
				|  |  | -        RerouteProcessor skippedProcessor = createRerouteProcessor(List.of("skip"), List.of());
 | 
	
		
			
				|  |  | +        RerouteProcessor executedProcessor = createRerouteProcessor(List.of(), List.of("executed"), List.of());
 | 
	
		
			
				|  |  | +        RerouteProcessor skippedProcessor = createRerouteProcessor(List.of(), List.of("skip"), List.of());
 | 
	
		
			
				|  |  |          CompoundProcessor processor = new CompoundProcessor(executedProcessor, skippedProcessor);
 | 
	
		
			
				|  |  |          processor.execute(ingestDocument);
 | 
	
		
			
				|  |  |          assertDataSetFields(ingestDocument, "logs", "executed", "default");
 | 
	
	
		
			
				|  | @@ -85,23 +94,24 @@ public class RerouteProcessorTests extends ESTestCase {
 | 
	
		
			
				|  |  |  
 | 
	
		
			
				|  |  |      public void testDataStreamFieldsFromDocument() throws Exception {
 | 
	
		
			
				|  |  |          IngestDocument ingestDocument = createIngestDocument("logs-generic-default");
 | 
	
		
			
				|  |  | +        ingestDocument.setFieldValue("data_stream.type", "eggplant");
 | 
	
		
			
				|  |  |          ingestDocument.setFieldValue("data_stream.dataset", "foo");
 | 
	
		
			
				|  |  |          ingestDocument.setFieldValue("data_stream.namespace", "bar");
 | 
	
		
			
				|  |  |  
 | 
	
		
			
				|  |  | -        RerouteProcessor processor = createRerouteProcessor(List.of(), List.of());
 | 
	
		
			
				|  |  | +        RerouteProcessor processor = createRerouteProcessor(List.of(), List.of(), List.of());
 | 
	
		
			
				|  |  |          processor.execute(ingestDocument);
 | 
	
		
			
				|  |  | -        assertDataSetFields(ingestDocument, "logs", "foo", "bar");
 | 
	
		
			
				|  |  | +        assertDataSetFields(ingestDocument, "eggplant", "foo", "bar");
 | 
	
		
			
				|  |  |      }
 | 
	
		
			
				|  |  |  
 | 
	
		
			
				|  |  |      public void testDataStreamFieldsFromDocumentDottedNotation() throws Exception {
 | 
	
		
			
				|  |  |          IngestDocument ingestDocument = createIngestDocument("logs-generic-default");
 | 
	
		
			
				|  |  | -        ingestDocument.getCtxMap().put("data_stream.type", "logs");
 | 
	
		
			
				|  |  | +        ingestDocument.getCtxMap().put("data_stream.type", "eggplant");
 | 
	
		
			
				|  |  |          ingestDocument.getCtxMap().put("data_stream.dataset", "foo");
 | 
	
		
			
				|  |  |          ingestDocument.getCtxMap().put("data_stream.namespace", "bar");
 | 
	
		
			
				|  |  |  
 | 
	
		
			
				|  |  | -        RerouteProcessor processor = createRerouteProcessor(List.of(), List.of());
 | 
	
		
			
				|  |  | +        RerouteProcessor processor = createRerouteProcessor(List.of(), List.of(), List.of());
 | 
	
		
			
				|  |  |          processor.execute(ingestDocument);
 | 
	
		
			
				|  |  | -        assertDataSetFields(ingestDocument, "logs", "foo", "bar");
 | 
	
		
			
				|  |  | +        assertDataSetFields(ingestDocument, "eggplant", "foo", "bar");
 | 
	
		
			
				|  |  |      }
 | 
	
		
			
				|  |  |  
 | 
	
		
			
				|  |  |      public void testInvalidDataStreamFieldsFromDocument() throws Exception {
 | 
	
	
		
			
				|  | @@ -109,7 +119,7 @@ public class RerouteProcessorTests extends ESTestCase {
 | 
	
		
			
				|  |  |          ingestDocument.setFieldValue("data_stream.dataset", "foo-bar");
 | 
	
		
			
				|  |  |          ingestDocument.setFieldValue("data_stream.namespace", "baz#qux");
 | 
	
		
			
				|  |  |  
 | 
	
		
			
				|  |  | -        RerouteProcessor processor = createRerouteProcessor(List.of(), List.of());
 | 
	
		
			
				|  |  | +        RerouteProcessor processor = createRerouteProcessor(List.of(), List.of(), List.of());
 | 
	
		
			
				|  |  |          processor.execute(ingestDocument);
 | 
	
		
			
				|  |  |          assertDataSetFields(ingestDocument, "logs", "foo_bar", "baz_qux");
 | 
	
		
			
				|  |  |      }
 | 
	
	
		
			
				|  | @@ -128,7 +138,7 @@ public class RerouteProcessorTests extends ESTestCase {
 | 
	
		
			
				|  |  |          ingestDocument.setFieldValue("service.name", "opbeans-java");
 | 
	
		
			
				|  |  |          ingestDocument.setFieldValue("service.environment", "dev");
 | 
	
		
			
				|  |  |  
 | 
	
		
			
				|  |  | -        RerouteProcessor processor = createRerouteProcessor(List.of("{{service.name}}"), List.of("{{service.environment}}"));
 | 
	
		
			
				|  |  | +        RerouteProcessor processor = createRerouteProcessor(List.of(), List.of("{{service.name}}"), List.of("{{service.environment}}"));
 | 
	
		
			
				|  |  |          processor.execute(ingestDocument);
 | 
	
		
			
				|  |  |          assertDataSetFields(ingestDocument, "logs", "opbeans_java", "dev");
 | 
	
		
			
				|  |  |      }
 | 
	
	
		
			
				|  | @@ -136,7 +146,7 @@ public class RerouteProcessorTests extends ESTestCase {
 | 
	
		
			
				|  |  |      public void testRerouteToCurrentTarget() throws Exception {
 | 
	
		
			
				|  |  |          IngestDocument ingestDocument = createIngestDocument("logs-generic-default");
 | 
	
		
			
				|  |  |  
 | 
	
		
			
				|  |  | -        RerouteProcessor reroute = createRerouteProcessor(List.of("generic"), List.of("default"));
 | 
	
		
			
				|  |  | +        RerouteProcessor reroute = createRerouteProcessor(List.of(), List.of("generic"), List.of("default"));
 | 
	
		
			
				|  |  |          CompoundProcessor processor = new CompoundProcessor(
 | 
	
		
			
				|  |  |              reroute,
 | 
	
		
			
				|  |  |              new TestProcessor(doc -> doc.setFieldValue("pipeline_is_continued", true))
 | 
	
	
		
			
				|  | @@ -149,7 +159,7 @@ public class RerouteProcessorTests extends ESTestCase {
 | 
	
		
			
				|  |  |      public void testFieldReferenceWithMissingReroutesToCurrentTarget() throws Exception {
 | 
	
		
			
				|  |  |          IngestDocument ingestDocument = createIngestDocument("logs-generic-default");
 | 
	
		
			
				|  |  |  
 | 
	
		
			
				|  |  | -        RerouteProcessor reroute = createRerouteProcessor(List.of("{{service.name}}"), List.of("{{service.environment}}"));
 | 
	
		
			
				|  |  | +        RerouteProcessor reroute = createRerouteProcessor(List.of(), List.of("{{service.name}}"), List.of("{{service.environment}}"));
 | 
	
		
			
				|  |  |          CompoundProcessor processor = new CompoundProcessor(
 | 
	
		
			
				|  |  |              reroute,
 | 
	
		
			
				|  |  |              new TestProcessor(doc -> doc.setFieldValue("pipeline_is_continued", true))
 | 
	
	
		
			
				|  | @@ -166,6 +176,7 @@ public class RerouteProcessorTests extends ESTestCase {
 | 
	
		
			
				|  |  |          ingestDocument.setFieldValue("data_stream.namespace", "namespace_from_doc");
 | 
	
		
			
				|  |  |  
 | 
	
		
			
				|  |  |          RerouteProcessor processor = createRerouteProcessor(
 | 
	
		
			
				|  |  | +            List.of(),
 | 
	
		
			
				|  |  |              List.of("{{{data_stream.dataset}}}", "fallback"),
 | 
	
		
			
				|  |  |              List.of("{{data_stream.namespace}}", "fallback")
 | 
	
		
			
				|  |  |          );
 | 
	
	
		
			
				|  | @@ -177,6 +188,7 @@ public class RerouteProcessorTests extends ESTestCase {
 | 
	
		
			
				|  |  |          IngestDocument ingestDocument = createIngestDocument("logs-generic-default");
 | 
	
		
			
				|  |  |  
 | 
	
		
			
				|  |  |          RerouteProcessor processor = createRerouteProcessor(
 | 
	
		
			
				|  |  | +            List.of(),
 | 
	
		
			
				|  |  |              List.of("{{data_stream.dataset}}", "fallback"),
 | 
	
		
			
				|  |  |              List.of("{{data_stream.namespace}}", "fallback")
 | 
	
		
			
				|  |  |          );
 | 
	
	
		
			
				|  | @@ -190,6 +202,7 @@ public class RerouteProcessorTests extends ESTestCase {
 | 
	
		
			
				|  |  |          ingestDocument.setFieldValue("data_stream.namespace", "default");
 | 
	
		
			
				|  |  |  
 | 
	
		
			
				|  |  |          RerouteProcessor processor = createRerouteProcessor(
 | 
	
		
			
				|  |  | +            List.of(),
 | 
	
		
			
				|  |  |              List.of("{{data_stream.dataset}}", "fallback"),
 | 
	
		
			
				|  |  |              List.of("{{{data_stream.namespace}}}", "fallback")
 | 
	
		
			
				|  |  |          );
 | 
	
	
		
			
				|  | @@ -202,7 +215,7 @@ public class RerouteProcessorTests extends ESTestCase {
 | 
	
		
			
				|  |  |          ingestDocument.setFieldValue("data_stream.dataset", "foo");
 | 
	
		
			
				|  |  |          ingestDocument.setFieldValue("data_stream.namespace", "bar");
 | 
	
		
			
				|  |  |  
 | 
	
		
			
				|  |  | -        RerouteProcessor processor = createRerouteProcessor(List.of("{{foo}}"), List.of("{{bar}}"));
 | 
	
		
			
				|  |  | +        RerouteProcessor processor = createRerouteProcessor(List.of(), List.of("{{foo}}"), List.of("{{bar}}"));
 | 
	
		
			
				|  |  |          processor.execute(ingestDocument);
 | 
	
		
			
				|  |  |          assertDataSetFields(ingestDocument, "logs", "generic", "default");
 | 
	
		
			
				|  |  |      }
 | 
	
	
		
			
				|  | @@ -210,7 +223,7 @@ public class RerouteProcessorTests extends ESTestCase {
 | 
	
		
			
				|  |  |      public void testInvalidDataStreamName() throws Exception {
 | 
	
		
			
				|  |  |          {
 | 
	
		
			
				|  |  |              IngestDocument ingestDocument = createIngestDocument("foo");
 | 
	
		
			
				|  |  | -            RerouteProcessor processor = createRerouteProcessor(List.of(), List.of());
 | 
	
		
			
				|  |  | +            RerouteProcessor processor = createRerouteProcessor(List.of(), List.of(), List.of());
 | 
	
		
			
				|  |  |              IllegalArgumentException e = expectThrows(IllegalArgumentException.class, () -> processor.execute(ingestDocument));
 | 
	
		
			
				|  |  |              assertThat(e.getMessage(), equalTo("invalid data stream name: [foo]; must follow naming scheme <type>-<dataset>-<namespace>"));
 | 
	
		
			
				|  |  |          }
 | 
	
	
		
			
				|  | @@ -227,11 +240,16 @@ public class RerouteProcessorTests extends ESTestCase {
 | 
	
		
			
				|  |  |      public void testRouteOnNonStringFieldFails() {
 | 
	
		
			
				|  |  |          IngestDocument ingestDocument = createIngestDocument("logs-generic-default");
 | 
	
		
			
				|  |  |          ingestDocument.setFieldValue("numeric_field", 42);
 | 
	
		
			
				|  |  | -        RerouteProcessor processor = createRerouteProcessor(List.of("{{numeric_field}}"), List.of());
 | 
	
		
			
				|  |  | +        RerouteProcessor processor = createRerouteProcessor(List.of(), List.of("{{numeric_field}}"), List.of());
 | 
	
		
			
				|  |  |          IllegalArgumentException e = expectThrows(IllegalArgumentException.class, () -> processor.execute(ingestDocument));
 | 
	
		
			
				|  |  |          assertThat(e.getMessage(), equalTo("field [numeric_field] of type [java.lang.Integer] cannot be cast to [java.lang.String]"));
 | 
	
		
			
				|  |  |      }
 | 
	
		
			
				|  |  |  
 | 
	
		
			
				|  |  | +    public void testTypeSanitization() {
 | 
	
		
			
				|  |  | +        assertTypeSanitization("\\/*?\"<>| ,#:-", "_____________");
 | 
	
		
			
				|  |  | +        assertTypeSanitization("foo*bar", "foo_bar");
 | 
	
		
			
				|  |  | +    }
 | 
	
		
			
				|  |  | +
 | 
	
		
			
				|  |  |      public void testDatasetSanitization() {
 | 
	
		
			
				|  |  |          assertDatasetSanitization("\\/*?\"<>| ,#:-", "_____________");
 | 
	
		
			
				|  |  |          assertDatasetSanitization("foo*bar", "foo_bar");
 | 
	
	
		
			
				|  | @@ -242,6 +260,14 @@ public class RerouteProcessorTests extends ESTestCase {
 | 
	
		
			
				|  |  |          assertNamespaceSanitization("foo*bar", "foo_bar");
 | 
	
		
			
				|  |  |      }
 | 
	
		
			
				|  |  |  
 | 
	
		
			
				|  |  | +    private static void assertTypeSanitization(String type, String sanitizedType) {
 | 
	
		
			
				|  |  | +        assertThat(
 | 
	
		
			
				|  |  | +            RerouteProcessor.DataStreamValueSource.type("{{foo}}")
 | 
	
		
			
				|  |  | +                .resolve(RandomDocumentPicks.randomIngestDocument(random(), Map.of("foo", type))),
 | 
	
		
			
				|  |  | +            equalTo(sanitizedType)
 | 
	
		
			
				|  |  | +        );
 | 
	
		
			
				|  |  | +    }
 | 
	
		
			
				|  |  | +
 | 
	
		
			
				|  |  |      private static void assertDatasetSanitization(String dataset, String sanitizedDataset) {
 | 
	
		
			
				|  |  |          assertThat(
 | 
	
		
			
				|  |  |              RerouteProcessor.DataStreamValueSource.dataset("{{foo}}")
 | 
	
	
		
			
				|  | @@ -258,10 +284,11 @@ public class RerouteProcessorTests extends ESTestCase {
 | 
	
		
			
				|  |  |          );
 | 
	
		
			
				|  |  |      }
 | 
	
		
			
				|  |  |  
 | 
	
		
			
				|  |  | -    private RerouteProcessor createRerouteProcessor(List<String> dataset, List<String> namespace) {
 | 
	
		
			
				|  |  | +    private RerouteProcessor createRerouteProcessor(List<String> type, List<String> dataset, List<String> namespace) {
 | 
	
		
			
				|  |  |          return new RerouteProcessor(
 | 
	
		
			
				|  |  |              null,
 | 
	
		
			
				|  |  |              null,
 | 
	
		
			
				|  |  | +            type.stream().map(RerouteProcessor.DataStreamValueSource::type).toList(),
 | 
	
		
			
				|  |  |              dataset.stream().map(RerouteProcessor.DataStreamValueSource::dataset).toList(),
 | 
	
		
			
				|  |  |              namespace.stream().map(RerouteProcessor.DataStreamValueSource::namespace).toList(),
 | 
	
		
			
				|  |  |              null
 | 
	
	
		
			
				|  | @@ -269,7 +296,7 @@ public class RerouteProcessorTests extends ESTestCase {
 | 
	
		
			
				|  |  |      }
 | 
	
		
			
				|  |  |  
 | 
	
		
			
				|  |  |      private RerouteProcessor createRerouteProcessor(String destination) {
 | 
	
		
			
				|  |  | -        return new RerouteProcessor(null, null, List.of(), List.of(), destination);
 | 
	
		
			
				|  |  | +        return new RerouteProcessor(null, null, List.of(), List.of(), List.of(), destination);
 | 
	
		
			
				|  |  |      }
 | 
	
		
			
				|  |  |  
 | 
	
		
			
				|  |  |      private void assertDataSetFields(IngestDocument ingestDocument, String type, String dataset, String namespace) {
 |