Browse Source

Auto create data streams using index templates v2 (#55377)

This commit adds the ability to auto create data streams using index templates v2.
Index templates (v2) now have a data_steam field that includes a timestamp field,
if provided and index name matches with that template then a data stream
(plus first backing index) is auto created. 

Relates to #53100
Martijn van Groningen 5 years ago
parent
commit
74e2c01138
25 changed files with 788 additions and 241 deletions
  1. 5 3
      client/rest-high-level/src/test/java/org/elasticsearch/client/IndicesClientIT.java
  2. 5 1
      client/rest-high-level/src/test/java/org/elasticsearch/client/indices/GetIndexTemplatesV2ResponseTests.java
  3. 51 0
      rest-api-spec/src/main/resources/rest-api-spec/test/indices.data_stream/30_auto_create_data_stream.yml
  4. 100 0
      server/src/internalClusterTest/java/org/elasticsearch/action/bulk/BulkIntegrationIT.java
  5. 79 3
      server/src/main/java/org/elasticsearch/action/admin/indices/create/AutoCreateAction.java
  6. 4 10
      server/src/main/java/org/elasticsearch/action/admin/indices/create/TransportCreateIndexAction.java
  7. 34 76
      server/src/main/java/org/elasticsearch/action/admin/indices/datastream/CreateDataStreamAction.java
  8. 1 1
      server/src/main/java/org/elasticsearch/action/support/AutoCreateIndex.java
  9. 5 4
      server/src/main/java/org/elasticsearch/cluster/metadata/IndexNameExpressionResolver.java
  10. 86 3
      server/src/main/java/org/elasticsearch/cluster/metadata/IndexTemplateV2.java
  11. 148 0
      server/src/main/java/org/elasticsearch/cluster/metadata/MetadataCreateDataStreamService.java
  12. 1 1
      server/src/main/java/org/elasticsearch/cluster/metadata/MetadataIndexTemplateService.java
  13. 5 0
      server/src/main/java/org/elasticsearch/node/Node.java
  14. 67 0
      server/src/test/java/org/elasticsearch/action/admin/indices/create/AutoCreateActionTests.java
  15. 0 96
      server/src/test/java/org/elasticsearch/action/admin/indices/datastream/CreateDataStreamRequestTests.java
  16. 4 4
      server/src/test/java/org/elasticsearch/action/admin/indices/rollover/MetadataRolloverServiceTests.java
  17. 1 1
      server/src/test/java/org/elasticsearch/action/admin/indices/template/post/SimulateIndexTemplateRequestTests.java
  18. 2 2
      server/src/test/java/org/elasticsearch/action/admin/indices/template/put/PutIndexTemplateV2RequestTests.java
  19. 1 1
      server/src/test/java/org/elasticsearch/action/bulk/TransportBulkActionIngestTests.java
  20. 1 0
      server/src/test/java/org/elasticsearch/action/bulk/TransportBulkActionTests.java
  21. 32 8
      server/src/test/java/org/elasticsearch/cluster/metadata/IndexTemplateV2Tests.java
  22. 128 0
      server/src/test/java/org/elasticsearch/cluster/metadata/MetadataCreateDataStreamServiceTests.java
  23. 2 2
      server/src/test/java/org/elasticsearch/cluster/metadata/MetadataCreateIndexServiceTests.java
  24. 24 24
      server/src/test/java/org/elasticsearch/cluster/metadata/MetadataIndexTemplateServiceTests.java
  25. 2 1
      server/src/test/java/org/elasticsearch/cluster/metadata/ToAndFromJsonMetadataTests.java

+ 5 - 3
client/rest-high-level/src/test/java/org/elasticsearch/client/IndicesClientIT.java

@@ -1578,7 +1578,8 @@ public class IndicesClientIT extends ESRestHighLevelClientTestCase {
         AliasMetadata alias = AliasMetadata.builder("alias").writeIndex(true).build();
         AliasMetadata alias = AliasMetadata.builder("alias").writeIndex(true).build();
         Template template = new Template(settings, mappings, Map.of("alias", alias));
         Template template = new Template(settings, mappings, Map.of("alias", alias));
         List<String> pattern = List.of("pattern");
         List<String> pattern = List.of("pattern");
-        IndexTemplateV2 indexTemplate = new IndexTemplateV2(pattern, template, Collections.emptyList(), 1L, 1L, new HashMap<>());
+        IndexTemplateV2 indexTemplate =
+            new IndexTemplateV2(pattern, template, Collections.emptyList(), 1L, 1L, new HashMap<>(), null);
         PutIndexTemplateV2Request putIndexTemplateV2Request =
         PutIndexTemplateV2Request putIndexTemplateV2Request =
             new PutIndexTemplateV2Request().name(templateName).create(true).indexTemplate(indexTemplate);
             new PutIndexTemplateV2Request().name(templateName).create(true).indexTemplate(indexTemplate);
 
 
@@ -1624,7 +1625,8 @@ public class IndicesClientIT extends ESRestHighLevelClientTestCase {
         AliasMetadata alias = AliasMetadata.builder("alias").writeIndex(true).build();
         AliasMetadata alias = AliasMetadata.builder("alias").writeIndex(true).build();
         Template template = new Template(settings, mappings, Map.of("alias", alias));
         Template template = new Template(settings, mappings, Map.of("alias", alias));
         List<String> pattern = List.of("pattern");
         List<String> pattern = List.of("pattern");
-        IndexTemplateV2 indexTemplate = new IndexTemplateV2(pattern, template, Collections.emptyList(), 1L, 1L, new HashMap<>());
+        IndexTemplateV2 indexTemplate =
+            new IndexTemplateV2(pattern, template, Collections.emptyList(), 1L, 1L, new HashMap<>(), null);
         PutIndexTemplateV2Request putIndexTemplateV2Request =
         PutIndexTemplateV2Request putIndexTemplateV2Request =
             new PutIndexTemplateV2Request().name(templateName).create(true).indexTemplate(indexTemplate);
             new PutIndexTemplateV2Request().name(templateName).create(true).indexTemplate(indexTemplate);
 
 
@@ -1635,7 +1637,7 @@ public class IndicesClientIT extends ESRestHighLevelClientTestCase {
         SimulateIndexTemplateRequest simulateIndexTemplateRequest = new SimulateIndexTemplateRequest("pattern");
         SimulateIndexTemplateRequest simulateIndexTemplateRequest = new SimulateIndexTemplateRequest("pattern");
         AliasMetadata simulationAlias = AliasMetadata.builder("simulation-alias").writeIndex(true).build();
         AliasMetadata simulationAlias = AliasMetadata.builder("simulation-alias").writeIndex(true).build();
         IndexTemplateV2 simulationTemplate = new IndexTemplateV2(pattern, new Template(null, null,
         IndexTemplateV2 simulationTemplate = new IndexTemplateV2(pattern, new Template(null, null,
-            Map.of("simulation-alias", simulationAlias)), Collections.emptyList(), 2L, 1L, new HashMap<>());
+            Map.of("simulation-alias", simulationAlias)), Collections.emptyList(), 2L, 1L, new HashMap<>(), null);
         PutIndexTemplateV2Request newIndexTemplateReq =
         PutIndexTemplateV2Request newIndexTemplateReq =
             new PutIndexTemplateV2Request().name("used-for-simulation").create(true).indexTemplate(indexTemplate);
             new PutIndexTemplateV2Request().name("used-for-simulation").create(true).indexTemplate(indexTemplate);
         newIndexTemplateReq.indexTemplate(simulationTemplate);
         newIndexTemplateReq.indexTemplate(simulationTemplate);

+ 5 - 1
client/rest-high-level/src/test/java/org/elasticsearch/client/indices/GetIndexTemplatesV2ResponseTests.java

@@ -75,6 +75,7 @@ public class GetIndexTemplatesV2ResponseTests extends ESTestCase {
         List<String> patterns = Arrays.asList(generateRandomStringArray(10, 10, false, false));
         List<String> patterns = Arrays.asList(generateRandomStringArray(10, 10, false, false));
         List<String> composedOf = null;
         List<String> composedOf = null;
         Map<String, Object> meta = null;
         Map<String, Object> meta = null;
+        IndexTemplateV2.DataStreamTemplate dataStreamTemplate = null;
         if (randomBoolean()) {
         if (randomBoolean()) {
             composedOf = Arrays.asList(generateRandomStringArray(10, 10, false, false));
             composedOf = Arrays.asList(generateRandomStringArray(10, 10, false, false));
         }
         }
@@ -84,6 +85,9 @@ public class GetIndexTemplatesV2ResponseTests extends ESTestCase {
 
 
         Long priority = randomBoolean() ? null : randomNonNegativeLong();
         Long priority = randomBoolean() ? null : randomNonNegativeLong();
         Long version = randomBoolean() ? null : randomNonNegativeLong();
         Long version = randomBoolean() ? null : randomNonNegativeLong();
-        return new IndexTemplateV2(patterns, randomTemplate(), composedOf, priority, version, meta);
+        if (randomBoolean()) {
+            dataStreamTemplate = new IndexTemplateV2.DataStreamTemplate(randomAlphaOfLength(8));
+        }
+        return new IndexTemplateV2(patterns, randomTemplate(), composedOf, priority, version, meta, dataStreamTemplate);
     }
     }
 }
 }

+ 51 - 0
rest-api-spec/src/main/resources/rest-api-spec/test/indices.data_stream/30_auto_create_data_stream.yml

@@ -0,0 +1,51 @@
+---
+"Put index template":
+  - skip:
+      version: " - 7.9.99"
+      reason: "not yet backported"
+      features: allowed_warnings
+
+  - do:
+      allowed_warnings:
+        - "index template [test] has index patterns [test-*] matching patterns from existing older templates [global] with patterns (global => [*]); this template [test] will take precedence during new index creation"
+      indices.put_index_template:
+        name: generic_logs_template
+        body:
+          index_patterns: logs-*
+          data_stream:
+            timestamp_field: timestamp
+          template:
+            settings:
+              number_of_shards:   1
+              number_of_replicas: 0
+            mappings:
+              properties:
+                timestamp:
+                  type: date
+
+  - do:
+      index:
+        index:  logs-foobar
+        refresh: true
+        body:   { foo: bar }
+
+  - do:
+      search:
+        index: logs-foobar
+        body: { query: { match_all: {} } }
+  - length:   { hits.hits: 1  }
+  - match: { hits.hits.0._index: logs-foobar-000001 }
+  - match: { hits.hits.0._source.foo: 'bar' }
+
+  - do:
+      indices.get_data_streams:
+        name: logs-foobar
+  - match: { 0.name: logs-foobar }
+  - match: { 0.timestamp_field: 'timestamp' }
+  - length: { 0.indices: 1 }
+  - match: { 0.indices.0.index_name: 'logs-foobar-000001' }
+
+  - do:
+      indices.delete_data_stream:
+        name: logs-foobar
+  - is_true: acknowledged

+ 100 - 0
server/src/internalClusterTest/java/org/elasticsearch/action/bulk/BulkIntegrationIT.java

@@ -24,14 +24,25 @@ import org.elasticsearch.ElasticsearchException;
 import org.elasticsearch.Version;
 import org.elasticsearch.Version;
 import org.elasticsearch.action.ActionRequestValidationException;
 import org.elasticsearch.action.ActionRequestValidationException;
 import org.elasticsearch.action.admin.indices.alias.Alias;
 import org.elasticsearch.action.admin.indices.alias.Alias;
+import org.elasticsearch.action.admin.indices.datastream.GetDataStreamsAction;
+import org.elasticsearch.action.admin.indices.get.GetIndexRequest;
+import org.elasticsearch.action.admin.indices.get.GetIndexResponse;
 import org.elasticsearch.action.admin.indices.mapping.get.GetMappingsResponse;
 import org.elasticsearch.action.admin.indices.mapping.get.GetMappingsResponse;
+import org.elasticsearch.action.admin.indices.template.delete.DeleteIndexTemplateV2Action;
+import org.elasticsearch.action.admin.indices.template.put.PutIndexTemplateRequest;
+import org.elasticsearch.action.admin.indices.template.put.PutIndexTemplateV2Action;
 import org.elasticsearch.action.index.IndexRequest;
 import org.elasticsearch.action.index.IndexRequest;
 import org.elasticsearch.action.index.IndexResponse;
 import org.elasticsearch.action.index.IndexResponse;
 import org.elasticsearch.action.ingest.PutPipelineRequest;
 import org.elasticsearch.action.ingest.PutPipelineRequest;
 import org.elasticsearch.action.support.master.AcknowledgedResponse;
 import org.elasticsearch.action.support.master.AcknowledgedResponse;
 import org.elasticsearch.action.support.replication.ReplicationRequest;
 import org.elasticsearch.action.support.replication.ReplicationRequest;
+import org.elasticsearch.cluster.metadata.DataStream;
 import org.elasticsearch.cluster.metadata.IndexMetadata;
 import org.elasticsearch.cluster.metadata.IndexMetadata;
+import org.elasticsearch.cluster.metadata.IndexTemplateV2;
+import org.elasticsearch.cluster.metadata.Template;
+import org.elasticsearch.common.Strings;
 import org.elasticsearch.common.bytes.BytesReference;
 import org.elasticsearch.common.bytes.BytesReference;
+import org.elasticsearch.common.settings.Settings;
 import org.elasticsearch.common.xcontent.XContentBuilder;
 import org.elasticsearch.common.xcontent.XContentBuilder;
 import org.elasticsearch.common.xcontent.XContentType;
 import org.elasticsearch.common.xcontent.XContentType;
 import org.elasticsearch.ingest.IngestTestPlugin;
 import org.elasticsearch.ingest.IngestTestPlugin;
@@ -44,20 +55,26 @@ import java.nio.charset.StandardCharsets;
 import java.util.Arrays;
 import java.util.Arrays;
 import java.util.Collection;
 import java.util.Collection;
 import java.util.Collections;
 import java.util.Collections;
+import java.util.Comparator;
+import java.util.List;
 import java.util.Map;
 import java.util.Map;
 import java.util.concurrent.ExecutionException;
 import java.util.concurrent.ExecutionException;
 import java.util.concurrent.atomic.AtomicBoolean;
 import java.util.concurrent.atomic.AtomicBoolean;
 import java.util.concurrent.atomic.AtomicInteger;
 import java.util.concurrent.atomic.AtomicInteger;
 
 
+import static org.elasticsearch.action.DocWriteRequest.OpType.CREATE;
 import static org.elasticsearch.action.DocWriteResponse.Result.CREATED;
 import static org.elasticsearch.action.DocWriteResponse.Result.CREATED;
 import static org.elasticsearch.action.DocWriteResponse.Result.UPDATED;
 import static org.elasticsearch.action.DocWriteResponse.Result.UPDATED;
 import static org.elasticsearch.common.xcontent.XContentFactory.jsonBuilder;
 import static org.elasticsearch.common.xcontent.XContentFactory.jsonBuilder;
 import static org.elasticsearch.test.StreamsUtils.copyToStringFromClasspath;
 import static org.elasticsearch.test.StreamsUtils.copyToStringFromClasspath;
 import static org.elasticsearch.test.hamcrest.ElasticsearchAssertions.assertAcked;
 import static org.elasticsearch.test.hamcrest.ElasticsearchAssertions.assertAcked;
+import static org.hamcrest.Matchers.arrayWithSize;
 import static org.hamcrest.Matchers.containsInAnyOrder;
 import static org.hamcrest.Matchers.containsInAnyOrder;
 import static org.hamcrest.Matchers.containsString;
 import static org.hamcrest.Matchers.containsString;
 import static org.hamcrest.Matchers.equalTo;
 import static org.hamcrest.Matchers.equalTo;
 import static org.hamcrest.Matchers.greaterThanOrEqualTo;
 import static org.hamcrest.Matchers.greaterThanOrEqualTo;
+import static org.hamcrest.Matchers.hasItemInArray;
+import static org.hamcrest.Matchers.hasSize;
 import static org.hamcrest.Matchers.is;
 import static org.hamcrest.Matchers.is;
 import static org.hamcrest.Matchers.oneOf;
 import static org.hamcrest.Matchers.oneOf;
 
 
@@ -200,4 +217,87 @@ public class BulkIntegrationIT extends ESIntegTestCase {
             assertFalse(thread.isAlive());
             assertFalse(thread.isAlive());
         }
         }
     }
     }
+
+    public void testMixedAutoCreate() {
+        Settings settings = Settings.builder().put(IndexMetadata.SETTING_NUMBER_OF_REPLICAS, 0).build();
+
+        PutIndexTemplateV2Action.Request createTemplateRequest = new PutIndexTemplateV2Action.Request("logs-foo");
+        createTemplateRequest.indexTemplate(
+            new IndexTemplateV2(
+                List.of("logs-foo*"),
+                new Template(settings, null, null),
+                null, null, null, null,
+                new IndexTemplateV2.DataStreamTemplate("@timestamp"))
+        );
+        client().execute(PutIndexTemplateV2Action.INSTANCE, createTemplateRequest).actionGet();
+
+        BulkRequest bulkRequest = new BulkRequest();
+        bulkRequest.add(new IndexRequest("logs-foobar").opType(CREATE).source("{}", XContentType.JSON));
+        bulkRequest.add(new IndexRequest("logs-foobaz").opType(CREATE).source("{}", XContentType.JSON));
+        bulkRequest.add(new IndexRequest("logs-barbaz").opType(CREATE).source("{}", XContentType.JSON));
+        bulkRequest.add(new IndexRequest("logs-barfoo").opType(CREATE).source("{}", XContentType.JSON));
+        BulkResponse bulkResponse = client().bulk(bulkRequest).actionGet();
+        assertThat("bulk failures: " + Strings.toString(bulkResponse), bulkResponse.hasFailures(), is(false));
+
+        bulkRequest = new BulkRequest();
+        bulkRequest.add(new IndexRequest("logs-foobar").opType(CREATE).source("{}", XContentType.JSON));
+        bulkRequest.add(new IndexRequest("logs-foobaz2").opType(CREATE).source("{}", XContentType.JSON));
+        bulkRequest.add(new IndexRequest("logs-barbaz").opType(CREATE).source("{}", XContentType.JSON));
+        bulkRequest.add(new IndexRequest("logs-barfoo2").opType(CREATE).source("{}", XContentType.JSON));
+        bulkResponse = client().bulk(bulkRequest).actionGet();
+        assertThat("bulk failures: " + Strings.toString(bulkResponse), bulkResponse.hasFailures(), is(false));
+
+        bulkRequest = new BulkRequest();
+        bulkRequest.add(new IndexRequest("logs-foobar").opType(CREATE).source("{}", XContentType.JSON));
+        bulkRequest.add(new IndexRequest("logs-foobaz2").opType(CREATE).source("{}", XContentType.JSON));
+        bulkRequest.add(new IndexRequest("logs-foobaz3").opType(CREATE).source("{}", XContentType.JSON));
+        bulkRequest.add(new IndexRequest("logs-barbaz").opType(CREATE).source("{}", XContentType.JSON));
+        bulkRequest.add(new IndexRequest("logs-barfoo2").opType(CREATE).source("{}", XContentType.JSON));
+        bulkRequest.add(new IndexRequest("logs-barfoo3").opType(CREATE).source("{}", XContentType.JSON));
+        bulkResponse = client().bulk(bulkRequest).actionGet();
+        assertThat("bulk failures: " + Strings.toString(bulkResponse), bulkResponse.hasFailures(), is(false));
+
+        GetDataStreamsAction.Request getDataStreamRequest = new GetDataStreamsAction.Request("*");
+        GetDataStreamsAction.Response getDataStreamsResponse = client().admin().indices().getDataStreams(getDataStreamRequest).actionGet();
+        assertThat(getDataStreamsResponse.getDataStreams(), hasSize(4));
+        getDataStreamsResponse.getDataStreams().sort(Comparator.comparing(DataStream::getName));
+        assertThat(getDataStreamsResponse.getDataStreams().get(0).getName(), equalTo("logs-foobar"));
+        assertThat(getDataStreamsResponse.getDataStreams().get(1).getName(), equalTo("logs-foobaz"));
+        assertThat(getDataStreamsResponse.getDataStreams().get(2).getName(), equalTo("logs-foobaz2"));
+        assertThat(getDataStreamsResponse.getDataStreams().get(3).getName(), equalTo("logs-foobaz3"));
+
+        GetIndexResponse getIndexResponse = client().admin().indices().getIndex(new GetIndexRequest().indices("logs-bar*")).actionGet();
+        assertThat(getIndexResponse.getIndices(), arrayWithSize(4));
+        assertThat(getIndexResponse.getIndices(), hasItemInArray("logs-barbaz"));
+        assertThat(getIndexResponse.getIndices(), hasItemInArray("logs-barfoo"));
+        assertThat(getIndexResponse.getIndices(), hasItemInArray("logs-barfoo2"));
+        assertThat(getIndexResponse.getIndices(), hasItemInArray("logs-barfoo3"));
+
+        DeleteIndexTemplateV2Action.Request deleteTemplateRequest = new DeleteIndexTemplateV2Action.Request("*");
+        client().execute(DeleteIndexTemplateV2Action.INSTANCE, deleteTemplateRequest).actionGet();
+    }
+
+    public void testAutoCreateV1TemplateNoDataStream() {
+        Settings settings = Settings.builder().put(IndexMetadata.SETTING_NUMBER_OF_REPLICAS, 0).build();
+
+        PutIndexTemplateRequest v1Request = new PutIndexTemplateRequest("logs-foo");
+        v1Request.patterns(List.of("logs-foo*"));
+        v1Request.settings(settings);
+        v1Request.order(Integer.MAX_VALUE); // in order to avoid number_of_replicas being overwritten by random_template
+        client().admin().indices().putTemplate(v1Request).actionGet();
+
+        BulkRequest bulkRequest = new BulkRequest();
+        bulkRequest.add(new IndexRequest("logs-foobar").opType(CREATE).source("{}", XContentType.JSON));
+        BulkResponse bulkResponse = client().bulk(bulkRequest).actionGet();
+        assertThat("bulk failures: " + Strings.toString(bulkResponse), bulkResponse.hasFailures(), is(false));
+
+        GetDataStreamsAction.Request getDataStreamRequest = new GetDataStreamsAction.Request("*");
+        GetDataStreamsAction.Response getDataStreamsResponse = client().admin().indices().getDataStreams(getDataStreamRequest).actionGet();
+        assertThat(getDataStreamsResponse.getDataStreams(), hasSize(0));
+
+        GetIndexResponse getIndexResponse = client().admin().indices().getIndex(new GetIndexRequest().indices("logs-foobar")).actionGet();
+        assertThat(getIndexResponse.getIndices(), arrayWithSize(1));
+        assertThat(getIndexResponse.getIndices(), hasItemInArray("logs-foobar"));
+        assertThat(getIndexResponse.getSettings().get("logs-foobar").get(IndexMetadata.SETTING_NUMBER_OF_REPLICAS), equalTo("0"));
+    }
 }
 }

+ 79 - 3
server/src/main/java/org/elasticsearch/action/admin/indices/create/AutoCreateAction.java

@@ -21,13 +21,24 @@ package org.elasticsearch.action.admin.indices.create;
 import org.elasticsearch.action.ActionListener;
 import org.elasticsearch.action.ActionListener;
 import org.elasticsearch.action.ActionType;
 import org.elasticsearch.action.ActionType;
 import org.elasticsearch.action.support.ActionFilters;
 import org.elasticsearch.action.support.ActionFilters;
+import org.elasticsearch.action.support.ActiveShardCount;
+import org.elasticsearch.action.support.ActiveShardsObserver;
 import org.elasticsearch.action.support.master.TransportMasterNodeAction;
 import org.elasticsearch.action.support.master.TransportMasterNodeAction;
+import org.elasticsearch.cluster.AckedClusterStateUpdateTask;
 import org.elasticsearch.cluster.ClusterState;
 import org.elasticsearch.cluster.ClusterState;
+import org.elasticsearch.cluster.ack.ClusterStateUpdateResponse;
 import org.elasticsearch.cluster.block.ClusterBlockException;
 import org.elasticsearch.cluster.block.ClusterBlockException;
 import org.elasticsearch.cluster.block.ClusterBlockLevel;
 import org.elasticsearch.cluster.block.ClusterBlockLevel;
 import org.elasticsearch.cluster.metadata.IndexNameExpressionResolver;
 import org.elasticsearch.cluster.metadata.IndexNameExpressionResolver;
+import org.elasticsearch.cluster.metadata.IndexTemplateV2;
+import org.elasticsearch.cluster.metadata.IndexTemplateV2.DataStreamTemplate;
+import org.elasticsearch.cluster.metadata.Metadata;
+import org.elasticsearch.cluster.metadata.MetadataCreateDataStreamService;
+import org.elasticsearch.cluster.metadata.MetadataCreateDataStreamService.CreateDataStreamClusterStateUpdateRequest;
 import org.elasticsearch.cluster.metadata.MetadataCreateIndexService;
 import org.elasticsearch.cluster.metadata.MetadataCreateIndexService;
+import org.elasticsearch.cluster.metadata.MetadataIndexTemplateService;
 import org.elasticsearch.cluster.service.ClusterService;
 import org.elasticsearch.cluster.service.ClusterService;
+import org.elasticsearch.common.Priority;
 import org.elasticsearch.common.inject.Inject;
 import org.elasticsearch.common.inject.Inject;
 import org.elasticsearch.common.io.stream.StreamInput;
 import org.elasticsearch.common.io.stream.StreamInput;
 import org.elasticsearch.tasks.Task;
 import org.elasticsearch.tasks.Task;
@@ -35,6 +46,7 @@ import org.elasticsearch.threadpool.ThreadPool;
 import org.elasticsearch.transport.TransportService;
 import org.elasticsearch.transport.TransportService;
 
 
 import java.io.IOException;
 import java.io.IOException;
+import java.util.concurrent.atomic.AtomicReference;
 
 
 /**
 /**
  * Api that auto creates an index that originate from requests that write into an index that doesn't yet exist.
  * Api that auto creates an index that originate from requests that write into an index that doesn't yet exist.
@@ -50,14 +62,19 @@ public final class AutoCreateAction extends ActionType<CreateIndexResponse> {
 
 
     public static final class TransportAction extends TransportMasterNodeAction<CreateIndexRequest, CreateIndexResponse> {
     public static final class TransportAction extends TransportMasterNodeAction<CreateIndexRequest, CreateIndexResponse> {
 
 
+        private final ActiveShardsObserver activeShardsObserver;
         private final MetadataCreateIndexService createIndexService;
         private final MetadataCreateIndexService createIndexService;
+        private final MetadataCreateDataStreamService metadataCreateDataStreamService;
 
 
         @Inject
         @Inject
         public TransportAction(TransportService transportService, ClusterService clusterService, ThreadPool threadPool,
         public TransportAction(TransportService transportService, ClusterService clusterService, ThreadPool threadPool,
                                ActionFilters actionFilters, IndexNameExpressionResolver indexNameExpressionResolver,
                                ActionFilters actionFilters, IndexNameExpressionResolver indexNameExpressionResolver,
-                               MetadataCreateIndexService createIndexService) {
+                               MetadataCreateIndexService createIndexService,
+                               MetadataCreateDataStreamService metadataCreateDataStreamService) {
             super(NAME, transportService, clusterService, threadPool, actionFilters, CreateIndexRequest::new, indexNameExpressionResolver);
             super(NAME, transportService, clusterService, threadPool, actionFilters, CreateIndexRequest::new, indexNameExpressionResolver);
+            this.activeShardsObserver = new ActiveShardsObserver(clusterService, threadPool);
             this.createIndexService = createIndexService;
             this.createIndexService = createIndexService;
+            this.metadataCreateDataStreamService = metadataCreateDataStreamService;
         }
         }
 
 
         @Override
         @Override
@@ -74,8 +91,55 @@ public final class AutoCreateAction extends ActionType<CreateIndexResponse> {
         protected void masterOperation(Task task,
         protected void masterOperation(Task task,
                                        CreateIndexRequest request,
                                        CreateIndexRequest request,
                                        ClusterState state,
                                        ClusterState state,
-                                       ActionListener<CreateIndexResponse> listener) {
-            TransportCreateIndexAction.innerCreateIndex(request, listener, indexNameExpressionResolver, createIndexService);
+                                       ActionListener<CreateIndexResponse> finalListener) {
+            AtomicReference<String> indexNameRef = new AtomicReference<>();
+            ActionListener<ClusterStateUpdateResponse> listener = ActionListener.wrap(
+                response -> {
+                    String indexName = indexNameRef.get();
+                    assert indexName != null;
+                    if (response.isAcknowledged()) {
+                        activeShardsObserver.waitForActiveShards(
+                            new String[]{indexName},
+                            ActiveShardCount.DEFAULT,
+                            request.timeout(),
+                            shardsAcked -> {
+                                finalListener.onResponse(new CreateIndexResponse(true, shardsAcked, indexName));
+                            },
+                            finalListener::onFailure
+                        );
+                    } else {
+                        finalListener.onResponse(new CreateIndexResponse(false, false, indexName));
+                    }
+                },
+                finalListener::onFailure
+            );
+            clusterService.submitStateUpdateTask("auto create [" + request.index() + "]",
+                new AckedClusterStateUpdateTask<>(Priority.URGENT, request, listener) {
+
+                @Override
+                protected ClusterStateUpdateResponse newResponse(boolean acknowledged) {
+                    return new ClusterStateUpdateResponse(acknowledged);
+                }
+
+                @Override
+                public ClusterState execute(ClusterState currentState) throws Exception {
+                    DataStreamTemplate dataStreamTemplate = resolveAutoCreateDataStream(request, currentState.metadata());
+                    if (dataStreamTemplate != null) {
+                        CreateDataStreamClusterStateUpdateRequest createRequest = new CreateDataStreamClusterStateUpdateRequest(
+                            request.index(), dataStreamTemplate.getTimestampField(), request.masterNodeTimeout(), request.timeout());
+                        ClusterState clusterState =  metadataCreateDataStreamService.createDataStream(createRequest, currentState);
+                        indexNameRef.set(clusterState.metadata().dataStreams().get(request.index()).getIndices().get(0).getName());
+                        return clusterState;
+                    } else {
+                        String indexName = indexNameExpressionResolver.resolveDateMathExpression(request.index());
+                        indexNameRef.set(indexName);
+                        CreateIndexClusterStateUpdateRequest updateRequest =
+                            new CreateIndexClusterStateUpdateRequest(request.cause(), indexName, request.index())
+                                .ackTimeout(request.timeout()).masterNodeTimeout(request.masterNodeTimeout());
+                        return createIndexService.applyCreateIndexRequest(currentState, updateRequest, false);
+                    }
+                }
+            });
         }
         }
 
 
         @Override
         @Override
@@ -84,4 +148,16 @@ public final class AutoCreateAction extends ActionType<CreateIndexResponse> {
         }
         }
     }
     }
 
 
+    static DataStreamTemplate resolveAutoCreateDataStream(CreateIndexRequest request, Metadata metadata) {
+        String v2Template = MetadataIndexTemplateService.findV2Template(metadata, request.index(), false);
+        if (v2Template != null) {
+            IndexTemplateV2 indexTemplateV2 = metadata.templatesV2().get(v2Template);
+            if (indexTemplateV2.getDataStreamTemplate() != null) {
+                return indexTemplateV2.getDataStreamTemplate();
+            }
+        }
+
+        return null;
+    }
+
 }
 }

+ 4 - 10
server/src/main/java/org/elasticsearch/action/admin/indices/create/TransportCreateIndexAction.java

@@ -71,20 +71,14 @@ public class TransportCreateIndexAction extends TransportMasterNodeAction<Create
     @Override
     @Override
     protected void masterOperation(Task task, final CreateIndexRequest request, final ClusterState state,
     protected void masterOperation(Task task, final CreateIndexRequest request, final ClusterState state,
                                    final ActionListener<CreateIndexResponse> listener) {
                                    final ActionListener<CreateIndexResponse> listener) {
-        if (request.cause().length() == 0) {
-            request.cause("api");
+        String cause = request.cause();
+        if (cause.length() == 0) {
+            cause = "api";
         }
         }
 
 
-        innerCreateIndex(request, listener, indexNameExpressionResolver, createIndexService);
-    }
-
-    static void innerCreateIndex(CreateIndexRequest request,
-                                 ActionListener<CreateIndexResponse> listener,
-                                 IndexNameExpressionResolver indexNameExpressionResolver,
-                                 MetadataCreateIndexService createIndexService) {
         final String indexName = indexNameExpressionResolver.resolveDateMathExpression(request.index());
         final String indexName = indexNameExpressionResolver.resolveDateMathExpression(request.index());
         final CreateIndexClusterStateUpdateRequest updateRequest =
         final CreateIndexClusterStateUpdateRequest updateRequest =
-            new CreateIndexClusterStateUpdateRequest(request.cause(), indexName, request.index())
+            new CreateIndexClusterStateUpdateRequest(cause, indexName, request.index())
                 .ackTimeout(request.timeout()).masterNodeTimeout(request.masterNodeTimeout())
                 .ackTimeout(request.timeout()).masterNodeTimeout(request.masterNodeTimeout())
                 .settings(request.settings()).mappings(request.mappings())
                 .settings(request.settings()).mappings(request.mappings())
                 .aliases(request.aliases())
                 .aliases(request.aliases())

+ 34 - 76
server/src/main/java/org/elasticsearch/action/admin/indices/datastream/CreateDataStreamAction.java

@@ -18,47 +18,36 @@
  */
  */
 package org.elasticsearch.action.admin.indices.datastream;
 package org.elasticsearch.action.admin.indices.datastream;
 
 
-import org.apache.logging.log4j.LogManager;
-import org.apache.logging.log4j.Logger;
+import org.elasticsearch.Version;
 import org.elasticsearch.action.ActionListener;
 import org.elasticsearch.action.ActionListener;
 import org.elasticsearch.action.ActionRequestValidationException;
 import org.elasticsearch.action.ActionRequestValidationException;
 import org.elasticsearch.action.ActionType;
 import org.elasticsearch.action.ActionType;
 import org.elasticsearch.action.ValidateActions;
 import org.elasticsearch.action.ValidateActions;
-import org.elasticsearch.action.admin.indices.create.CreateIndexClusterStateUpdateRequest;
 import org.elasticsearch.action.support.ActionFilters;
 import org.elasticsearch.action.support.ActionFilters;
+import org.elasticsearch.action.support.master.AcknowledgedRequest;
 import org.elasticsearch.action.support.master.AcknowledgedResponse;
 import org.elasticsearch.action.support.master.AcknowledgedResponse;
-import org.elasticsearch.action.support.master.MasterNodeRequest;
 import org.elasticsearch.action.support.master.TransportMasterNodeAction;
 import org.elasticsearch.action.support.master.TransportMasterNodeAction;
 import org.elasticsearch.cluster.ClusterState;
 import org.elasticsearch.cluster.ClusterState;
-import org.elasticsearch.cluster.ClusterStateUpdateTask;
 import org.elasticsearch.cluster.block.ClusterBlockException;
 import org.elasticsearch.cluster.block.ClusterBlockException;
 import org.elasticsearch.cluster.block.ClusterBlockLevel;
 import org.elasticsearch.cluster.block.ClusterBlockLevel;
-import org.elasticsearch.cluster.metadata.DataStream;
-import org.elasticsearch.cluster.metadata.IndexMetadata;
 import org.elasticsearch.cluster.metadata.IndexNameExpressionResolver;
 import org.elasticsearch.cluster.metadata.IndexNameExpressionResolver;
-import org.elasticsearch.cluster.metadata.Metadata;
-import org.elasticsearch.cluster.metadata.MetadataCreateIndexService;
+import org.elasticsearch.cluster.metadata.MetadataCreateDataStreamService;
+import org.elasticsearch.cluster.metadata.MetadataCreateDataStreamService.CreateDataStreamClusterStateUpdateRequest;
 import org.elasticsearch.cluster.service.ClusterService;
 import org.elasticsearch.cluster.service.ClusterService;
-import org.elasticsearch.common.Priority;
 import org.elasticsearch.common.Strings;
 import org.elasticsearch.common.Strings;
 import org.elasticsearch.common.inject.Inject;
 import org.elasticsearch.common.inject.Inject;
 import org.elasticsearch.common.io.stream.StreamInput;
 import org.elasticsearch.common.io.stream.StreamInput;
 import org.elasticsearch.common.io.stream.StreamOutput;
 import org.elasticsearch.common.io.stream.StreamOutput;
-import org.elasticsearch.common.settings.Settings;
-import org.elasticsearch.common.unit.TimeValue;
 import org.elasticsearch.tasks.Task;
 import org.elasticsearch.tasks.Task;
+import org.elasticsearch.tasks.TaskId;
 import org.elasticsearch.threadpool.ThreadPool;
 import org.elasticsearch.threadpool.ThreadPool;
 import org.elasticsearch.transport.TransportService;
 import org.elasticsearch.transport.TransportService;
 
 
 import java.io.IOException;
 import java.io.IOException;
-import java.util.List;
-import java.util.Locale;
 import java.util.Objects;
 import java.util.Objects;
 
 
 public class CreateDataStreamAction extends ActionType<AcknowledgedResponse> {
 public class CreateDataStreamAction extends ActionType<AcknowledgedResponse> {
 
 
-    private static final Logger logger = LogManager.getLogger(CreateDataStreamAction.class);
-
     public static final CreateDataStreamAction INSTANCE = new CreateDataStreamAction();
     public static final CreateDataStreamAction INSTANCE = new CreateDataStreamAction();
     public static final String NAME = "indices:admin/data_stream/create";
     public static final String NAME = "indices:admin/data_stream/create";
 
 
@@ -66,7 +55,7 @@ public class CreateDataStreamAction extends ActionType<AcknowledgedResponse> {
         super(NAME, AcknowledgedResponse::new);
         super(NAME, AcknowledgedResponse::new);
     }
     }
 
 
-    public static class Request extends MasterNodeRequest<Request> {
+    public static class Request extends AcknowledgedRequest<Request> {
 
 
         private final String name;
         private final String name;
         private String timestampFieldName;
         private String timestampFieldName;
@@ -92,14 +81,30 @@ public class CreateDataStreamAction extends ActionType<AcknowledgedResponse> {
         }
         }
 
 
         public Request(StreamInput in) throws IOException {
         public Request(StreamInput in) throws IOException {
-            super(in);
+            // TODO: replace if/else clauses with super(in); after backporting:
+            if (in.getVersion().onOrAfter(Version.V_8_0_0)) {
+                setParentTask(TaskId.readFromStream(in));
+                masterNodeTimeout(in.readTimeValue());
+                timeout(in.readTimeValue());
+            } else {
+                setParentTask(TaskId.readFromStream(in));
+                masterNodeTimeout(in.readTimeValue());
+            }
             this.name = in.readString();
             this.name = in.readString();
             this.timestampFieldName = in.readString();
             this.timestampFieldName = in.readString();
         }
         }
 
 
         @Override
         @Override
         public void writeTo(StreamOutput out) throws IOException {
         public void writeTo(StreamOutput out) throws IOException {
-            super.writeTo(out);
+            // TODO: replace if/else clauses with super.writeTo(out); after backporting:
+            if (out.getVersion().onOrAfter(Version.V_8_0_0)) {
+                getParentTask().writeTo(out);
+                out.writeTimeValue(masterNodeTimeout());
+                out.writeTimeValue(timeout());
+            } else {
+                getParentTask().writeTo(out);
+                out.writeTimeValue(masterNodeTimeout());
+            }
             out.writeString(name);
             out.writeString(name);
             out.writeString(timestampFieldName);
             out.writeString(timestampFieldName);
         }
         }
@@ -121,14 +126,14 @@ public class CreateDataStreamAction extends ActionType<AcknowledgedResponse> {
 
 
     public static class TransportAction extends TransportMasterNodeAction<Request, AcknowledgedResponse> {
     public static class TransportAction extends TransportMasterNodeAction<Request, AcknowledgedResponse> {
 
 
-        private final MetadataCreateIndexService metadataCreateIndexService;
+        private final MetadataCreateDataStreamService metadataCreateDataStreamService;
 
 
         @Inject
         @Inject
         public TransportAction(TransportService transportService, ClusterService clusterService, ThreadPool threadPool,
         public TransportAction(TransportService transportService, ClusterService clusterService, ThreadPool threadPool,
                                ActionFilters actionFilters, IndexNameExpressionResolver indexNameExpressionResolver,
                                ActionFilters actionFilters, IndexNameExpressionResolver indexNameExpressionResolver,
-                               MetadataCreateIndexService metadataCreateIndexService) {
+                               MetadataCreateDataStreamService metadataCreateDataStreamService) {
             super(NAME, transportService, clusterService, threadPool, actionFilters, Request::new, indexNameExpressionResolver);
             super(NAME, transportService, clusterService, threadPool, actionFilters, Request::new, indexNameExpressionResolver);
-            this.metadataCreateIndexService = metadataCreateIndexService;
+            this.metadataCreateDataStreamService = metadataCreateDataStreamService;
         }
         }
 
 
         @Override
         @Override
@@ -144,60 +149,13 @@ public class CreateDataStreamAction extends ActionType<AcknowledgedResponse> {
         @Override
         @Override
         protected void masterOperation(Task task, Request request, ClusterState state,
         protected void masterOperation(Task task, Request request, ClusterState state,
                                        ActionListener<AcknowledgedResponse> listener) throws Exception {
                                        ActionListener<AcknowledgedResponse> listener) throws Exception {
-            clusterService.submitStateUpdateTask("create-data-stream [" + request.name + "]",
-                new ClusterStateUpdateTask(Priority.HIGH) {
-
-                    @Override
-                    public TimeValue timeout() {
-                        return request.masterNodeTimeout();
-                    }
-
-                    @Override
-                    public void onFailure(String source, Exception e) {
-                        listener.onFailure(e);
-                    }
-
-                    @Override
-                    public ClusterState execute(ClusterState currentState) throws Exception {
-                        return createDataStream(metadataCreateIndexService, currentState, request);
-                    }
-
-                    @Override
-                    public void clusterStateProcessed(String source, ClusterState oldState, ClusterState newState) {
-                        listener.onResponse(new AcknowledgedResponse(true));
-                    }
-                });
-        }
-
-        static ClusterState createDataStream(MetadataCreateIndexService metadataCreateIndexService,
-                                             ClusterState currentState,
-                                             Request request) throws Exception {
-            if (currentState.metadata().dataStreams().containsKey(request.name)) {
-                throw new IllegalArgumentException("data_stream [" + request.name + "] already exists");
-            }
-
-            MetadataCreateIndexService.validateIndexOrAliasName(request.name,
-                (s1, s2) -> new IllegalArgumentException("data_stream [" + s1 + "] " + s2));
-
-            if (request.name.toLowerCase(Locale.ROOT).equals(request.name) == false) {
-                throw new IllegalArgumentException("data_stream [" + request.name + "] must be lowercase");
-            }
-            if (request.name.startsWith(".")) {
-                throw new IllegalArgumentException("data_stream [" + request.name + "] must not start with '.'");
-            }
-
-            String firstBackingIndexName = DataStream.getBackingIndexName(request.name, 1);
-            CreateIndexClusterStateUpdateRequest createIndexRequest =
-                new CreateIndexClusterStateUpdateRequest("initialize_data_stream", firstBackingIndexName, firstBackingIndexName)
-                .settings(Settings.builder().put("index.hidden", true).build());
-            currentState = metadataCreateIndexService.applyCreateIndexRequest(currentState, createIndexRequest, false);
-            IndexMetadata firstBackingIndex = currentState.metadata().index(firstBackingIndexName);
-            assert firstBackingIndex != null;
-
-            Metadata.Builder builder = Metadata.builder(currentState.metadata()).put(
-                new DataStream(request.name, request.timestampFieldName, List.of(firstBackingIndex.getIndex())));
-            logger.info("adding data stream [{}]", request.name);
-            return ClusterState.builder(currentState).metadata(builder).build();
+            CreateDataStreamClusterStateUpdateRequest updateRequest =  new CreateDataStreamClusterStateUpdateRequest(
+                request.name,
+                request.timestampFieldName,
+                request.masterNodeTimeout(),
+                request.timeout()
+            );
+            metadataCreateDataStreamService.createDataStream(updateRequest, listener);
         }
         }
 
 
         @Override
         @Override

+ 1 - 1
server/src/main/java/org/elasticsearch/action/support/AutoCreateIndex.java

@@ -64,7 +64,7 @@ public final class AutoCreateIndex {
      * @throws IndexNotFoundException if the index doesn't exist and shouldn't be auto created
      * @throws IndexNotFoundException if the index doesn't exist and shouldn't be auto created
      */
      */
     public boolean shouldAutoCreate(String index, ClusterState state) {
     public boolean shouldAutoCreate(String index, ClusterState state) {
-        if (resolver.hasIndexOrAlias(index, state)) {
+        if (resolver.hasIndexAbstraction(index, state)) {
             return false;
             return false;
         }
         }
         // One volatile read, so that all checks are done against the same instance:
         // One volatile read, so that all checks are done against the same instance:

+ 5 - 4
server/src/main/java/org/elasticsearch/cluster/metadata/IndexNameExpressionResolver.java

@@ -360,11 +360,12 @@ public class IndexNameExpressionResolver {
     }
     }
 
 
     /**
     /**
-     * @return whether the specified alias or index exists. If the alias or index contains datemath then that is resolved too.
+     * @return whether the specified index, data stream or alias exists.
+     *         If the data stream, index or alias contains date math then that is resolved too.
      */
      */
-    public boolean hasIndexOrAlias(String aliasOrIndex, ClusterState state) {
-        Context context = new Context(state, IndicesOptions.lenientExpandOpen());
-        String resolvedAliasOrIndex = dateMathExpressionResolver.resolveExpression(aliasOrIndex, context);
+    public boolean hasIndexAbstraction(String indexAbstraction, ClusterState state) {
+        Context context = new Context(state, IndicesOptions.lenientExpandOpen(), false, false, true);
+        String resolvedAliasOrIndex = dateMathExpressionResolver.resolveExpression(indexAbstraction, context);
         return state.metadata().getIndicesLookup().containsKey(resolvedAliasOrIndex);
         return state.metadata().getIndicesLookup().containsKey(resolvedAliasOrIndex);
     }
     }
 
 

+ 86 - 3
server/src/main/java/org/elasticsearch/cluster/metadata/IndexTemplateV2.java

@@ -19,6 +19,7 @@
 
 
 package org.elasticsearch.cluster.metadata;
 package org.elasticsearch.cluster.metadata;
 
 
+import org.elasticsearch.Version;
 import org.elasticsearch.cluster.AbstractDiffable;
 import org.elasticsearch.cluster.AbstractDiffable;
 import org.elasticsearch.cluster.Diff;
 import org.elasticsearch.cluster.Diff;
 import org.elasticsearch.common.Nullable;
 import org.elasticsearch.common.Nullable;
@@ -26,6 +27,7 @@ import org.elasticsearch.common.ParseField;
 import org.elasticsearch.common.Strings;
 import org.elasticsearch.common.Strings;
 import org.elasticsearch.common.io.stream.StreamInput;
 import org.elasticsearch.common.io.stream.StreamInput;
 import org.elasticsearch.common.io.stream.StreamOutput;
 import org.elasticsearch.common.io.stream.StreamOutput;
+import org.elasticsearch.common.io.stream.Writeable;
 import org.elasticsearch.common.xcontent.ConstructingObjectParser;
 import org.elasticsearch.common.xcontent.ConstructingObjectParser;
 import org.elasticsearch.common.xcontent.ToXContentObject;
 import org.elasticsearch.common.xcontent.ToXContentObject;
 import org.elasticsearch.common.xcontent.XContentBuilder;
 import org.elasticsearch.common.xcontent.XContentBuilder;
@@ -48,6 +50,7 @@ public class IndexTemplateV2 extends AbstractDiffable<IndexTemplateV2> implement
     private static final ParseField COMPOSED_OF = new ParseField("composed_of");
     private static final ParseField COMPOSED_OF = new ParseField("composed_of");
     private static final ParseField VERSION = new ParseField("version");
     private static final ParseField VERSION = new ParseField("version");
     private static final ParseField METADATA = new ParseField("_meta");
     private static final ParseField METADATA = new ParseField("_meta");
+    private static final ParseField DATA_STREAM = new ParseField("data_stream");
 
 
     @SuppressWarnings("unchecked")
     @SuppressWarnings("unchecked")
     public static final ConstructingObjectParser<IndexTemplateV2, Void> PARSER = new ConstructingObjectParser<>("index_template", false,
     public static final ConstructingObjectParser<IndexTemplateV2, Void> PARSER = new ConstructingObjectParser<>("index_template", false,
@@ -56,7 +59,8 @@ public class IndexTemplateV2 extends AbstractDiffable<IndexTemplateV2> implement
             (List<String>) a[2],
             (List<String>) a[2],
             (Long) a[3],
             (Long) a[3],
             (Long) a[4],
             (Long) a[4],
-            (Map<String, Object>) a[5]));
+            (Map<String, Object>) a[5],
+            (DataStreamTemplate) a[6]));
 
 
     static {
     static {
         PARSER.declareStringArray(ConstructingObjectParser.constructorArg(), INDEX_PATTERNS);
         PARSER.declareStringArray(ConstructingObjectParser.constructorArg(), INDEX_PATTERNS);
@@ -65,6 +69,7 @@ public class IndexTemplateV2 extends AbstractDiffable<IndexTemplateV2> implement
         PARSER.declareLong(ConstructingObjectParser.optionalConstructorArg(), PRIORITY);
         PARSER.declareLong(ConstructingObjectParser.optionalConstructorArg(), PRIORITY);
         PARSER.declareLong(ConstructingObjectParser.optionalConstructorArg(), VERSION);
         PARSER.declareLong(ConstructingObjectParser.optionalConstructorArg(), VERSION);
         PARSER.declareObject(ConstructingObjectParser.optionalConstructorArg(), (p, c) -> p.map(), METADATA);
         PARSER.declareObject(ConstructingObjectParser.optionalConstructorArg(), (p, c) -> p.map(), METADATA);
+        PARSER.declareObject(ConstructingObjectParser.optionalConstructorArg(), DataStreamTemplate.PARSER, DATA_STREAM);
     }
     }
 
 
     private final List<String> indexPatterns;
     private final List<String> indexPatterns;
@@ -78,6 +83,8 @@ public class IndexTemplateV2 extends AbstractDiffable<IndexTemplateV2> implement
     private final Long version;
     private final Long version;
     @Nullable
     @Nullable
     private final Map<String, Object> metadata;
     private final Map<String, Object> metadata;
+    @Nullable
+    private final DataStreamTemplate dataStreamTemplate;
 
 
     static Diff<IndexTemplateV2> readITV2DiffFrom(StreamInput in) throws IOException {
     static Diff<IndexTemplateV2> readITV2DiffFrom(StreamInput in) throws IOException {
         return AbstractDiffable.readDiffFrom(IndexTemplateV2::new, in);
         return AbstractDiffable.readDiffFrom(IndexTemplateV2::new, in);
@@ -89,12 +96,19 @@ public class IndexTemplateV2 extends AbstractDiffable<IndexTemplateV2> implement
 
 
     public IndexTemplateV2(List<String> indexPatterns, @Nullable Template template, @Nullable List<String> componentTemplates,
     public IndexTemplateV2(List<String> indexPatterns, @Nullable Template template, @Nullable List<String> componentTemplates,
                            @Nullable Long priority, @Nullable Long version, @Nullable Map<String, Object> metadata) {
                            @Nullable Long priority, @Nullable Long version, @Nullable Map<String, Object> metadata) {
+        this(indexPatterns, template, componentTemplates, priority, version, metadata, null);
+    }
+
+    public IndexTemplateV2(List<String> indexPatterns, @Nullable Template template, @Nullable List<String> componentTemplates,
+                           @Nullable Long priority, @Nullable Long version, @Nullable Map<String, Object> metadata,
+                           @Nullable DataStreamTemplate dataStreamTemplate) {
         this.indexPatterns = indexPatterns;
         this.indexPatterns = indexPatterns;
         this.template = template;
         this.template = template;
         this.componentTemplates = componentTemplates;
         this.componentTemplates = componentTemplates;
         this.priority = priority;
         this.priority = priority;
         this.version = version;
         this.version = version;
         this.metadata = metadata;
         this.metadata = metadata;
+        this.dataStreamTemplate = dataStreamTemplate;
     }
     }
 
 
     public IndexTemplateV2(StreamInput in) throws IOException {
     public IndexTemplateV2(StreamInput in) throws IOException {
@@ -108,6 +122,11 @@ public class IndexTemplateV2 extends AbstractDiffable<IndexTemplateV2> implement
         this.priority = in.readOptionalVLong();
         this.priority = in.readOptionalVLong();
         this.version = in.readOptionalVLong();
         this.version = in.readOptionalVLong();
         this.metadata = in.readMap();
         this.metadata = in.readMap();
+        if (in.getVersion().onOrAfter(Version.V_8_0_0)) {
+            this.dataStreamTemplate = in.readOptionalWriteable(DataStreamTemplate::new);
+        } else {
+            this.dataStreamTemplate = null;
+        }
     }
     }
 
 
     public List<String> indexPatterns() {
     public List<String> indexPatterns() {
@@ -145,6 +164,10 @@ public class IndexTemplateV2 extends AbstractDiffable<IndexTemplateV2> implement
         return metadata;
         return metadata;
     }
     }
 
 
+    public DataStreamTemplate getDataStreamTemplate() {
+        return dataStreamTemplate;
+    }
+
     @Override
     @Override
     public void writeTo(StreamOutput out) throws IOException {
     public void writeTo(StreamOutput out) throws IOException {
         out.writeStringCollection(this.indexPatterns);
         out.writeStringCollection(this.indexPatterns);
@@ -158,6 +181,9 @@ public class IndexTemplateV2 extends AbstractDiffable<IndexTemplateV2> implement
         out.writeOptionalVLong(this.priority);
         out.writeOptionalVLong(this.priority);
         out.writeOptionalVLong(this.version);
         out.writeOptionalVLong(this.version);
         out.writeMap(this.metadata);
         out.writeMap(this.metadata);
+        if (out.getVersion().onOrAfter(Version.V_8_0_0)) {
+            out.writeOptionalWriteable(dataStreamTemplate);
+        }
     }
     }
 
 
     @Override
     @Override
@@ -179,13 +205,17 @@ public class IndexTemplateV2 extends AbstractDiffable<IndexTemplateV2> implement
         if (this.metadata != null) {
         if (this.metadata != null) {
             builder.field(METADATA.getPreferredName(), metadata);
             builder.field(METADATA.getPreferredName(), metadata);
         }
         }
+        if (this.dataStreamTemplate != null) {
+            builder.field(DATA_STREAM.getPreferredName(), dataStreamTemplate);
+        }
         builder.endObject();
         builder.endObject();
         return builder;
         return builder;
     }
     }
 
 
     @Override
     @Override
     public int hashCode() {
     public int hashCode() {
-        return Objects.hash(this.indexPatterns, this.template, this.componentTemplates, this.priority, this.version, this.metadata);
+        return Objects.hash(this.indexPatterns, this.template, this.componentTemplates, this.priority, this.version,
+            this.metadata, this.dataStreamTemplate);
     }
     }
 
 
     @Override
     @Override
@@ -202,11 +232,64 @@ public class IndexTemplateV2 extends AbstractDiffable<IndexTemplateV2> implement
             Objects.equals(this.componentTemplates, other.componentTemplates) &&
             Objects.equals(this.componentTemplates, other.componentTemplates) &&
             Objects.equals(this.priority, other.priority) &&
             Objects.equals(this.priority, other.priority) &&
             Objects.equals(this.version, other.version) &&
             Objects.equals(this.version, other.version) &&
-            Objects.equals(this.metadata, other.metadata);
+            Objects.equals(this.metadata, other.metadata) &&
+            Objects.equals(this.dataStreamTemplate, other.dataStreamTemplate);
     }
     }
 
 
     @Override
     @Override
     public String toString() {
     public String toString() {
         return Strings.toString(this);
         return Strings.toString(this);
     }
     }
+
+    public static class DataStreamTemplate implements Writeable, ToXContentObject {
+
+        private static final ConstructingObjectParser<DataStreamTemplate, Void> PARSER = new ConstructingObjectParser<>(
+            "data_stream_template",
+            args -> new DataStreamTemplate((String) args[0])
+        );
+
+        static {
+            PARSER.declareString(ConstructingObjectParser.constructorArg(), DataStream.TIMESTAMP_FIELD_FIELD);
+        }
+
+        private final String timestampField;
+
+        public DataStreamTemplate(String timestampField) {
+            this.timestampField = timestampField;
+        }
+
+        public String getTimestampField() {
+            return timestampField;
+        }
+
+        DataStreamTemplate(StreamInput in) throws IOException {
+            this(in.readString());
+        }
+
+        @Override
+        public void writeTo(StreamOutput out) throws IOException {
+            out.writeString(timestampField);
+        }
+
+        @Override
+        public XContentBuilder toXContent(XContentBuilder builder, Params params) throws IOException {
+            builder.startObject();
+            builder.field(DataStream.TIMESTAMP_FIELD_FIELD.getPreferredName(), timestampField);
+            builder.endObject();
+            return builder;
+        }
+
+        @Override
+        public boolean equals(Object o) {
+            if (this == o) return true;
+            if (o == null || getClass() != o.getClass()) return false;
+            DataStreamTemplate that = (DataStreamTemplate) o;
+            return timestampField.equals(that.timestampField);
+        }
+
+        @Override
+        public int hashCode() {
+            return Objects.hash(timestampField);
+        }
+    }
 }
 }

+ 148 - 0
server/src/main/java/org/elasticsearch/cluster/metadata/MetadataCreateDataStreamService.java

@@ -0,0 +1,148 @@
+/*
+ * Licensed to Elasticsearch under one or more contributor
+ * license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright
+ * ownership. Elasticsearch licenses this file to you under
+ * the Apache License, Version 2.0 (the "License"); you may
+ * not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.elasticsearch.cluster.metadata;
+
+import org.apache.logging.log4j.LogManager;
+import org.apache.logging.log4j.Logger;
+import org.elasticsearch.action.ActionListener;
+import org.elasticsearch.action.admin.indices.create.CreateIndexClusterStateUpdateRequest;
+import org.elasticsearch.action.support.ActiveShardCount;
+import org.elasticsearch.action.support.ActiveShardsObserver;
+import org.elasticsearch.action.support.master.AcknowledgedResponse;
+import org.elasticsearch.cluster.AckedClusterStateUpdateTask;
+import org.elasticsearch.cluster.ClusterState;
+import org.elasticsearch.cluster.ack.ClusterStateUpdateRequest;
+import org.elasticsearch.cluster.ack.ClusterStateUpdateResponse;
+import org.elasticsearch.cluster.service.ClusterService;
+import org.elasticsearch.common.Priority;
+import org.elasticsearch.common.settings.Settings;
+import org.elasticsearch.common.unit.TimeValue;
+import org.elasticsearch.threadpool.ThreadPool;
+
+import java.util.List;
+import java.util.Locale;
+import java.util.concurrent.atomic.AtomicReference;
+
+public class MetadataCreateDataStreamService {
+
+    private static final Logger logger = LogManager.getLogger(MetadataCreateDataStreamService.class);
+
+    private final ClusterService clusterService;
+    private final ActiveShardsObserver activeShardsObserver;
+    private final MetadataCreateIndexService metadataCreateIndexService;
+
+    public MetadataCreateDataStreamService(ThreadPool threadPool,
+                                           ClusterService clusterService,
+                                           MetadataCreateIndexService metadataCreateIndexService) {
+        this.clusterService = clusterService;
+        this.activeShardsObserver = new ActiveShardsObserver(clusterService, threadPool);
+        this.metadataCreateIndexService = metadataCreateIndexService;
+    }
+
+    public void createDataStream(CreateDataStreamClusterStateUpdateRequest request,
+                                 ActionListener<AcknowledgedResponse> finalListener) {
+        AtomicReference<String> firstBackingIndexRef = new AtomicReference<>();
+        ActionListener<ClusterStateUpdateResponse> listener = ActionListener.wrap(
+            response -> {
+                if (response.isAcknowledged()) {
+                    String firstBackingIndexName = firstBackingIndexRef.get();
+                    assert firstBackingIndexName != null;
+                    activeShardsObserver.waitForActiveShards(
+                        new String[]{firstBackingIndexName},
+                        ActiveShardCount.DEFAULT,
+                        request.masterNodeTimeout(),
+                        shardsAcked -> {
+                            finalListener.onResponse(new AcknowledgedResponse(true));
+                        },
+                        finalListener::onFailure);
+                } else {
+                    finalListener.onResponse(new AcknowledgedResponse(false));
+                }
+            },
+            finalListener::onFailure
+        );
+        clusterService.submitStateUpdateTask("create-data-stream [" + request.name + "]",
+            new AckedClusterStateUpdateTask<>(Priority.HIGH, request, listener) {
+
+                @Override
+                public ClusterState execute(ClusterState currentState) throws Exception {
+                    ClusterState clusterState = createDataStream(metadataCreateIndexService, currentState, request);
+                    firstBackingIndexRef.set(clusterState.metadata().dataStreams().get(request.name).getIndices().get(0).getName());
+                    return clusterState;
+                }
+
+                @Override
+                protected ClusterStateUpdateResponse newResponse(boolean acknowledged) {
+                    return new ClusterStateUpdateResponse(acknowledged);
+                }
+            });
+    }
+
+    public ClusterState createDataStream(CreateDataStreamClusterStateUpdateRequest request, ClusterState current) throws Exception {
+        return createDataStream(metadataCreateIndexService, current, request);
+    }
+
+    public static final class CreateDataStreamClusterStateUpdateRequest extends ClusterStateUpdateRequest {
+
+        private final String name;
+        private final String timestampFieldName;
+
+        public CreateDataStreamClusterStateUpdateRequest(String name,
+                                                         String timestampFieldName,
+                                                         TimeValue masterNodeTimeout,
+                                                         TimeValue timeout) {
+            this.name = name;
+            this.timestampFieldName = timestampFieldName;
+            masterNodeTimeout(masterNodeTimeout);
+            ackTimeout(timeout);
+        }
+    }
+
+    static ClusterState createDataStream(MetadataCreateIndexService metadataCreateIndexService,
+                                         ClusterState currentState,
+                                         CreateDataStreamClusterStateUpdateRequest request) throws Exception {
+        if (currentState.metadata().dataStreams().containsKey(request.name)) {
+            throw new IllegalArgumentException("data_stream [" + request.name + "] already exists");
+        }
+
+        MetadataCreateIndexService.validateIndexOrAliasName(request.name,
+            (s1, s2) -> new IllegalArgumentException("data_stream [" + s1 + "] " + s2));
+
+        if (request.name.toLowerCase(Locale.ROOT).equals(request.name) == false) {
+            throw new IllegalArgumentException("data_stream [" + request.name + "] must be lowercase");
+        }
+        if (request.name.startsWith(".")) {
+            throw new IllegalArgumentException("data_stream [" + request.name + "] must not start with '.'");
+        }
+
+        String firstBackingIndexName = DataStream.getBackingIndexName(request.name, 1);
+        CreateIndexClusterStateUpdateRequest createIndexRequest =
+            new CreateIndexClusterStateUpdateRequest("initialize_data_stream", firstBackingIndexName, firstBackingIndexName)
+                .settings(Settings.builder().put("index.hidden", true).build());
+        currentState = metadataCreateIndexService.applyCreateIndexRequest(currentState, createIndexRequest, false);
+        IndexMetadata firstBackingIndex = currentState.metadata().index(firstBackingIndexName);
+        assert firstBackingIndex != null;
+
+        Metadata.Builder builder = Metadata.builder(currentState.metadata()).put(
+            new DataStream(request.name, request.timestampFieldName, List.of(firstBackingIndex.getIndex())));
+        logger.info("adding data stream [{}]", request.name);
+        return ClusterState.builder(currentState).metadata(builder).build();
+    }
+
+}

+ 1 - 1
server/src/main/java/org/elasticsearch/cluster/metadata/MetadataIndexTemplateService.java

@@ -400,7 +400,7 @@ public class MetadataIndexTemplateService {
             final Template finalTemplate = new Template(finalSettings,
             final Template finalTemplate = new Template(finalSettings,
                 stringMappings == null ? null : new CompressedXContent(stringMappings), innerTemplate.aliases());
                 stringMappings == null ? null : new CompressedXContent(stringMappings), innerTemplate.aliases());
             finalIndexTemplate = new IndexTemplateV2(template.indexPatterns(), finalTemplate, template.composedOf(),
             finalIndexTemplate = new IndexTemplateV2(template.indexPatterns(), finalTemplate, template.composedOf(),
-                template.priority(), template.version(), template.metadata());
+                template.priority(), template.version(), template.metadata(), template.getDataStreamTemplate());
         }
         }
 
 
         validate(name, finalIndexTemplate);
         validate(name, finalIndexTemplate);

+ 5 - 0
server/src/main/java/org/elasticsearch/node/Node.java

@@ -50,6 +50,7 @@ import org.elasticsearch.cluster.action.index.MappingUpdatedAction;
 import org.elasticsearch.cluster.metadata.AliasValidator;
 import org.elasticsearch.cluster.metadata.AliasValidator;
 import org.elasticsearch.cluster.metadata.IndexTemplateMetadata;
 import org.elasticsearch.cluster.metadata.IndexTemplateMetadata;
 import org.elasticsearch.cluster.metadata.Metadata;
 import org.elasticsearch.cluster.metadata.Metadata;
+import org.elasticsearch.cluster.metadata.MetadataCreateDataStreamService;
 import org.elasticsearch.cluster.metadata.MetadataCreateIndexService;
 import org.elasticsearch.cluster.metadata.MetadataCreateIndexService;
 import org.elasticsearch.cluster.metadata.MetadataIndexUpgradeService;
 import org.elasticsearch.cluster.metadata.MetadataIndexUpgradeService;
 import org.elasticsearch.cluster.metadata.TemplateUpgradeService;
 import org.elasticsearch.cluster.metadata.TemplateUpgradeService;
@@ -466,6 +467,9 @@ public class Node implements Closeable {
                     systemIndexDescriptors,
                     systemIndexDescriptors,
                     forbidPrivateIndexSettings);
                     forbidPrivateIndexSettings);
 
 
+            final MetadataCreateDataStreamService metadataCreateDataStreamService =
+                new MetadataCreateDataStreamService(threadPool, clusterService, metadataCreateIndexService);
+
             final SetOnce<RepositoriesService> repositoriesServiceReference = new SetOnce<>();
             final SetOnce<RepositoriesService> repositoriesServiceReference = new SetOnce<>();
             Collection<Object> pluginComponents = pluginsService.filterPlugins(Plugin.class).stream()
             Collection<Object> pluginComponents = pluginsService.filterPlugins(Plugin.class).stream()
                 .flatMap(p -> p.createComponents(client, clusterService, threadPool, resourceWatcherService,
                 .flatMap(p -> p.createComponents(client, clusterService, threadPool, resourceWatcherService,
@@ -575,6 +579,7 @@ public class Node implements Closeable {
                     b.bind(IndicesService.class).toInstance(indicesService);
                     b.bind(IndicesService.class).toInstance(indicesService);
                     b.bind(AliasValidator.class).toInstance(aliasValidator);
                     b.bind(AliasValidator.class).toInstance(aliasValidator);
                     b.bind(MetadataCreateIndexService.class).toInstance(metadataCreateIndexService);
                     b.bind(MetadataCreateIndexService.class).toInstance(metadataCreateIndexService);
+                    b.bind(MetadataCreateDataStreamService.class).toInstance(metadataCreateDataStreamService);
                     b.bind(SearchService.class).toInstance(searchService);
                     b.bind(SearchService.class).toInstance(searchService);
                     b.bind(SearchTransportService.class).toInstance(searchTransportService);
                     b.bind(SearchTransportService.class).toInstance(searchTransportService);
                     b.bind(SearchPhaseController.class).toInstance(new SearchPhaseController(
                     b.bind(SearchPhaseController.class).toInstance(new SearchPhaseController(

+ 67 - 0
server/src/test/java/org/elasticsearch/action/admin/indices/create/AutoCreateActionTests.java

@@ -0,0 +1,67 @@
+/*
+ * Licensed to Elasticsearch under one or more contributor
+ * license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright
+ * ownership. Elasticsearch licenses this file to you under
+ * the Apache License, Version 2.0 (the "License"); you may
+ * not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.elasticsearch.action.admin.indices.create;
+
+import org.elasticsearch.cluster.metadata.IndexTemplateV2;
+import org.elasticsearch.cluster.metadata.IndexTemplateV2.DataStreamTemplate;
+import org.elasticsearch.cluster.metadata.Metadata;
+import org.elasticsearch.test.ESTestCase;
+
+import java.util.List;
+
+import static org.hamcrest.Matchers.equalTo;
+import static org.hamcrest.Matchers.notNullValue;
+import static org.hamcrest.Matchers.nullValue;
+
+public class AutoCreateActionTests extends ESTestCase {
+
+    public void testResolveAutoCreateDataStreams() {
+        Metadata metadata;
+        {
+            Metadata.Builder mdBuilder = new Metadata.Builder();
+            DataStreamTemplate dataStreamTemplate = new DataStreamTemplate("@timestamp");
+            mdBuilder.put("1", new IndexTemplateV2(List.of("legacy-logs-*"), null, null, 10L, null, null, null));
+            mdBuilder.put("2", new IndexTemplateV2(List.of("logs-*"), null, null, 20L, null, null, dataStreamTemplate));
+            mdBuilder.put("3", new IndexTemplateV2(List.of("logs-foobar"), null, null, 30L, null, null, dataStreamTemplate));
+            metadata = mdBuilder.build();
+        }
+
+        CreateIndexRequest request = new CreateIndexRequest("logs-foobar");
+        DataStreamTemplate result  = AutoCreateAction.resolveAutoCreateDataStream(request, metadata);
+        assertThat(result, notNullValue());
+        assertThat(result.getTimestampField(), equalTo("@timestamp"));
+
+        request = new CreateIndexRequest("logs-barbaz");
+        result  = AutoCreateAction.resolveAutoCreateDataStream(request, metadata);
+        assertThat(result, notNullValue());
+        assertThat(result.getTimestampField(), equalTo("@timestamp"));
+
+        // An index that matches with a template without a data steam definition
+        request = new CreateIndexRequest("legacy-logs-foobaz");
+        result = AutoCreateAction.resolveAutoCreateDataStream(request, metadata);
+        assertThat(result, nullValue());
+
+        // An index that doesn't match with an index template
+        request = new CreateIndexRequest("my-index");
+        result = AutoCreateAction.resolveAutoCreateDataStream(request, metadata);
+        assertThat(result, nullValue());
+    }
+
+}

+ 0 - 96
server/src/test/java/org/elasticsearch/action/admin/indices/datastream/CreateDataStreamRequestTests.java

@@ -18,31 +18,13 @@
  */
  */
 package org.elasticsearch.action.admin.indices.datastream;
 package org.elasticsearch.action.admin.indices.datastream;
 
 
-import org.elasticsearch.Version;
 import org.elasticsearch.action.ActionRequestValidationException;
 import org.elasticsearch.action.ActionRequestValidationException;
-import org.elasticsearch.action.admin.indices.create.CreateIndexClusterStateUpdateRequest;
 import org.elasticsearch.action.admin.indices.datastream.CreateDataStreamAction.Request;
 import org.elasticsearch.action.admin.indices.datastream.CreateDataStreamAction.Request;
-import org.elasticsearch.cluster.ClusterName;
-import org.elasticsearch.cluster.ClusterState;
-import org.elasticsearch.cluster.metadata.DataStream;
-import org.elasticsearch.cluster.metadata.IndexMetadata;
-import org.elasticsearch.cluster.metadata.Metadata;
-import org.elasticsearch.cluster.metadata.MetadataCreateIndexService;
 import org.elasticsearch.common.io.stream.Writeable;
 import org.elasticsearch.common.io.stream.Writeable;
-import org.elasticsearch.common.settings.Settings;
 import org.elasticsearch.test.AbstractWireSerializingTestCase;
 import org.elasticsearch.test.AbstractWireSerializingTestCase;
 
 
-import java.util.List;
-import java.util.Map;
-
-import static org.elasticsearch.cluster.DataStreamTestHelper.createFirstBackingIndex;
 import static org.hamcrest.Matchers.containsString;
 import static org.hamcrest.Matchers.containsString;
 import static org.hamcrest.Matchers.equalTo;
 import static org.hamcrest.Matchers.equalTo;
-import static org.hamcrest.Matchers.notNullValue;
-import static org.mockito.Matchers.any;
-import static org.mockito.Matchers.anyBoolean;
-import static org.mockito.Mockito.mock;
-import static org.mockito.Mockito.when;
 
 
 public class CreateDataStreamRequestTests extends AbstractWireSerializingTestCase<Request> {
 public class CreateDataStreamRequestTests extends AbstractWireSerializingTestCase<Request> {
 
 
@@ -73,82 +55,4 @@ public class CreateDataStreamRequestTests extends AbstractWireSerializingTestCas
         assertThat(e.validationErrors().get(0), containsString("timestamp field name is missing"));
         assertThat(e.validationErrors().get(0), containsString("timestamp field name is missing"));
     }
     }
 
 
-    public void testCreateDataStream() throws Exception {
-        final MetadataCreateIndexService metadataCreateIndexService = getMetadataCreateIndexService();
-        final String dataStreamName = "my-data-stream";
-        ClusterState cs = ClusterState.builder(new ClusterName("_name")).build();
-        CreateDataStreamAction.Request req = new CreateDataStreamAction.Request(dataStreamName);
-        ClusterState newState = CreateDataStreamAction.TransportAction.createDataStream(metadataCreateIndexService, cs, req);
-        assertThat(newState.metadata().dataStreams().size(), equalTo(1));
-        assertThat(newState.metadata().dataStreams().get(dataStreamName).getName(), equalTo(dataStreamName));
-        assertThat(newState.metadata().index(DataStream.getBackingIndexName(dataStreamName, 1)), notNullValue());
-        assertThat(newState.metadata().index(DataStream.getBackingIndexName(dataStreamName, 1)).getSettings().get("index.hidden"),
-            equalTo("true"));
-    }
-
-    public void testCreateDuplicateDataStream() throws Exception {
-        final MetadataCreateIndexService metadataCreateIndexService = getMetadataCreateIndexService();
-        final String dataStreamName = "my-data-stream";
-        IndexMetadata idx = createFirstBackingIndex(dataStreamName).build();
-        DataStream existingDataStream = new DataStream(dataStreamName, "timestamp", List.of(idx.getIndex()));
-        ClusterState cs = ClusterState.builder(new ClusterName("_name"))
-            .metadata(Metadata.builder().dataStreams(Map.of(dataStreamName, existingDataStream)).build()).build();
-        CreateDataStreamAction.Request req = new CreateDataStreamAction.Request(dataStreamName);
-
-        IllegalArgumentException e = expectThrows(IllegalArgumentException.class,
-            () -> CreateDataStreamAction.TransportAction.createDataStream(metadataCreateIndexService, cs, req));
-        assertThat(e.getMessage(), containsString("data_stream [" + dataStreamName + "] already exists"));
-    }
-
-    public void testCreateDataStreamWithInvalidName() throws Exception {
-        final MetadataCreateIndexService metadataCreateIndexService = getMetadataCreateIndexService();
-        final String dataStreamName = "_My-da#ta- ,stream-";
-        ClusterState cs = ClusterState.builder(new ClusterName("_name")).build();
-        CreateDataStreamAction.Request req = new CreateDataStreamAction.Request(dataStreamName);
-        IllegalArgumentException e = expectThrows(IllegalArgumentException.class,
-            () -> CreateDataStreamAction.TransportAction.createDataStream(metadataCreateIndexService, cs, req));
-        assertThat(e.getMessage(), containsString("must not contain the following characters"));
-    }
-
-    public void testCreateDataStreamWithUppercaseCharacters() throws Exception {
-        final MetadataCreateIndexService metadataCreateIndexService = getMetadataCreateIndexService();
-        final String dataStreamName = "MAY_NOT_USE_UPPERCASE";
-        ClusterState cs = ClusterState.builder(new ClusterName("_name")).build();
-        CreateDataStreamAction.Request req = new CreateDataStreamAction.Request(dataStreamName);
-        IllegalArgumentException e = expectThrows(IllegalArgumentException.class,
-            () -> CreateDataStreamAction.TransportAction.createDataStream(metadataCreateIndexService, cs, req));
-        assertThat(e.getMessage(), containsString("data_stream [" + dataStreamName + "] must be lowercase"));
-    }
-
-    public void testCreateDataStreamStartingWithPeriod() throws Exception {
-        final MetadataCreateIndexService metadataCreateIndexService = getMetadataCreateIndexService();
-        final String dataStreamName = ".may_not_start_with_period";
-        ClusterState cs = ClusterState.builder(new ClusterName("_name")).build();
-        CreateDataStreamAction.Request req = new CreateDataStreamAction.Request(dataStreamName);
-        IllegalArgumentException e = expectThrows(IllegalArgumentException.class,
-            () -> CreateDataStreamAction.TransportAction.createDataStream(metadataCreateIndexService, cs, req));
-        assertThat(e.getMessage(), containsString("data_stream [" + dataStreamName + "] must not start with '.'"));
-    }
-
-    private static MetadataCreateIndexService getMetadataCreateIndexService() throws Exception {
-        MetadataCreateIndexService s = mock(MetadataCreateIndexService.class);
-        when(s.applyCreateIndexRequest(any(ClusterState.class), any(CreateIndexClusterStateUpdateRequest.class), anyBoolean()))
-            .thenAnswer(mockInvocation -> {
-                ClusterState currentState = (ClusterState) mockInvocation.getArguments()[0];
-                CreateIndexClusterStateUpdateRequest request = (CreateIndexClusterStateUpdateRequest) mockInvocation.getArguments()[1];
-
-                Metadata.Builder b = Metadata.builder(currentState.metadata())
-                    .put(IndexMetadata.builder(request.index())
-                        .settings(Settings.builder()
-                            .put(IndexMetadata.SETTING_VERSION_CREATED, Version.CURRENT)
-                            .put(request.settings())
-                            .build())
-                        .numberOfShards(1)
-                        .numberOfReplicas(1)
-                        .build(), false);
-                return ClusterState.builder(currentState).metadata(b.build()).build();
-            });
-
-        return s;
-    }
 }
 }

+ 4 - 4
server/src/test/java/org/elasticsearch/action/admin/indices/rollover/MetadataRolloverServiceTests.java

@@ -353,7 +353,7 @@ public class MetadataRolloverServiceTests extends ESTestCase {
         aliases.put("foo-write", AliasMetadata.builder("foo-write").build());
         aliases.put("foo-write", AliasMetadata.builder("foo-write").build());
         aliases.put("bar-write", AliasMetadata.builder("bar-write").writeIndex(randomBoolean()).build());
         aliases.put("bar-write", AliasMetadata.builder("bar-write").writeIndex(randomBoolean()).build());
         final IndexTemplateV2 template = new IndexTemplateV2(Arrays.asList("foo-*", "bar-*"), new Template(null, null, aliases),
         final IndexTemplateV2 template = new IndexTemplateV2(Arrays.asList("foo-*", "bar-*"), new Template(null, null, aliases),
-            null, null, null, null);
+            null, null, null, null, null);
 
 
         final Metadata metadata = Metadata.builder().put(createMetadata(randomAlphaOfLengthBetween(5, 7)), false)
         final Metadata metadata = Metadata.builder().put(createMetadata(randomAlphaOfLengthBetween(5, 7)), false)
             .put("test-template", template).build();
             .put("test-template", template).build();
@@ -370,7 +370,7 @@ public class MetadataRolloverServiceTests extends ESTestCase {
         aliases.put("bar-write", AliasMetadata.builder("bar-write").writeIndex(randomBoolean()).build());
         aliases.put("bar-write", AliasMetadata.builder("bar-write").writeIndex(randomBoolean()).build());
         final ComponentTemplate ct = new ComponentTemplate(new Template(null, null, aliases), null, null);
         final ComponentTemplate ct = new ComponentTemplate(new Template(null, null, aliases), null, null);
         final IndexTemplateV2 template = new IndexTemplateV2(Arrays.asList("foo-*", "bar-*"), null,
         final IndexTemplateV2 template = new IndexTemplateV2(Arrays.asList("foo-*", "bar-*"), null,
-            Collections.singletonList("ct"), null, null, null);
+            Collections.singletonList("ct"), null, null, null, null);
 
 
         final Metadata metadata = Metadata.builder().put(createMetadata(randomAlphaOfLengthBetween(5, 7)), false)
         final Metadata metadata = Metadata.builder().put(createMetadata(randomAlphaOfLengthBetween(5, 7)), false)
             .put("ct", ct)
             .put("ct", ct)
@@ -405,7 +405,7 @@ public class MetadataRolloverServiceTests extends ESTestCase {
         aliases.put("foo-write", AliasMetadata.builder("foo-write").build());
         aliases.put("foo-write", AliasMetadata.builder("foo-write").build());
         aliases.put("bar-write", AliasMetadata.builder("bar-write").writeIndex(randomBoolean()).build());
         aliases.put("bar-write", AliasMetadata.builder("bar-write").writeIndex(randomBoolean()).build());
         final IndexTemplateV2 template = new IndexTemplateV2(Collections.singletonList("*"), new Template(null, null, aliases),
         final IndexTemplateV2 template = new IndexTemplateV2(Collections.singletonList("*"), new Template(null, null, aliases),
-            null, null, null, null);
+            null, null, null, null, null);
 
 
         final Metadata metadata = Metadata.builder().put(createMetadata(randomAlphaOfLengthBetween(5, 7)), false)
         final Metadata metadata = Metadata.builder().put(createMetadata(randomAlphaOfLengthBetween(5, 7)), false)
             .put("test-template", template).build();
             .put("test-template", template).build();
@@ -426,7 +426,7 @@ public class MetadataRolloverServiceTests extends ESTestCase {
         aliases.put("bar-write", AliasMetadata.builder("bar-write").writeIndex(randomBoolean()).build());
         aliases.put("bar-write", AliasMetadata.builder("bar-write").writeIndex(randomBoolean()).build());
         final ComponentTemplate ct = new ComponentTemplate(new Template(null, null, aliases), null, null);
         final ComponentTemplate ct = new ComponentTemplate(new Template(null, null, aliases), null, null);
         final IndexTemplateV2 template = new IndexTemplateV2(Collections.singletonList("*"), null,
         final IndexTemplateV2 template = new IndexTemplateV2(Collections.singletonList("*"), null,
-            Collections.singletonList("ct"), null, null, null);
+            Collections.singletonList("ct"), null, null, null, null);
 
 
         final Metadata metadata = Metadata.builder().put(createMetadata(randomAlphaOfLengthBetween(5, 7)), false)
         final Metadata metadata = Metadata.builder().put(createMetadata(randomAlphaOfLengthBetween(5, 7)), false)
             .put("ct", ct)
             .put("ct", ct)

+ 1 - 1
server/src/test/java/org/elasticsearch/action/admin/indices/template/post/SimulateIndexTemplateRequestTests.java

@@ -63,7 +63,7 @@ public class SimulateIndexTemplateRequestTests extends AbstractWireSerializingTe
 
 
     public void testAddingGlobalTemplateWithHiddenIndexSettingIsIllegal() {
     public void testAddingGlobalTemplateWithHiddenIndexSettingIsIllegal() {
         Template template = new Template(Settings.builder().put(IndexMetadata.SETTING_INDEX_HIDDEN, true).build(), null, null);
         Template template = new Template(Settings.builder().put(IndexMetadata.SETTING_INDEX_HIDDEN, true).build(), null, null);
-        IndexTemplateV2 globalTemplate = new IndexTemplateV2(List.of("*"), template, null, null, null, null);
+        IndexTemplateV2 globalTemplate = new IndexTemplateV2(List.of("*"), template, null, null, null, null, null);
 
 
         PutIndexTemplateV2Action.Request request = new PutIndexTemplateV2Action.Request("test");
         PutIndexTemplateV2Action.Request request = new PutIndexTemplateV2Action.Request("test");
         request.indexTemplate(globalTemplate);
         request.indexTemplate(globalTemplate);

+ 2 - 2
server/src/test/java/org/elasticsearch/action/admin/indices/template/put/PutIndexTemplateV2RequestTests.java

@@ -57,7 +57,7 @@ public class PutIndexTemplateV2RequestTests extends AbstractWireSerializingTestC
 
 
     public void testPutGlobalTemplatesCannotHaveHiddenIndexSetting() {
     public void testPutGlobalTemplatesCannotHaveHiddenIndexSetting() {
         Template template = new Template(Settings.builder().put(IndexMetadata.SETTING_INDEX_HIDDEN, true).build(), null, null);
         Template template = new Template(Settings.builder().put(IndexMetadata.SETTING_INDEX_HIDDEN, true).build(), null, null);
-        IndexTemplateV2 globalTemplate = new IndexTemplateV2(List.of("*"), template, null, null, null, null);
+        IndexTemplateV2 globalTemplate = new IndexTemplateV2(List.of("*"), template, null, null, null, null, null);
 
 
         PutIndexTemplateV2Action.Request request = new PutIndexTemplateV2Action.Request("test");
         PutIndexTemplateV2Action.Request request = new PutIndexTemplateV2Action.Request("test");
         request.indexTemplate(globalTemplate);
         request.indexTemplate(globalTemplate);
@@ -83,7 +83,7 @@ public class PutIndexTemplateV2RequestTests extends AbstractWireSerializingTestC
 
 
     public void testValidationOfPriority() {
     public void testValidationOfPriority() {
         PutIndexTemplateV2Action.Request req = new PutIndexTemplateV2Action.Request("test");
         PutIndexTemplateV2Action.Request req = new PutIndexTemplateV2Action.Request("test");
-        req.indexTemplate(new IndexTemplateV2(Arrays.asList("foo", "bar"), null, null, -5L, null, null));
+        req.indexTemplate(new IndexTemplateV2(Arrays.asList("foo", "bar"), null, null, -5L, null, null, null));
         ActionRequestValidationException validationException = req.validate();
         ActionRequestValidationException validationException = req.validate();
         assertThat(validationException, is(notNullValue()));
         assertThat(validationException, is(notNullValue()));
         List<String> validationErrors = validationException.validationErrors();
         List<String> validationErrors = validationException.validationErrors();

+ 1 - 1
server/src/test/java/org/elasticsearch/action/bulk/TransportBulkActionIngestTests.java

@@ -581,7 +581,7 @@ public class TransportBulkActionIngestTests extends ESTestCase {
 
 
         IndexTemplateV2 t1 = new IndexTemplateV2(Collections.singletonList("missing_*"),
         IndexTemplateV2 t1 = new IndexTemplateV2(Collections.singletonList("missing_*"),
             new Template(Settings.builder().put(IndexSettings.DEFAULT_PIPELINE.getKey(), "pipeline2").build(), null, null),
             new Template(Settings.builder().put(IndexSettings.DEFAULT_PIPELINE.getKey(), "pipeline2").build(), null, null),
-            null, null, null, null);
+            null, null, null, null, null);
 
 
         ClusterState state = clusterService.state();
         ClusterState state = clusterService.state();
         Metadata metadata = Metadata.builder()
         Metadata metadata = Metadata.builder()

+ 1 - 0
server/src/test/java/org/elasticsearch/action/bulk/TransportBulkActionTests.java

@@ -291,4 +291,5 @@ public class TransportBulkActionTests extends ESTestCase {
             assertThat(indexRequest.getFinalPipeline(), equalTo("final-pipeline"));
             assertThat(indexRequest.getFinalPipeline(), equalTo("final-pipeline"));
         }
         }
     }
     }
+
 }
 }

+ 32 - 8
server/src/test/java/org/elasticsearch/cluster/metadata/IndexTemplateV2Tests.java

@@ -87,6 +87,8 @@ public class IndexTemplateV2Tests extends AbstractDiffableSerializationTestCase<
             meta = randomMeta();
             meta = randomMeta();
         }
         }
 
 
+        IndexTemplateV2.DataStreamTemplate dataStreamTemplate = randomDataStreamTemplate();
+
         List<String> indexPatterns = randomList(1, 4, () -> randomAlphaOfLength(4));
         List<String> indexPatterns = randomList(1, 4, () -> randomAlphaOfLength(4));
         List<String> componentTemplates = randomList(0, 10, () -> randomAlphaOfLength(5));
         List<String> componentTemplates = randomList(0, 10, () -> randomAlphaOfLength(5));
         return new IndexTemplateV2(indexPatterns,
         return new IndexTemplateV2(indexPatterns,
@@ -94,7 +96,8 @@ public class IndexTemplateV2Tests extends AbstractDiffableSerializationTestCase<
             componentTemplates,
             componentTemplates,
             randomBoolean() ? null : randomNonNegativeLong(),
             randomBoolean() ? null : randomNonNegativeLong(),
             randomBoolean() ? null : randomNonNegativeLong(),
             randomBoolean() ? null : randomNonNegativeLong(),
-            meta);
+            meta,
+            dataStreamTemplate);
     }
     }
 
 
     private static Map<String, AliasMetadata> randomAliases() {
     private static Map<String, AliasMetadata> randomAliases() {
@@ -137,25 +140,34 @@ public class IndexTemplateV2Tests extends AbstractDiffableSerializationTestCase<
         }
         }
     }
     }
 
 
+    private static IndexTemplateV2.DataStreamTemplate randomDataStreamTemplate() {
+        if (randomBoolean()) {
+            return null;
+        } else {
+            return new IndexTemplateV2.DataStreamTemplate(randomAlphaOfLength(8));
+        }
+    }
+
     @Override
     @Override
     protected IndexTemplateV2 mutateInstance(IndexTemplateV2 orig) throws IOException {
     protected IndexTemplateV2 mutateInstance(IndexTemplateV2 orig) throws IOException {
         return mutateTemplate(orig);
         return mutateTemplate(orig);
     }
     }
 
 
     public static IndexTemplateV2 mutateTemplate(IndexTemplateV2 orig) {
     public static IndexTemplateV2 mutateTemplate(IndexTemplateV2 orig) {
-        switch (randomIntBetween(0, 5)) {
+        switch (randomIntBetween(0, 6)) {
             case 0:
             case 0:
                 List<String> newIndexPatterns = randomValueOtherThan(orig.indexPatterns(),
                 List<String> newIndexPatterns = randomValueOtherThan(orig.indexPatterns(),
                     () -> randomList(1, 4, () -> randomAlphaOfLength(4)));
                     () -> randomList(1, 4, () -> randomAlphaOfLength(4)));
                 return new IndexTemplateV2(newIndexPatterns, orig.template(), orig.composedOf(),
                 return new IndexTemplateV2(newIndexPatterns, orig.template(), orig.composedOf(),
-                    orig.priority(), orig.version(), orig.metadata());
+                    orig.priority(), orig.version(), orig.metadata(), orig.getDataStreamTemplate());
             case 1:
             case 1:
                 return new IndexTemplateV2(orig.indexPatterns(),
                 return new IndexTemplateV2(orig.indexPatterns(),
                     randomValueOtherThan(orig.template(), () -> new Template(randomSettings(), randomMappings(), randomAliases())),
                     randomValueOtherThan(orig.template(), () -> new Template(randomSettings(), randomMappings(), randomAliases())),
                     orig.composedOf(),
                     orig.composedOf(),
                     orig.priority(),
                     orig.priority(),
                     orig.version(),
                     orig.version(),
-                    orig.metadata());
+                    orig.metadata(),
+                    orig.getDataStreamTemplate());
             case 2:
             case 2:
                 List<String> newComposedOf = randomValueOtherThan(orig.composedOf(),
                 List<String> newComposedOf = randomValueOtherThan(orig.composedOf(),
                     () -> randomList(0, 10, () -> randomAlphaOfLength(5)));
                     () -> randomList(0, 10, () -> randomAlphaOfLength(5)));
@@ -164,28 +176,40 @@ public class IndexTemplateV2Tests extends AbstractDiffableSerializationTestCase<
                     newComposedOf,
                     newComposedOf,
                     orig.priority(),
                     orig.priority(),
                     orig.version(),
                     orig.version(),
-                    orig.metadata());
+                    orig.metadata(),
+                    orig.getDataStreamTemplate());
             case 3:
             case 3:
                 return new IndexTemplateV2(orig.indexPatterns(),
                 return new IndexTemplateV2(orig.indexPatterns(),
                     orig.template(),
                     orig.template(),
                     orig.composedOf(),
                     orig.composedOf(),
                     randomValueOtherThan(orig.priority(), ESTestCase::randomNonNegativeLong),
                     randomValueOtherThan(orig.priority(), ESTestCase::randomNonNegativeLong),
                     orig.version(),
                     orig.version(),
-                    orig.metadata());
+                    orig.metadata(),
+                    orig.getDataStreamTemplate());
             case 4:
             case 4:
                 return new IndexTemplateV2(orig.indexPatterns(),
                 return new IndexTemplateV2(orig.indexPatterns(),
                     orig.template(),
                     orig.template(),
                     orig.composedOf(),
                     orig.composedOf(),
                     orig.priority(),
                     orig.priority(),
                     randomValueOtherThan(orig.version(), ESTestCase::randomNonNegativeLong),
                     randomValueOtherThan(orig.version(), ESTestCase::randomNonNegativeLong),
-                    orig.metadata());
+                    orig.metadata(),
+                    orig.getDataStreamTemplate());
             case 5:
             case 5:
                 return new IndexTemplateV2(orig.indexPatterns(),
                 return new IndexTemplateV2(orig.indexPatterns(),
                     orig.template(),
                     orig.template(),
                     orig.composedOf(),
                     orig.composedOf(),
                     orig.priority(),
                     orig.priority(),
                     orig.version(),
                     orig.version(),
-                    randomValueOtherThan(orig.metadata(), IndexTemplateV2Tests::randomMeta));
+                    randomValueOtherThan(orig.metadata(), IndexTemplateV2Tests::randomMeta),
+                    orig.getDataStreamTemplate());
+            case 6:
+                return new IndexTemplateV2(orig.indexPatterns(),
+                    orig.template(),
+                    orig.composedOf(),
+                    orig.priority(),
+                    orig.version(),
+                    orig.metadata(),
+                    randomValueOtherThan(orig.getDataStreamTemplate(), IndexTemplateV2Tests::randomDataStreamTemplate));
             default:
             default:
                 throw new IllegalStateException("illegal randomization branch");
                 throw new IllegalStateException("illegal randomization branch");
         }
         }

+ 128 - 0
server/src/test/java/org/elasticsearch/cluster/metadata/MetadataCreateDataStreamServiceTests.java

@@ -0,0 +1,128 @@
+/*
+ * Licensed to Elasticsearch under one or more contributor
+ * license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright
+ * ownership. Elasticsearch licenses this file to you under
+ * the Apache License, Version 2.0 (the "License"); you may
+ * not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.elasticsearch.cluster.metadata;
+
+import org.elasticsearch.Version;
+import org.elasticsearch.action.admin.indices.create.CreateIndexClusterStateUpdateRequest;
+import org.elasticsearch.cluster.ClusterName;
+import org.elasticsearch.cluster.ClusterState;
+import org.elasticsearch.cluster.metadata.MetadataCreateDataStreamService.CreateDataStreamClusterStateUpdateRequest;
+import org.elasticsearch.common.settings.Settings;
+import org.elasticsearch.common.unit.TimeValue;
+import org.elasticsearch.test.ESTestCase;
+
+import java.util.List;
+import java.util.Map;
+
+import static org.elasticsearch.cluster.DataStreamTestHelper.createFirstBackingIndex;
+import static org.hamcrest.Matchers.containsString;
+import static org.hamcrest.Matchers.equalTo;
+import static org.hamcrest.Matchers.notNullValue;
+import static org.mockito.Matchers.any;
+import static org.mockito.Matchers.anyBoolean;
+import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.when;
+
+public class MetadataCreateDataStreamServiceTests extends ESTestCase {
+
+    public void testCreateDataStream() throws Exception {
+        final MetadataCreateIndexService metadataCreateIndexService = getMetadataCreateIndexService();
+        final String dataStreamName = "my-data-stream";
+        ClusterState cs = ClusterState.builder(new ClusterName("_name")).build();
+        CreateDataStreamClusterStateUpdateRequest req =
+            new CreateDataStreamClusterStateUpdateRequest(dataStreamName, "@timestamp", TimeValue.ZERO, TimeValue.ZERO);
+        ClusterState newState = MetadataCreateDataStreamService.createDataStream(metadataCreateIndexService, cs, req);
+        assertThat(newState.metadata().dataStreams().size(), equalTo(1));
+        assertThat(newState.metadata().dataStreams().get(dataStreamName).getName(), equalTo(dataStreamName));
+        assertThat(newState.metadata().index(DataStream.getBackingIndexName(dataStreamName, 1)), notNullValue());
+        assertThat(newState.metadata().index(DataStream.getBackingIndexName(dataStreamName, 1)).getSettings().get("index.hidden"),
+            equalTo("true"));
+    }
+
+    public void testCreateDuplicateDataStream() throws Exception {
+        final MetadataCreateIndexService metadataCreateIndexService = getMetadataCreateIndexService();
+        final String dataStreamName = "my-data-stream";
+        IndexMetadata idx = createFirstBackingIndex(dataStreamName).build();
+        DataStream existingDataStream = new DataStream(dataStreamName, "timestamp", List.of(idx.getIndex()));
+        ClusterState cs = ClusterState.builder(new ClusterName("_name"))
+            .metadata(Metadata.builder().dataStreams(Map.of(dataStreamName, existingDataStream)).build()).build();
+        CreateDataStreamClusterStateUpdateRequest req =
+            new CreateDataStreamClusterStateUpdateRequest(dataStreamName, "@timestamp", TimeValue.ZERO, TimeValue.ZERO);
+
+        IllegalArgumentException e = expectThrows(IllegalArgumentException.class,
+            () -> MetadataCreateDataStreamService.createDataStream(metadataCreateIndexService, cs, req));
+        assertThat(e.getMessage(), containsString("data_stream [" + dataStreamName + "] already exists"));
+    }
+
+    public void testCreateDataStreamWithInvalidName() throws Exception {
+        final MetadataCreateIndexService metadataCreateIndexService = getMetadataCreateIndexService();
+        final String dataStreamName = "_My-da#ta- ,stream-";
+        ClusterState cs = ClusterState.builder(new ClusterName("_name")).build();
+        CreateDataStreamClusterStateUpdateRequest req =
+            new CreateDataStreamClusterStateUpdateRequest(dataStreamName, "@timestamp", TimeValue.ZERO, TimeValue.ZERO);
+        IllegalArgumentException e = expectThrows(IllegalArgumentException.class,
+            () -> MetadataCreateDataStreamService.createDataStream(metadataCreateIndexService, cs, req));
+        assertThat(e.getMessage(), containsString("must not contain the following characters"));
+    }
+
+    public void testCreateDataStreamWithUppercaseCharacters() throws Exception {
+        final MetadataCreateIndexService metadataCreateIndexService = getMetadataCreateIndexService();
+        final String dataStreamName = "MAY_NOT_USE_UPPERCASE";
+        ClusterState cs = ClusterState.builder(new ClusterName("_name")).build();
+        CreateDataStreamClusterStateUpdateRequest req =
+            new CreateDataStreamClusterStateUpdateRequest(dataStreamName, "@timestamp", TimeValue.ZERO, TimeValue.ZERO);
+        IllegalArgumentException e = expectThrows(IllegalArgumentException.class,
+            () -> MetadataCreateDataStreamService.createDataStream(metadataCreateIndexService, cs, req));
+        assertThat(e.getMessage(), containsString("data_stream [" + dataStreamName + "] must be lowercase"));
+    }
+
+    public void testCreateDataStreamStartingWithPeriod() throws Exception {
+        final MetadataCreateIndexService metadataCreateIndexService = getMetadataCreateIndexService();
+        final String dataStreamName = ".may_not_start_with_period";
+        ClusterState cs = ClusterState.builder(new ClusterName("_name")).build();
+        CreateDataStreamClusterStateUpdateRequest req =
+            new CreateDataStreamClusterStateUpdateRequest(dataStreamName, "@timestamp", TimeValue.ZERO, TimeValue.ZERO);
+        IllegalArgumentException e = expectThrows(IllegalArgumentException.class,
+            () -> MetadataCreateDataStreamService.createDataStream(metadataCreateIndexService, cs, req));
+        assertThat(e.getMessage(), containsString("data_stream [" + dataStreamName + "] must not start with '.'"));
+    }
+
+    private static MetadataCreateIndexService getMetadataCreateIndexService() throws Exception {
+        MetadataCreateIndexService s = mock(MetadataCreateIndexService.class);
+        when(s.applyCreateIndexRequest(any(ClusterState.class), any(CreateIndexClusterStateUpdateRequest.class), anyBoolean()))
+            .thenAnswer(mockInvocation -> {
+                ClusterState currentState = (ClusterState) mockInvocation.getArguments()[0];
+                CreateIndexClusterStateUpdateRequest request = (CreateIndexClusterStateUpdateRequest) mockInvocation.getArguments()[1];
+
+                Metadata.Builder b = Metadata.builder(currentState.metadata())
+                    .put(IndexMetadata.builder(request.index())
+                        .settings(Settings.builder()
+                            .put(IndexMetadata.SETTING_VERSION_CREATED, Version.CURRENT)
+                            .put(request.settings())
+                            .build())
+                        .numberOfShards(1)
+                        .numberOfReplicas(1)
+                        .build(), false);
+                return ClusterState.builder(currentState).metadata(b.build()).build();
+            });
+
+        return s;
+    }
+
+}

+ 2 - 2
server/src/test/java/org/elasticsearch/cluster/metadata/MetadataCreateIndexServiceTests.java

@@ -999,7 +999,7 @@ public class MetadataCreateIndexServiceTests extends ESTestCase {
         boolean shouldBeText = randomBoolean();
         boolean shouldBeText = randomBoolean();
         List<String> composedOf = shouldBeText ? Arrays.asList("ct2", "ct1") : Arrays.asList("ct1", "ct2");
         List<String> composedOf = shouldBeText ? Arrays.asList("ct2", "ct1") : Arrays.asList("ct1", "ct2");
         logger.info("--> the {} analyzer should win ({})", shouldBeText ? "text" : "keyword", composedOf);
         logger.info("--> the {} analyzer should win ({})", shouldBeText ? "text" : "keyword", composedOf);
-        IndexTemplateV2 template = new IndexTemplateV2(Collections.singletonList("index"), null, composedOf, null, null, null);
+        IndexTemplateV2 template = new IndexTemplateV2(Collections.singletonList("index"), null, composedOf, null, null, null, null);
 
 
         ClusterState state = ClusterState.builder(ClusterState.EMPTY_STATE)
         ClusterState state = ClusterState.builder(ClusterState.EMPTY_STATE)
             .metadata(Metadata.builder(Metadata.EMPTY_METADATA)
             .metadata(Metadata.builder(Metadata.EMPTY_METADATA)
@@ -1058,7 +1058,7 @@ public class MetadataCreateIndexServiceTests extends ESTestCase {
         ComponentTemplate ct2 = new ComponentTemplate(ctt2, null, null);
         ComponentTemplate ct2 = new ComponentTemplate(ctt2, null, null);
 
 
         IndexTemplateV2 template = new IndexTemplateV2(Collections.singletonList("index"), null, Arrays.asList("ct2", "ct1"),
         IndexTemplateV2 template = new IndexTemplateV2(Collections.singletonList("index"), null, Arrays.asList("ct2", "ct1"),
-            null, null, null);
+            null, null, null, null);
 
 
         ClusterState state = ClusterState.builder(ClusterState.EMPTY_STATE)
         ClusterState state = ClusterState.builder(ClusterState.EMPTY_STATE)
             .metadata(Metadata.builder(Metadata.EMPTY_METADATA)
             .metadata(Metadata.builder(Metadata.EMPTY_METADATA)

+ 24 - 24
server/src/test/java/org/elasticsearch/cluster/metadata/MetadataIndexTemplateServiceTests.java

@@ -308,15 +308,15 @@ public class MetadataIndexTemplateServiceTests extends ESSingleNodeTestCase {
         assertNotNull(state.metadata().componentTemplates().get("foo"));
         assertNotNull(state.metadata().componentTemplates().get("foo"));
 
 
         IndexTemplateV2 firstGlobalIndexTemplate =
         IndexTemplateV2 firstGlobalIndexTemplate =
-            new IndexTemplateV2(List.of("*"), template, List.of("foo"), 1L, null, null);
+            new IndexTemplateV2(List.of("*"), template, List.of("foo"), 1L, null, null, null);
         state = metadataIndexTemplateService.addIndexTemplateV2(state, true, "globalindextemplate1", firstGlobalIndexTemplate);
         state = metadataIndexTemplateService.addIndexTemplateV2(state, true, "globalindextemplate1", firstGlobalIndexTemplate);
 
 
         IndexTemplateV2 secondGlobalIndexTemplate =
         IndexTemplateV2 secondGlobalIndexTemplate =
-            new IndexTemplateV2(List.of("*"), template, List.of("foo"), 2L, null, null);
+            new IndexTemplateV2(List.of("*"), template, List.of("foo"), 2L, null, null, null);
         state = metadataIndexTemplateService.addIndexTemplateV2(state, true, "globalindextemplate2", secondGlobalIndexTemplate);
         state = metadataIndexTemplateService.addIndexTemplateV2(state, true, "globalindextemplate2", secondGlobalIndexTemplate);
 
 
         IndexTemplateV2 fooPatternIndexTemplate =
         IndexTemplateV2 fooPatternIndexTemplate =
-            new IndexTemplateV2(List.of("foo-*"), template, List.of("foo"), 3L, null, null);
+            new IndexTemplateV2(List.of("foo-*"), template, List.of("foo"), 3L, null, null, null);
         state = metadataIndexTemplateService.addIndexTemplateV2(state, true, "foopatternindextemplate", fooPatternIndexTemplate);
         state = metadataIndexTemplateService.addIndexTemplateV2(state, true, "foopatternindextemplate", fooPatternIndexTemplate);
 
 
         // update the component template to set the index.hidden setting
         // update the component template to set the index.hidden setting
@@ -368,7 +368,7 @@ public class MetadataIndexTemplateServiceTests extends ESSingleNodeTestCase {
         List<String> patterns = new ArrayList<>(template.indexPatterns());
         List<String> patterns = new ArrayList<>(template.indexPatterns());
         patterns.add("new-pattern");
         patterns.add("new-pattern");
         template = new IndexTemplateV2(patterns, template.template(), template.composedOf(), template.priority(), template.version(),
         template = new IndexTemplateV2(patterns, template.template(), template.composedOf(), template.priority(), template.version(),
-            template.metadata());
+            template.metadata(), null);
         state = metadataIndexTemplateService.addIndexTemplateV2(state, false, "foo", template);
         state = metadataIndexTemplateService.addIndexTemplateV2(state, false, "foo", template);
 
 
         assertNotNull(state.metadata().templatesV2().get("foo"));
         assertNotNull(state.metadata().templatesV2().get("foo"));
@@ -405,7 +405,7 @@ public class MetadataIndexTemplateServiceTests extends ESSingleNodeTestCase {
                 .build())
                 .build())
             .build();
             .build();
 
 
-        IndexTemplateV2 v2Template = new IndexTemplateV2(Arrays.asList("foo-bar-*", "eggplant"), null, null, null, null, null);
+        IndexTemplateV2 v2Template = new IndexTemplateV2(Arrays.asList("foo-bar-*", "eggplant"), null, null, null, null, null, null);
         state = metadataIndexTemplateService.addIndexTemplateV2(state, false, "v2-template", v2Template);
         state = metadataIndexTemplateService.addIndexTemplateV2(state, false, "v2-template", v2Template);
 
 
         assertWarnings("index template [v2-template] has index patterns [foo-bar-*, eggplant] matching patterns " +
         assertWarnings("index template [v2-template] has index patterns [foo-bar-*, eggplant] matching patterns " +
@@ -442,7 +442,7 @@ public class MetadataIndexTemplateServiceTests extends ESSingleNodeTestCase {
         waitToCreateComponentTemplate.await(10, TimeUnit.SECONDS);
         waitToCreateComponentTemplate.await(10, TimeUnit.SECONDS);
 
 
         IndexTemplateV2 globalIndexTemplate = new IndexTemplateV2(List.of("*"), null, List.of("ct-with-index-hidden-setting"), null, null,
         IndexTemplateV2 globalIndexTemplate = new IndexTemplateV2(List.of("*"), null, List.of("ct-with-index-hidden-setting"), null, null,
-            null);
+            null, null);
 
 
         expectThrows(InvalidIndexTemplateException.class, () ->
         expectThrows(InvalidIndexTemplateException.class, () ->
             metadataIndexTemplateService.putIndexTemplateV2("testing", true, "template-referencing-ct-with-hidden-index-setting",
             metadataIndexTemplateService.putIndexTemplateV2("testing", true, "template-referencing-ct-with-hidden-index-setting",
@@ -466,7 +466,7 @@ public class MetadataIndexTemplateServiceTests extends ESSingleNodeTestCase {
      */
      */
     public void testPuttingV1StarTemplateGeneratesWarning() throws Exception {
     public void testPuttingV1StarTemplateGeneratesWarning() throws Exception {
         final MetadataIndexTemplateService metadataIndexTemplateService = getMetadataIndexTemplateService();
         final MetadataIndexTemplateService metadataIndexTemplateService = getMetadataIndexTemplateService();
-        IndexTemplateV2 v2Template = new IndexTemplateV2(Arrays.asList("foo-bar-*", "eggplant"), null, null, null, null, null);
+        IndexTemplateV2 v2Template = new IndexTemplateV2(Arrays.asList("foo-bar-*", "eggplant"), null, null, null, null, null, null);
         ClusterState state = metadataIndexTemplateService.addIndexTemplateV2(ClusterState.EMPTY_STATE, false, "v2-template", v2Template);
         ClusterState state = metadataIndexTemplateService.addIndexTemplateV2(ClusterState.EMPTY_STATE, false, "v2-template", v2Template);
 
 
         MetadataIndexTemplateService.PutRequest req = new MetadataIndexTemplateService.PutRequest("cause", "v1-template");
         MetadataIndexTemplateService.PutRequest req = new MetadataIndexTemplateService.PutRequest("cause", "v1-template");
@@ -486,7 +486,7 @@ public class MetadataIndexTemplateServiceTests extends ESSingleNodeTestCase {
      */
      */
     public void testPuttingV1NonStarTemplateGeneratesError() throws Exception {
     public void testPuttingV1NonStarTemplateGeneratesError() throws Exception {
         final MetadataIndexTemplateService metadataIndexTemplateService = getMetadataIndexTemplateService();
         final MetadataIndexTemplateService metadataIndexTemplateService = getMetadataIndexTemplateService();
-        IndexTemplateV2 v2Template = new IndexTemplateV2(Arrays.asList("foo-bar-*", "eggplant"), null, null, null, null, null);
+        IndexTemplateV2 v2Template = new IndexTemplateV2(Arrays.asList("foo-bar-*", "eggplant"), null, null, null, null, null, null);
         ClusterState state = metadataIndexTemplateService.addIndexTemplateV2(ClusterState.EMPTY_STATE, false, "v2-template", v2Template);
         ClusterState state = metadataIndexTemplateService.addIndexTemplateV2(ClusterState.EMPTY_STATE, false, "v2-template", v2Template);
 
 
         MetadataIndexTemplateService.PutRequest req = new MetadataIndexTemplateService.PutRequest("cause", "v1-template");
         MetadataIndexTemplateService.PutRequest req = new MetadataIndexTemplateService.PutRequest("cause", "v1-template");
@@ -519,7 +519,7 @@ public class MetadataIndexTemplateServiceTests extends ESSingleNodeTestCase {
                 .build())
                 .build())
             .build();
             .build();
 
 
-        IndexTemplateV2 v2Template = new IndexTemplateV2(Arrays.asList("foo-bar-*", "eggplant"), null, null, null, null, null);
+        IndexTemplateV2 v2Template = new IndexTemplateV2(Arrays.asList("foo-bar-*", "eggplant"), null, null, null, null, null, null);
         state = metadataIndexTemplateService.addIndexTemplateV2(state, false, "v2-template", v2Template);
         state = metadataIndexTemplateService.addIndexTemplateV2(state, false, "v2-template", v2Template);
 
 
         assertWarnings("index template [v2-template] has index patterns [foo-bar-*, eggplant] matching patterns " +
         assertWarnings("index template [v2-template] has index patterns [foo-bar-*, eggplant] matching patterns " +
@@ -559,7 +559,7 @@ public class MetadataIndexTemplateServiceTests extends ESSingleNodeTestCase {
                 .build())
                 .build())
             .build();
             .build();
 
 
-        IndexTemplateV2 v2Template = new IndexTemplateV2(Arrays.asList("foo-bar-*", "eggplant"), null, null, null, null, null);
+        IndexTemplateV2 v2Template = new IndexTemplateV2(Arrays.asList("foo-bar-*", "eggplant"), null, null, null, null, null, null);
         state = metadataIndexTemplateService.addIndexTemplateV2(state, false, "v2-template", v2Template);
         state = metadataIndexTemplateService.addIndexTemplateV2(state, false, "v2-template", v2Template);
 
 
         assertWarnings("index template [v2-template] has index patterns [foo-bar-*, eggplant] matching patterns " +
         assertWarnings("index template [v2-template] has index patterns [foo-bar-*, eggplant] matching patterns " +
@@ -584,10 +584,10 @@ public class MetadataIndexTemplateServiceTests extends ESSingleNodeTestCase {
 
 
     public void testPuttingOverlappingV2Template() throws Exception {
     public void testPuttingOverlappingV2Template() throws Exception {
         {
         {
-            IndexTemplateV2 template = new IndexTemplateV2(Arrays.asList("egg*", "baz"), null, null, 1L, null, null);
+            IndexTemplateV2 template = new IndexTemplateV2(Arrays.asList("egg*", "baz"), null, null, 1L, null, null, null);
             MetadataIndexTemplateService metadataIndexTemplateService = getMetadataIndexTemplateService();
             MetadataIndexTemplateService metadataIndexTemplateService = getMetadataIndexTemplateService();
             ClusterState state = metadataIndexTemplateService.addIndexTemplateV2(ClusterState.EMPTY_STATE, false, "foo", template);
             ClusterState state = metadataIndexTemplateService.addIndexTemplateV2(ClusterState.EMPTY_STATE, false, "foo", template);
-            IndexTemplateV2 newTemplate = new IndexTemplateV2(Arrays.asList("abc", "baz*"), null, null, 1L, null, null);
+            IndexTemplateV2 newTemplate = new IndexTemplateV2(Arrays.asList("abc", "baz*"), null, null, 1L, null, null, null);
             IllegalArgumentException e = expectThrows(IllegalArgumentException.class,
             IllegalArgumentException e = expectThrows(IllegalArgumentException.class,
                 () -> metadataIndexTemplateService.addIndexTemplateV2(state, false, "foo2", newTemplate));
                 () -> metadataIndexTemplateService.addIndexTemplateV2(state, false, "foo2", newTemplate));
             assertThat(e.getMessage(), equalTo("index template [foo2] has index patterns [abc, baz*] matching patterns from existing " +
             assertThat(e.getMessage(), equalTo("index template [foo2] has index patterns [abc, baz*] matching patterns from existing " +
@@ -596,10 +596,10 @@ public class MetadataIndexTemplateServiceTests extends ESSingleNodeTestCase {
         }
         }
 
 
         {
         {
-            IndexTemplateV2 template = new IndexTemplateV2(Arrays.asList("egg*", "baz"), null, null, null, null, null);
+            IndexTemplateV2 template = new IndexTemplateV2(Arrays.asList("egg*", "baz"), null, null, null, null, null, null);
             MetadataIndexTemplateService metadataIndexTemplateService = getMetadataIndexTemplateService();
             MetadataIndexTemplateService metadataIndexTemplateService = getMetadataIndexTemplateService();
             ClusterState state = metadataIndexTemplateService.addIndexTemplateV2(ClusterState.EMPTY_STATE, false, "foo", template);
             ClusterState state = metadataIndexTemplateService.addIndexTemplateV2(ClusterState.EMPTY_STATE, false, "foo", template);
-            IndexTemplateV2 newTemplate = new IndexTemplateV2(Arrays.asList("abc", "baz*"), null, null, 0L, null, null);
+            IndexTemplateV2 newTemplate = new IndexTemplateV2(Arrays.asList("abc", "baz*"), null, null, 0L, null, null, null);
             IllegalArgumentException e = expectThrows(IllegalArgumentException.class,
             IllegalArgumentException e = expectThrows(IllegalArgumentException.class,
                 () -> metadataIndexTemplateService.addIndexTemplateV2(state, false, "foo2", newTemplate));
                 () -> metadataIndexTemplateService.addIndexTemplateV2(state, false, "foo2", newTemplate));
             assertThat(e.getMessage(), equalTo("index template [foo2] has index patterns [abc, baz*] matching patterns from existing " +
             assertThat(e.getMessage(), equalTo("index template [foo2] has index patterns [abc, baz*] matching patterns from existing " +
@@ -615,9 +615,9 @@ public class MetadataIndexTemplateServiceTests extends ESSingleNodeTestCase {
 
 
         ComponentTemplate ct = ComponentTemplateTests.randomInstance();
         ComponentTemplate ct = ComponentTemplateTests.randomInstance();
         state = service.addComponentTemplate(state, true, "ct", ct);
         state = service.addComponentTemplate(state, true, "ct", ct);
-        IndexTemplateV2 it = new IndexTemplateV2(List.of("i*"), null, List.of("ct"), null, 1L, null);
+        IndexTemplateV2 it = new IndexTemplateV2(List.of("i*"), null, List.of("ct"), null, 1L, null, null);
         state = service.addIndexTemplateV2(state, true, "my-template", it);
         state = service.addIndexTemplateV2(state, true, "my-template", it);
-        IndexTemplateV2 it2 = new IndexTemplateV2(List.of("in*"), null, List.of("ct"), 10L, 2L, null);
+        IndexTemplateV2 it2 = new IndexTemplateV2(List.of("in*"), null, List.of("ct"), 10L, 2L, null, null);
         state = service.addIndexTemplateV2(state, true, "my-template2", it2);
         state = service.addIndexTemplateV2(state, true, "my-template2", it2);
 
 
         String result = MetadataIndexTemplateService.findV2Template(state.metadata(), "index", randomBoolean());
         String result = MetadataIndexTemplateService.findV2Template(state.metadata(), "index", randomBoolean());
@@ -632,9 +632,9 @@ public class MetadataIndexTemplateServiceTests extends ESSingleNodeTestCase {
 
 
         ComponentTemplate ct = ComponentTemplateTests.randomInstance();
         ComponentTemplate ct = ComponentTemplateTests.randomInstance();
         state = service.addComponentTemplate(state, true, "ct", ct);
         state = service.addComponentTemplate(state, true, "ct", ct);
-        IndexTemplateV2 it = new IndexTemplateV2(List.of("i*"), null, List.of("ct"), 0L, 1L, null);
+        IndexTemplateV2 it = new IndexTemplateV2(List.of("i*"), null, List.of("ct"), 0L, 1L, null, null);
         state = service.addIndexTemplateV2(state, true, "my-template", it);
         state = service.addIndexTemplateV2(state, true, "my-template", it);
-        IndexTemplateV2 it2 = new IndexTemplateV2(List.of("*"), null, List.of("ct"), 10L, 2L, null);
+        IndexTemplateV2 it2 = new IndexTemplateV2(List.of("*"), null, List.of("ct"), 10L, 2L, null, null);
         state = service.addIndexTemplateV2(state, true, "my-template2", it2);
         state = service.addIndexTemplateV2(state, true, "my-template2", it2);
 
 
         String result = MetadataIndexTemplateService.findV2Template(state.metadata(), "index", true);
         String result = MetadataIndexTemplateService.findV2Template(state.metadata(), "index", true);
@@ -647,7 +647,7 @@ public class MetadataIndexTemplateServiceTests extends ESSingleNodeTestCase {
         try {
         try {
             // add an invalid global template that specifies the `index.hidden` setting
             // add an invalid global template that specifies the `index.hidden` setting
             IndexTemplateV2 invalidGlobalTemplate = new IndexTemplateV2(List.of("*"), templateWithHiddenSetting, List.of("ct"), 5L, 1L,
             IndexTemplateV2 invalidGlobalTemplate = new IndexTemplateV2(List.of("*"), templateWithHiddenSetting, List.of("ct"), 5L, 1L,
-                null);
+                null, null);
             Metadata invalidGlobalTemplateMetadata = Metadata.builder().putCustom(IndexTemplateV2Metadata.TYPE,
             Metadata invalidGlobalTemplateMetadata = Metadata.builder().putCustom(IndexTemplateV2Metadata.TYPE,
                 new IndexTemplateV2Metadata(Map.of("invalid_global_template", invalidGlobalTemplate))).build();
                 new IndexTemplateV2Metadata(Map.of("invalid_global_template", invalidGlobalTemplate))).build();
 
 
@@ -690,7 +690,7 @@ public class MetadataIndexTemplateServiceTests extends ESSingleNodeTestCase {
                     "      }\n" +
                     "      }\n" +
                     "    }\n" +
                     "    }\n" +
                     "  }"), null),
                     "  }"), null),
-            List.of("ct_low", "ct_high"), 0L, 1L, null);
+            List.of("ct_low", "ct_high"), 0L, 1L, null, null);
         state = service.addIndexTemplateV2(state, true, "my-template", it);
         state = service.addIndexTemplateV2(state, true, "my-template", it);
 
 
         List<CompressedXContent> mappings = MetadataIndexTemplateService.resolveMappings(state, "my-template");
         List<CompressedXContent> mappings = MetadataIndexTemplateService.resolveMappings(state, "my-template");
@@ -743,7 +743,7 @@ public class MetadataIndexTemplateServiceTests extends ESSingleNodeTestCase {
                 .put("index.blocks.write", false)
                 .put("index.blocks.write", false)
                 .put("index.number_of_shards", 3)
                 .put("index.number_of_shards", 3)
                 .build(), null, null),
                 .build(), null, null),
-            List.of("ct_low", "ct_high"), 0L, 1L, null);
+            List.of("ct_low", "ct_high"), 0L, 1L, null, null);
         state = service.addIndexTemplateV2(state, true, "my-template", it);
         state = service.addIndexTemplateV2(state, true, "my-template", it);
 
 
         Settings settings = MetadataIndexTemplateService.resolveSettings(state.metadata(), "my-template");
         Settings settings = MetadataIndexTemplateService.resolveSettings(state.metadata(), "my-template");
@@ -771,7 +771,7 @@ public class MetadataIndexTemplateServiceTests extends ESSingleNodeTestCase {
         state = service.addComponentTemplate(state, true, "ct_low", ct2);
         state = service.addComponentTemplate(state, true, "ct_low", ct2);
         IndexTemplateV2 it = new IndexTemplateV2(List.of("i*"),
         IndexTemplateV2 it = new IndexTemplateV2(List.of("i*"),
             new Template(null, null, a3),
             new Template(null, null, a3),
-            List.of("ct_low", "ct_high"), 0L, 1L, null);
+            List.of("ct_low", "ct_high"), 0L, 1L, null, null);
         state = service.addIndexTemplateV2(state, true, "my-template", it);
         state = service.addIndexTemplateV2(state, true, "my-template", it);
 
 
         List<Map<String, AliasMetadata>> resolvedAliases = MetadataIndexTemplateService.resolveAliases(state.metadata(), "my-template");
         List<Map<String, AliasMetadata>> resolvedAliases = MetadataIndexTemplateService.resolveAliases(state.metadata(), "my-template");
@@ -856,9 +856,9 @@ public class MetadataIndexTemplateServiceTests extends ESSingleNodeTestCase {
     @SuppressWarnings("unchecked")
     @SuppressWarnings("unchecked")
     public static void assertTemplatesEqual(IndexTemplateV2 actual, IndexTemplateV2 expected) {
     public static void assertTemplatesEqual(IndexTemplateV2 actual, IndexTemplateV2 expected) {
         IndexTemplateV2 actualNoTemplate = new IndexTemplateV2(actual.indexPatterns(), null,
         IndexTemplateV2 actualNoTemplate = new IndexTemplateV2(actual.indexPatterns(), null,
-            actual.composedOf(), actual.priority(), actual.version(), actual.metadata());
+            actual.composedOf(), actual.priority(), actual.version(), actual.metadata(), actual.getDataStreamTemplate());
         IndexTemplateV2 expectedNoTemplate = new IndexTemplateV2(expected.indexPatterns(), null,
         IndexTemplateV2 expectedNoTemplate = new IndexTemplateV2(expected.indexPatterns(), null,
-            expected.composedOf(), expected.priority(), expected.version(), expected.metadata());
+            expected.composedOf(), expected.priority(), expected.version(), expected.metadata(), expected.getDataStreamTemplate());
 
 
         assertThat(actualNoTemplate, equalTo(expectedNoTemplate));
         assertThat(actualNoTemplate, equalTo(expectedNoTemplate));
         Template actualTemplate = actual.template();
         Template actualTemplate = actual.template();

+ 2 - 1
server/src/test/java/org/elasticsearch/cluster/metadata/ToAndFromJsonMetadataTests.java

@@ -77,7 +77,8 @@ public class ToAndFromJsonMetadataTests extends ESTestCase {
                     Collections.singletonList("component_template"),
                     Collections.singletonList("component_template"),
                     5L,
                     5L,
                     4L,
                     4L,
-                    Collections.singletonMap("my_meta", Collections.singletonMap("potato", "chicken"))))
+                    Collections.singletonMap("my_meta", Collections.singletonMap("potato", "chicken")),
+                    randomBoolean() ? null : new IndexTemplateV2.DataStreamTemplate("@timestamp")))
                 .put(IndexMetadata.builder("test12")
                 .put(IndexMetadata.builder("test12")
                         .settings(settings(Version.CURRENT)
                         .settings(settings(Version.CURRENT)
                                 .put("setting1", "value1")
                                 .put("setting1", "value1")