Browse Source

Ensure template exists when creating data stream (#56888)

Limit the creation of data streams only for namespaces that have a composable template with a data stream definition.

This way we ensure that mappings/settings have been specified and will be used at data stream creation and data stream rollover.

Also remove `timestamp_field` parameter from create data stream request and
let the create data stream api resolve the timestamp field
from the data stream definition snippet inside a composable template.

Relates to #53100
Martijn van Groningen 5 years ago
parent
commit
f8b090b641
22 changed files with 343 additions and 137 deletions
  1. 17 11
      docs/reference/indices/create-data-stream.asciidoc
  2. 14 2
      docs/reference/indices/delete-data-stream.asciidoc
  3. 8 2
      docs/reference/indices/get-data-stream.asciidoc
  4. 35 2
      docs/reference/indices/index-templates.asciidoc
  5. 14 3
      docs/reference/indices/rollover-index.asciidoc
  6. 1 2
      rest-api-spec/src/main/resources/rest-api-spec/api/indices.create_data_stream.json
  7. 49 38
      rest-api-spec/src/main/resources/rest-api-spec/test/indices.data_stream/10_basic.yml
  8. 13 4
      rest-api-spec/src/main/resources/rest-api-spec/test/indices.data_stream/20_unsupported_apis.yml
  9. 17 8
      rest-api-spec/src/main/resources/rest-api-spec/test/indices.delete/20_backing_indices.yml
  10. 12 3
      rest-api-spec/src/main/resources/rest-api-spec/test/indices.get/20_backing_indices.yml
  11. 13 4
      rest-api-spec/src/main/resources/rest-api-spec/test/indices.rollover/50_data_streams.yml
  12. 2 5
      server/src/internalClusterTest/java/org/elasticsearch/action/bulk/BulkIntegrationIT.java
  13. 28 6
      server/src/internalClusterTest/java/org/elasticsearch/indices/DataStreamIT.java
  14. 1 1
      server/src/main/java/org/elasticsearch/action/admin/indices/create/AutoCreateAction.java
  15. 11 14
      server/src/main/java/org/elasticsearch/action/admin/indices/datastream/CreateDataStreamAction.java
  16. 3 0
      server/src/main/java/org/elasticsearch/action/admin/indices/rollover/MetadataRolloverService.java
  17. 16 4
      server/src/main/java/org/elasticsearch/cluster/metadata/MetadataCreateDataStreamService.java
  18. 0 11
      server/src/main/java/org/elasticsearch/rest/action/admin/indices/RestCreateDataStreamAction.java
  19. 4 7
      server/src/test/java/org/elasticsearch/action/admin/indices/datastream/CreateDataStreamRequestTests.java
  20. 35 0
      server/src/test/java/org/elasticsearch/action/admin/indices/rollover/MetadataRolloverServiceTests.java
  21. 37 6
      server/src/test/java/org/elasticsearch/cluster/metadata/MetadataCreateDataStreamServiceTests.java
  22. 13 4
      x-pack/plugin/src/test/resources/rest-api-spec/test/data_stream/10_data_stream_resolvability.yml

+ 17 - 11
docs/reference/indices/create-data-stream.asciidoc

@@ -23,18 +23,32 @@ addressed directly, data streams are integrated with the
 <<index-lifecycle-management, index lifecycle management (ILM)>> to facilitate
 the management of the time series data contained in their backing indices.
 
+A data stream can only be created if the namespace it targets has a component
+template exists with a `data_stream` definition.
+
 [source,console]
---------------------------------------------------
-PUT _data_stream/my-data-stream
+-----------------------------------
+PUT _index_template/template
 {
-  "timestamp_field": "@timestamp"
+  "index_patterns": ["my-data-stream*"],
+  "data_stream": {
+    "timestamp_field": "@timestamp"
+  }
 }
+-----------------------------------
+// TEST
+
+[source,console]
+--------------------------------------------------
+PUT _data_stream/my-data-stream
 --------------------------------------------------
+// TEST[continued]
 
 ////
 [source,console]
 -----------------------------------
 DELETE /_data_stream/my-data-stream
+DELETE /_index_template/template
 -----------------------------------
 // TEST[continued]
 ////
@@ -71,11 +85,3 @@ Data stream names must meet the following criteria:
 will count towards the 255 limit faster)
 --
 
-[[indices-create-data-stream-api-request-body]]
-==== {api-request-body-title}
-
-`timestamp_field`::
-(Required, string) The name of the timestamp field. This field must be present
-in all documents indexed into the data stream and must be of type
-<<date, `date`>> or <<date_nanos, `date_nanos`>>.
-

+ 14 - 2
docs/reference/indices/delete-data-stream.asciidoc

@@ -9,10 +9,15 @@ Deletes an existing data stream along with its backing indices.
 ////
 [source,console]
 -----------------------------------
-PUT /_data_stream/my-data-stream
+PUT _index_template/template
 {
-  "timestamp_field" : "@timestamp"
+  "index_patterns": ["my-data-stream*"],
+  "data_stream": {
+    "timestamp_field": "@timestamp"
+  }
 }
+
+PUT /_data_stream/my-data-stream
 -----------------------------------
 // TESTSETUP
 ////
@@ -22,6 +27,13 @@ PUT /_data_stream/my-data-stream
 DELETE _data_stream/my-data-stream
 --------------------------------------------------
 
+////
+[source,console]
+-----------------------------------
+DELETE /_index_template/template
+-----------------------------------
+// TEST[continued]
+////
 
 [[delete-data-stream-api-request]]
 ==== {api-request-title}

+ 8 - 2
docs/reference/indices/get-data-stream.asciidoc

@@ -9,10 +9,15 @@ Returns information about one or more data streams.
 ////
 [source,console]
 -----------------------------------
-PUT /_data_stream/my-data-stream
+PUT _index_template/template
 {
-  "timestamp_field" : "@timestamp"
+  "index_patterns": ["my-data-stream*"],
+  "data_stream": {
+    "timestamp_field": "@timestamp"
+  }
 }
+
+PUT /_data_stream/my-data-stream
 -----------------------------------
 // TESTSETUP
 ////
@@ -21,6 +26,7 @@ PUT /_data_stream/my-data-stream
 [source,console]
 -----------------------------------
 DELETE /_data_stream/my-data-stream
+DELETE /_index_template/template
 -----------------------------------
 // TEARDOWN
 ////

+ 35 - 2
docs/reference/indices/index-templates.asciidoc

@@ -26,7 +26,7 @@ specify settings, mappings, and aliases.
 
 If a new index matches more than one index template, the index template with the highest priority is used.
 
-If an index is created with explicit settings and also matches an index template, 
+If an index is created with explicit settings and also matches an index template,
 the settings from the create index request take precedence over settings specified in the index template and its component templates.
 
 [source,console]
@@ -112,7 +112,7 @@ DELETE _component_template/*
 [[put-index-template-api-desc]]
 ==== {api-description-title}
 
-Creates or updates an index template. 
+Creates or updates an index template.
 
 // tag::index-template-def[]
 Index templates define <<index-modules-settings,settings>> and <<mapping,mappings>> that you can
@@ -555,3 +555,36 @@ PUT /_index_template/template_1
 --------------------------------------------------
 
 To check the `_meta`, you can use the <<indices-get-template, get index template>> API.
+
+[[data-stream-definition]]
+===== Data stream definition
+
+If a composable template should auto create a data stream instead of an index then
+a `data_stream` definition can be added to a composable template.
+
+[source,console]
+--------------------------------------------------
+PUT /_index_template/template_1
+{
+  "index_patterns": ["logs-*"],
+  "template": {
+    "mappings": {
+      "properties": {
+        "@timestamp": {
+          "type": "date"
+        }
+      }
+    }
+  },
+  "data_stream": {
+    "timestamp_field": "@timestamp"
+  }
+}
+--------------------------------------------------
+
+Required properties of a data stream definition:
+
+`timestamp_field`::
+(Required, string) The name of the timestamp field. This field must be present
+in all documents indexed into the data stream and must be of type
+<<date, `date`>> or <<date_nanos, `date_nanos`>>.

+ 14 - 3
docs/reference/indices/rollover-index.asciidoc

@@ -226,11 +226,20 @@ The API returns the following response:
 ===== Roll over a data stream
 
 [source,console]
---------------------------------------------------
-PUT /_data_stream/my-data-stream <1>
+-----------------------------------
+PUT _index_template/template
 {
-  "timestamp_field": "date"
+  "index_patterns": ["my-data-stream*"],
+  "data_stream": {
+    "timestamp_field": "@timestamp"
+  }
 }
+-----------------------------------
+// TEST
+
+[source,console]
+--------------------------------------------------
+PUT /_data_stream/my-data-stream <1>
 
 # Add > 1000 documents to my-data-stream
 
@@ -243,6 +252,7 @@ POST /my-data-stream/_rollover <2>
   }
 }
 --------------------------------------------------
+// TEST[continued]
 // TEST[setup:huge_twitter]
 // TEST[s/# Add > 1000 documents to my-data-stream/POST _reindex?refresh\n{"source":{"index":"twitter"},"dest":{"index":"my-data-stream-000001"}}/]
 <1> Creates a data stream called `my-data-stream` with one initial backing index
@@ -286,6 +296,7 @@ The API returns the following response:
 [source,console]
 -----------------------------------
 DELETE /_data_stream/my-data-stream
+DELETE /_index_template/*
 -----------------------------------
 // TEST[continued]
 ////

+ 1 - 2
rest-api-spec/src/main/resources/rest-api-spec/api/indices.create_data_stream.json

@@ -24,8 +24,7 @@
     "params":{
     },
     "body":{
-      "description":"The data stream definition",
-      "required":true
+      "description":"The data stream definition"
     }
   }
 }

+ 49 - 38
rest-api-spec/src/main/resources/rest-api-spec/test/indices.data_stream/10_basic.yml

@@ -1,21 +1,39 @@
+setup:
+  - skip:
+      features: allowed_warnings
+  - do:
+      allowed_warnings:
+        - "index template [my-template1] has index patterns [simple-data-stream1*] matching patterns from existing older templates [global] with patterns (global => [*]); this template [my-template1] will take precedence during new index creation"
+      indices.put_index_template:
+        name: my-template1
+        body:
+          index_patterns: [simple-data-stream1]
+          data_stream:
+            timestamp_field: '@timestamp'
+  - do:
+      allowed_warnings:
+        - "index template [my-template2] has index patterns [simple-data-stream2*] matching patterns from existing older templates [global] with patterns (global => [*]); this template [my-template2] will take precedence during new index creation"
+      indices.put_index_template:
+        name: my-template2
+        body:
+          index_patterns: [simple-data-stream2]
+          data_stream:
+            timestamp_field: '@timestamp2'
+
 ---
 "Create data stream":
   - skip:
-      version: " - 7.8.99"
-      reason:  "data streams only supported in 7.9+"
+      version: " - 7.99.99"
+      reason:  "mute bwc until backported"
 
   - do:
       indices.create_data_stream:
         name: simple-data-stream1
-        body:
-          timestamp_field: "@timestamp"
   - is_true: acknowledged
 
   - do:
       indices.create_data_stream:
         name: simple-data-stream2
-        body:
-          timestamp_field: "@timestamp2"
   - is_true: acknowledged
 
   - do:
@@ -62,62 +80,57 @@
 ---
 "Create data stream with invalid name":
   - skip:
-      version: " - 7.8.99"
-      reason: "data streams only supported in 7.9+"
+      version: " - 7.99.99"
+      reason:  "mute bwc until backported"
 
   - do:
       catch: bad_request
       indices.create_data_stream:
         name: invalid-data-stream#-name
-        body:
-          timestamp_field: "@timestamp"
 
   - match: { status: 400 }
   - match: { error.root_cause.0.type: "illegal_argument_exception" }
+  - match: { error.root_cause.0.reason: "data_stream [invalid-data-stream#-name] must not contain '#'" }
 
 ---
 "Get data stream":
   - skip:
-      version: " - 7.8.99"
-      reason:  "data streams only supported in 7.9+"
+      version: " - 7.99.99"
+      reason:  "mute bwc until backported"
 
   - do:
       indices.create_data_stream:
-        name: get-data-stream1
-        body:
-          timestamp_field: "@timestamp"
+        name: simple-data-stream1
   - is_true: acknowledged
 
   - do:
       indices.create_data_stream:
-        name: get-data-stream2
-        body:
-          timestamp_field: "@timestamp2"
+        name: simple-data-stream2
   - is_true: acknowledged
 
   - do:
       indices.get_data_stream: {}
-  - match: { 0.name: get-data-stream1 }
+  - match: { 0.name: simple-data-stream1 }
   - match: { 0.timestamp_field: '@timestamp' }
   - match: { 0.generation: 1 }
-  - match: { 1.name: get-data-stream2 }
+  - match: { 1.name: simple-data-stream2 }
   - match: { 1.timestamp_field: '@timestamp2' }
   - match: { 1.generation: 1 }
 
   - do:
       indices.get_data_stream:
-        name: get-data-stream1
-  - match: { 0.name: get-data-stream1 }
+        name: simple-data-stream1
+  - match: { 0.name: simple-data-stream1 }
   - match: { 0.timestamp_field: '@timestamp' }
   - match: { 0.generation: 1 }
 
   - do:
       indices.get_data_stream:
-        name: get-data-*
-  - match: { 0.name: get-data-stream1 }
+        name: simple-data-stream*
+  - match: { 0.name: simple-data-stream1 }
   - match: { 0.timestamp_field: '@timestamp' }
   - match: { 0.generation: 1 }
-  - match: { 1.name: get-data-stream2 }
+  - match: { 1.name: simple-data-stream2 }
   - match: { 1.timestamp_field: '@timestamp2' }
   - match: { 1.generation: 1 }
 
@@ -136,25 +149,23 @@
 
   - do:
       indices.delete_data_stream:
-        name: get-data-stream1
+        name: simple-data-stream1
   - is_true: acknowledged
 
   - do:
       indices.delete_data_stream:
-        name: get-data-stream2
+        name: simple-data-stream2
   - is_true: acknowledged
 
 ---
 "Delete data stream with backing indices":
   - skip:
-      version: " - 7.8.99"
-      reason:  "data streams only supported in 7.9+"
+      version: " - 7.99.99"
+      reason:  "mute bwc until backported"
 
   - do:
       indices.create_data_stream:
-        name: delete-data-stream1
-        body:
-          timestamp_field: "@timestamp"
+        name: simple-data-stream1
   - is_true: acknowledged
 
   - do:
@@ -167,25 +178,25 @@
 
   - do:
       indices.get:
-        index: ['delete-data-stream1-000001', 'test_index']
+        index: ['simple-data-stream1-000001', 'test_index']
 
   - is_true: test_index.settings
-  - is_true: delete-data-stream1-000001.settings
+  - is_true: simple-data-stream1-000001.settings
 
   - do:
       indices.get_data_stream: {}
-  - match: { 0.name: delete-data-stream1 }
+  - match: { 0.name: simple-data-stream1 }
   - match: { 0.timestamp_field: '@timestamp' }
   - match: { 0.generation: 1 }
   - length: { 0.indices: 1 }
-  - match: { 0.indices.0.index_name: 'delete-data-stream1-000001' }
+  - match: { 0.indices.0.index_name: 'simple-data-stream1-000001' }
 
   - do:
       indices.delete_data_stream:
-        name: delete-data-stream1
+        name: simple-data-stream1
   - is_true: acknowledged
 
   - do:
       catch: missing
       indices.get:
-        index: "delete-data-stream1-000001"
+        index: "simple-data-stream1-000001"

+ 13 - 4
rest-api-spec/src/main/resources/rest-api-spec/test/indices.data_stream/20_unsupported_apis.yml

@@ -1,14 +1,23 @@
 ---
 "Test apis that do not supported data streams":
   - skip:
-      version: " - 7.8.99"
-      reason: "data streams only supported in 7.9+"
+      version: " - 7.99.99"
+      reason: "mute bwc until backported"
+      features: allowed_warnings
+
+  - do:
+      allowed_warnings:
+        - "index template [my-template] has index patterns [logs-*] matching patterns from existing older templates [global] with patterns (global => [*]); this template [my-template] will take precedence during new index creation"
+      indices.put_index_template:
+        name: my-template
+        body:
+          index_patterns: [logs-*]
+          data_stream:
+            timestamp_field: '@timestamp'
 
   - do:
       indices.create_data_stream:
         name: logs-foobar
-        body:
-          timestamp_field: "@timestamp"
   - is_true: acknowledged
 
   - do:

+ 17 - 8
rest-api-spec/src/main/resources/rest-api-spec/test/indices.delete/20_backing_indices.yml

@@ -1,14 +1,25 @@
+setup:
+  - skip:
+      features: allowed_warnings
+  - do:
+      allowed_warnings:
+        - "index template [my-template] has index patterns [simple-*] matching patterns from existing older templates [global] with patterns (global => [*]); this template [my-template] will take precedence during new index creation"
+      indices.put_index_template:
+        name: my-template
+        body:
+          index_patterns: [simple-*]
+          data_stream:
+            timestamp_field: '@timestamp'
+
 ---
 "Delete backing index on data stream":
   - skip:
-      version: " - 7.8.99"
-      reason:  "data streams only supported in 7.9+"
+      version: " - 7.99.99"
+      reason:  "mute bwc until backported"
 
   - do:
       indices.create_data_stream:
         name: simple-data-stream
-        body:
-          timestamp_field: "@timestamp"
   - is_true: acknowledged
 
   # rollover data stream to create new backing index
@@ -55,14 +66,12 @@
 ---
 "Attempt to delete write index on data stream is rejected":
   - skip:
-      version: " - 7.8.99"
-      reason:  "data streams only supported in 7.9+"
+      version: " - 7.99.99"
+      reason:  "mute bwc until backported"
 
   - do:
       indices.create_data_stream:
         name: simple-data-stream
-        body:
-          timestamp_field: "@timestamp"
   - is_true: acknowledged
 
   # rollover data stream to create new backing index

+ 12 - 3
rest-api-spec/src/main/resources/rest-api-spec/test/indices.get/20_backing_indices.yml

@@ -1,14 +1,23 @@
 ---
 "Get backing indices for data stream":
   - skip:
-      version: " - 7.8.99"
+      version: " - 7.9.99"
       reason:  "data streams only supported in 7.9+"
+      features: allowed_warnings
+
+  - do:
+      allowed_warnings:
+        - "index template [my-template] has index patterns [data-*] matching patterns from existing older templates [global] with patterns (global => [*]); this template [my-template] will take precedence during new index creation"
+      indices.put_index_template:
+        name: my-template
+        body:
+          index_patterns: [data-*]
+          data_stream:
+            timestamp_field: '@timestamp'
 
   - do:
       indices.create_data_stream:
         name: data-stream1
-        body:
-          timestamp_field: "@timestamp"
   - is_true: acknowledged
 
   - do:

+ 13 - 4
rest-api-spec/src/main/resources/rest-api-spec/test/indices.rollover/50_data_streams.yml

@@ -1,14 +1,23 @@
 ---
 "Roll over a data stream":
   - skip:
-      version: " - 7.8.99"
-      reason:  "data streams only supported in 7.9+"
+      version: " - 7.99.99"
+      reason:  "mute bwc until backported"
+      features: allowed_warnings
+
+  - do:
+      allowed_warnings:
+        - "index template [my-template] has index patterns [data-*] matching patterns from existing older templates [global] with patterns (global => [*]); this template [my-template] will take precedence during new index creation"
+      indices.put_index_template:
+        name: my-template
+        body:
+          index_patterns: [data-*]
+          data_stream:
+            timestamp_field: '@timestamp'
 
   - do:
       indices.create_data_stream:
         name: data-stream-for-rollover
-        body:
-          timestamp_field: "@timestamp"
   - is_true: acknowledged
 
   # rollover data stream to create new backing index

+ 2 - 5
server/src/internalClusterTest/java/org/elasticsearch/action/bulk/BulkIntegrationIT.java

@@ -39,7 +39,6 @@ import org.elasticsearch.action.support.replication.ReplicationRequest;
 import org.elasticsearch.cluster.metadata.DataStream;
 import org.elasticsearch.cluster.metadata.IndexMetadata;
 import org.elasticsearch.cluster.metadata.ComposableIndexTemplate;
-import org.elasticsearch.cluster.metadata.Template;
 import org.elasticsearch.common.Strings;
 import org.elasticsearch.common.bytes.BytesReference;
 import org.elasticsearch.common.settings.Settings;
@@ -218,14 +217,12 @@ public class BulkIntegrationIT extends ESIntegTestCase {
         }
     }
 
-    public void testMixedAutoCreate() {
-        Settings settings = Settings.builder().put(IndexMetadata.SETTING_NUMBER_OF_REPLICAS, 0).build();
-
+    public void testMixedAutoCreate() throws Exception {
         PutComposableIndexTemplateAction.Request createTemplateRequest = new PutComposableIndexTemplateAction.Request("logs-foo");
         createTemplateRequest.indexTemplate(
             new ComposableIndexTemplate(
                 List.of("logs-foo*"),
-                new Template(settings, null, null),
+                null,
                 null, null, null, null,
                 new ComposableIndexTemplate.DataStreamTemplate("@timestamp"))
         );

+ 28 - 6
server/src/internalClusterTest/java/org/elasticsearch/indices/DataStreamIT.java

@@ -28,6 +28,8 @@ import org.elasticsearch.action.admin.indices.get.GetIndexResponse;
 import org.elasticsearch.action.admin.indices.refresh.RefreshRequest;
 import org.elasticsearch.action.admin.indices.rollover.RolloverRequest;
 import org.elasticsearch.action.admin.indices.rollover.RolloverResponse;
+import org.elasticsearch.action.admin.indices.template.delete.DeleteComposableIndexTemplateAction;
+import org.elasticsearch.action.admin.indices.template.put.PutComposableIndexTemplateAction;
 import org.elasticsearch.action.admin.indices.validate.query.ValidateQueryRequestBuilder;
 import org.elasticsearch.action.admin.indices.validate.query.ValidateQueryResponse;
 import org.elasticsearch.action.bulk.BulkRequest;
@@ -41,11 +43,14 @@ import org.elasticsearch.action.search.SearchRequest;
 import org.elasticsearch.action.search.SearchRequestBuilder;
 import org.elasticsearch.action.search.SearchResponse;
 import org.elasticsearch.action.update.UpdateRequest;
+import org.elasticsearch.cluster.metadata.ComposableIndexTemplate;
 import org.elasticsearch.cluster.metadata.DataStream;
 import org.elasticsearch.common.xcontent.XContentType;
 import org.elasticsearch.index.IndexNotFoundException;
 import org.elasticsearch.test.ESIntegTestCase;
+import org.junit.After;
 
+import java.io.IOException;
 import java.util.ArrayList;
 import java.util.Arrays;
 import java.util.Comparator;
@@ -72,13 +77,19 @@ import static org.hamcrest.Matchers.notNullValue;
 
 public class DataStreamIT extends ESIntegTestCase {
 
+    @After
+    public void deleteAllComposableTemplates() {
+        DeleteComposableIndexTemplateAction.Request deleteTemplateRequest = new DeleteComposableIndexTemplateAction.Request("*");
+        client().execute(DeleteComposableIndexTemplateAction.INSTANCE, deleteTemplateRequest).actionGet();
+    }
+
     public void testBasicScenario() throws Exception {
+        createIndexTemplate("id1", "metrics-foo*", "@timestamp1");
         CreateDataStreamAction.Request createDataStreamRequest = new CreateDataStreamAction.Request("metrics-foo");
-        createDataStreamRequest.setTimestampFieldName("@timestamp1");
         client().admin().indices().createDataStream(createDataStreamRequest).get();
 
+        createIndexTemplate("id2", "metrics-bar*", "@timestamp2");
         createDataStreamRequest = new CreateDataStreamAction.Request("metrics-bar");
-        createDataStreamRequest.setTimestampFieldName("@timestamp2");
         client().admin().indices().createDataStream(createDataStreamRequest).get();
 
         GetDataStreamAction.Request getDataStreamRequest = new GetDataStreamAction.Request("*");
@@ -151,9 +162,9 @@ public class DataStreamIT extends ESIntegTestCase {
     }
 
     public void testOtherWriteOps() throws Exception {
+        createIndexTemplate("id", "metrics-foobar*", "@timestamp1");
         String dataStreamName = "metrics-foobar";
         CreateDataStreamAction.Request createDataStreamRequest = new CreateDataStreamAction.Request(dataStreamName);
-        createDataStreamRequest.setTimestampFieldName("@timestamp1");
         client().admin().indices().createDataStream(createDataStreamRequest).get();
 
         {
@@ -199,10 +210,10 @@ public class DataStreamIT extends ESIntegTestCase {
         }
     }
 
-    public void testResolvabilityOfDataStreamsInAPIs() {
+    public void testResolvabilityOfDataStreamsInAPIs() throws Exception {
+        createIndexTemplate("id", "logs-*", "ts");
         String dataStreamName = "logs-foobar";
         CreateDataStreamAction.Request request = new CreateDataStreamAction.Request(dataStreamName);
-        request.setTimestampFieldName("ts");
         client().admin().indices().createDataStream(request).actionGet();
 
         verifyResolvability(dataStreamName, client().prepareIndex(dataStreamName)
@@ -230,7 +241,6 @@ public class DataStreamIT extends ESIntegTestCase {
         verifyResolvability(dataStreamName, client().prepareFieldCaps(dataStreamName).setFields("*"), false);
 
         request = new CreateDataStreamAction.Request("logs-barbaz");
-        request.setTimestampFieldName("ts");
         client().admin().indices().createDataStream(request).actionGet();
         verifyResolvability("logs-barbaz", client().prepareIndex("logs-barbaz")
                 .setSource("{}", XContentType.JSON)
@@ -325,4 +335,16 @@ public class DataStreamIT extends ESIntegTestCase {
             "] matches a data stream, specify the corresponding concrete indices instead."));
     }
 
+    static void createIndexTemplate(String id, String pattern, String timestampFieldName) throws IOException {
+        PutComposableIndexTemplateAction.Request request = new PutComposableIndexTemplateAction.Request(id);
+        request.indexTemplate(
+            new ComposableIndexTemplate(
+                List.of(pattern),
+                null,
+                null, null, null, null,
+                new ComposableIndexTemplate.DataStreamTemplate(timestampFieldName))
+        );
+        client().execute(PutComposableIndexTemplateAction.INSTANCE, request).actionGet();
+    }
+
 }

+ 1 - 1
server/src/main/java/org/elasticsearch/action/admin/indices/create/AutoCreateAction.java

@@ -126,7 +126,7 @@ public final class AutoCreateAction extends ActionType<CreateIndexResponse> {
                     DataStreamTemplate dataStreamTemplate = resolveAutoCreateDataStream(request, currentState.metadata());
                     if (dataStreamTemplate != null) {
                         CreateDataStreamClusterStateUpdateRequest createRequest = new CreateDataStreamClusterStateUpdateRequest(
-                            request.index(), dataStreamTemplate.getTimestampField(), request.masterNodeTimeout(), request.timeout());
+                            request.index(), request.masterNodeTimeout(), request.timeout());
                         ClusterState clusterState =  metadataCreateDataStreamService.createDataStream(createRequest, currentState);
                         indexNameRef.set(clusterState.metadata().dataStreams().get(request.index()).getIndices().get(0).getName());
                         return clusterState;

+ 11 - 14
server/src/main/java/org/elasticsearch/action/admin/indices/datastream/CreateDataStreamAction.java

@@ -18,6 +18,7 @@
  */
 package org.elasticsearch.action.admin.indices.datastream;
 
+import org.elasticsearch.Version;
 import org.elasticsearch.action.ActionListener;
 import org.elasticsearch.action.ActionRequestValidationException;
 import org.elasticsearch.action.ActionType;
@@ -56,39 +57,37 @@ public class CreateDataStreamAction extends ActionType<AcknowledgedResponse> {
     public static class Request extends AcknowledgedRequest<Request> {
 
         private final String name;
-        private String timestampFieldName;
 
         public Request(String name) {
             this.name = name;
         }
 
-        public void setTimestampFieldName(String timestampFieldName) {
-            this.timestampFieldName = timestampFieldName;
-        }
-
         @Override
         public ActionRequestValidationException validate() {
             ActionRequestValidationException validationException = null;
             if (Strings.hasText(name) == false) {
                 validationException = ValidateActions.addValidationError("name is missing", validationException);
             }
-            if (Strings.hasText(timestampFieldName) == false) {
-                validationException = ValidateActions.addValidationError("timestamp field name is missing", validationException);
-            }
             return validationException;
         }
 
         public Request(StreamInput in) throws IOException {
             super(in);
             this.name = in.readString();
-            this.timestampFieldName = in.readString();
+            // TODO: remove when backported
+            if (in.getVersion().before(Version.V_8_0_0)) {
+                in.readString();
+            }
         }
 
         @Override
         public void writeTo(StreamOutput out) throws IOException {
             super.writeTo(out);
             out.writeString(name);
-            out.writeString(timestampFieldName);
+            // TODO: remove when backported
+            if (out.getVersion().before(Version.V_8_0_0)) {
+                out.writeString("");
+            }
         }
 
         @Override
@@ -96,13 +95,12 @@ public class CreateDataStreamAction extends ActionType<AcknowledgedResponse> {
             if (this == o) return true;
             if (o == null || getClass() != o.getClass()) return false;
             Request request = (Request) o;
-            return name.equals(request.name) &&
-                timestampFieldName.equals(request.timestampFieldName);
+            return name.equals(request.name);
         }
 
         @Override
         public int hashCode() {
-            return Objects.hash(name, timestampFieldName);
+            return Objects.hash(name);
         }
     }
 
@@ -133,7 +131,6 @@ public class CreateDataStreamAction extends ActionType<AcknowledgedResponse> {
                                        ActionListener<AcknowledgedResponse> listener) throws Exception {
             CreateDataStreamClusterStateUpdateRequest updateRequest =  new CreateDataStreamClusterStateUpdateRequest(
                 request.name,
-                request.timestampFieldName,
                 request.masterNodeTimeout(),
                 request.timeout()
             );

+ 3 - 0
server/src/main/java/org/elasticsearch/action/admin/indices/rollover/MetadataRolloverService.java

@@ -48,6 +48,7 @@ import java.util.stream.Collectors;
 
 import static org.elasticsearch.cluster.metadata.IndexAbstraction.Type.ALIAS;
 import static org.elasticsearch.cluster.metadata.IndexAbstraction.Type.DATA_STREAM;
+import static org.elasticsearch.cluster.metadata.MetadataCreateDataStreamService.lookupTemplateForDataStream;
 import static org.elasticsearch.cluster.metadata.MetadataIndexTemplateService.findV1Templates;
 import static org.elasticsearch.cluster.metadata.MetadataIndexTemplateService.findV2Template;
 
@@ -140,6 +141,8 @@ public class MetadataRolloverService {
     private RolloverResult rolloverDataStream(ClusterState currentState, IndexAbstraction.DataStream dataStream, String dataStreamName,
                                               CreateIndexRequest createIndexRequest, List<Condition<?>> metConditions,
                                               boolean silent) throws Exception {
+        lookupTemplateForDataStream(dataStreamName, currentState.metadata());
+
         final DataStream ds = dataStream.getDataStream();
         final IndexMetadata originalWriteIndex = dataStream.getWriteIndex();
         final String newWriteIndexName = DataStream.getBackingIndexName(ds.getName(), ds.getGeneration() + 1);

+ 16 - 4
server/src/main/java/org/elasticsearch/cluster/metadata/MetadataCreateDataStreamService.java

@@ -101,14 +101,11 @@ public class MetadataCreateDataStreamService {
     public static final class CreateDataStreamClusterStateUpdateRequest extends ClusterStateUpdateRequest {
 
         private final String name;
-        private final String timestampFieldName;
 
         public CreateDataStreamClusterStateUpdateRequest(String name,
-                                                         String timestampFieldName,
                                                          TimeValue masterNodeTimeout,
                                                          TimeValue timeout) {
             this.name = name;
-            this.timestampFieldName = timestampFieldName;
             masterNodeTimeout(masterNodeTimeout);
             ackTimeout(timeout);
         }
@@ -131,6 +128,8 @@ public class MetadataCreateDataStreamService {
             throw new IllegalArgumentException("data_stream [" + request.name + "] must not start with '.'");
         }
 
+        ComposableIndexTemplate template = lookupTemplateForDataStream(request.name, currentState.metadata());
+
         String firstBackingIndexName = DataStream.getBackingIndexName(request.name, 1);
         CreateIndexClusterStateUpdateRequest createIndexRequest =
             new CreateIndexClusterStateUpdateRequest("initialize_data_stream", firstBackingIndexName, firstBackingIndexName)
@@ -140,9 +139,22 @@ public class MetadataCreateDataStreamService {
         assert firstBackingIndex != null;
 
         Metadata.Builder builder = Metadata.builder(currentState.metadata()).put(
-            new DataStream(request.name, request.timestampFieldName, List.of(firstBackingIndex.getIndex())));
+            new DataStream(request.name, template.getDataStreamTemplate().getTimestampField(), List.of(firstBackingIndex.getIndex())));
         logger.info("adding data stream [{}]", request.name);
         return ClusterState.builder(currentState).metadata(builder).build();
     }
 
+    public static ComposableIndexTemplate lookupTemplateForDataStream(String dataStreamName, Metadata metadata) {
+        final String v2Template = MetadataIndexTemplateService.findV2Template(metadata, dataStreamName, false);
+        if (v2Template == null) {
+            throw new IllegalArgumentException("no matching index template found for data stream [" + dataStreamName + "]");
+        }
+        ComposableIndexTemplate composableIndexTemplate = metadata.templatesV2().get(v2Template);
+        if (composableIndexTemplate.getDataStreamTemplate() == null) {
+            throw new IllegalArgumentException("matching index template [" + v2Template + "] for data stream [" + dataStreamName  +
+                "] has no data stream template");
+        }
+        return composableIndexTemplate;
+    }
+
 }

+ 0 - 11
server/src/main/java/org/elasticsearch/rest/action/admin/indices/RestCreateDataStreamAction.java

@@ -20,14 +20,12 @@ package org.elasticsearch.rest.action.admin.indices;
 
 import org.elasticsearch.action.admin.indices.datastream.CreateDataStreamAction;
 import org.elasticsearch.client.node.NodeClient;
-import org.elasticsearch.cluster.metadata.DataStream;
 import org.elasticsearch.rest.BaseRestHandler;
 import org.elasticsearch.rest.RestRequest;
 import org.elasticsearch.rest.action.RestToXContentListener;
 
 import java.io.IOException;
 import java.util.List;
-import java.util.Map;
 
 public class RestCreateDataStreamAction extends BaseRestHandler {
 
@@ -46,15 +44,6 @@ public class RestCreateDataStreamAction extends BaseRestHandler {
     @Override
     protected RestChannelConsumer prepareRequest(RestRequest request, NodeClient client) throws IOException {
         CreateDataStreamAction.Request putDataStreamRequest = new CreateDataStreamAction.Request(request.param("name"));
-        request.withContentOrSourceParamParserOrNull(parser -> {
-            if (parser != null) {
-                Map<String, Object> body = parser.map();
-                String timeStampFieldName = (String) body.get(DataStream.TIMESTAMP_FIELD_FIELD.getPreferredName());
-                if (timeStampFieldName != null) {
-                    putDataStreamRequest.setTimestampFieldName(timeStampFieldName);
-                }
-            }
-        });
         return channel -> client.admin().indices().createDataStream(putDataStreamRequest, new RestToXContentListener<>(channel));
     }
 }

+ 4 - 7
server/src/test/java/org/elasticsearch/action/admin/indices/datastream/CreateDataStreamRequestTests.java

@@ -35,24 +35,21 @@ public class CreateDataStreamRequestTests extends AbstractWireSerializingTestCas
 
     @Override
     protected Request createTestInstance() {
-        Request request = new Request(randomAlphaOfLength(8));
-        request.setTimestampFieldName(randomAlphaOfLength(8));
-        return request;
+        return new Request(randomAlphaOfLength(8));
     }
 
     public void testValidateRequest() {
         CreateDataStreamAction.Request req = new CreateDataStreamAction.Request("my-data-stream");
-        req.setTimestampFieldName("my-timestamp-field");
         ActionRequestValidationException e = req.validate();
         assertNull(e);
     }
 
-    public void testValidateRequestWithoutTimestampField() {
-        CreateDataStreamAction.Request req = new CreateDataStreamAction.Request("my-data-stream");
+    public void testValidateRequestWithoutName() {
+        CreateDataStreamAction.Request req = new CreateDataStreamAction.Request("");
         ActionRequestValidationException e = req.validate();
         assertNotNull(e);
         assertThat(e.validationErrors().size(), equalTo(1));
-        assertThat(e.validationErrors().get(0), containsString("timestamp field name is missing"));
+        assertThat(e.validationErrors().get(0), containsString("name is missing"));
     }
 
 }

+ 35 - 0
server/src/test/java/org/elasticsearch/action/admin/indices/rollover/MetadataRolloverServiceTests.java

@@ -517,7 +517,10 @@ public class MetadataRolloverServiceTests extends ESTestCase {
 
     public void testRolloverClusterStateForDataStream() throws Exception {
         final DataStream dataStream = DataStreamTests.randomInstance();
+        ComposableIndexTemplate template = new ComposableIndexTemplate(List.of(dataStream.getName() + "*"), null, null, null, null, null,
+            new ComposableIndexTemplate.DataStreamTemplate("@timestamp"));
         Metadata.Builder builder = Metadata.builder();
+        builder.put("template", template);
         for (Index index : dataStream.getIndices()) {
             builder.put(DataStreamTestHelper.getIndexMetadataBuilderForIndex(index));
         }
@@ -577,6 +580,38 @@ public class MetadataRolloverServiceTests extends ESTestCase {
         }
     }
 
+    public void testRolloverClusterStateForDataStreamNoTemplate() throws Exception {
+        final DataStream dataStream = DataStreamTests.randomInstance();
+        Metadata.Builder builder = Metadata.builder();
+        for (Index index : dataStream.getIndices()) {
+            builder.put(DataStreamTestHelper.getIndexMetadataBuilderForIndex(index));
+        }
+        builder.put(dataStream);
+        final ClusterState clusterState = ClusterState.builder(new ClusterName("test")).metadata(builder).build();
+
+        ThreadPool testThreadPool = mock(ThreadPool.class);
+        ClusterService clusterService = ClusterServiceUtils.createClusterService(testThreadPool);
+        Environment env = mock(Environment.class);
+        AllocationService allocationService = mock(AllocationService.class);
+        IndicesService indicesService = mockIndicesServices();
+        IndexNameExpressionResolver mockIndexNameExpressionResolver = mock(IndexNameExpressionResolver.class);
+
+        MetadataCreateIndexService createIndexService = new MetadataCreateIndexService(Settings.EMPTY,
+            clusterService, indicesService, allocationService, null, env, null, testThreadPool, null, Collections.emptyList(), false);
+        MetadataIndexAliasesService indexAliasesService = new MetadataIndexAliasesService(clusterService, indicesService,
+            new AliasValidator(), null, xContentRegistry());
+        MetadataRolloverService rolloverService = new MetadataRolloverService(testThreadPool, createIndexService, indexAliasesService,
+            mockIndexNameExpressionResolver);
+
+        MaxDocsCondition condition = new MaxDocsCondition(randomNonNegativeLong());
+        List<Condition<?>> metConditions = Collections.singletonList(condition);
+        CreateIndexRequest createIndexRequest = new CreateIndexRequest("_na_");
+
+        Exception e = expectThrows(IllegalArgumentException.class, () -> rolloverService.rolloverClusterState(clusterState,
+            dataStream.getName(), null, createIndexRequest, metConditions, false));
+        assertThat(e.getMessage(), equalTo("no matching index template found for data stream [" + dataStream.getName() + "]"));
+    }
+
     private IndicesService mockIndicesServices() throws Exception {
         /*
          * Throws Exception because Eclipse uses the lower bound for

+ 37 - 6
server/src/test/java/org/elasticsearch/cluster/metadata/MetadataCreateDataStreamServiceTests.java

@@ -44,9 +44,13 @@ public class MetadataCreateDataStreamServiceTests extends ESTestCase {
     public void testCreateDataStream() throws Exception {
         final MetadataCreateIndexService metadataCreateIndexService = getMetadataCreateIndexService();
         final String dataStreamName = "my-data-stream";
-        ClusterState cs = ClusterState.builder(new ClusterName("_name")).build();
+        ComposableIndexTemplate template = new ComposableIndexTemplate(List.of(dataStreamName + "*"), null, null, null, null, null,
+            new ComposableIndexTemplate.DataStreamTemplate("@timestamp"));
+        ClusterState cs = ClusterState.builder(new ClusterName("_name"))
+            .metadata(Metadata.builder().put("template", template).build())
+            .build();
         CreateDataStreamClusterStateUpdateRequest req =
-            new CreateDataStreamClusterStateUpdateRequest(dataStreamName, "@timestamp", TimeValue.ZERO, TimeValue.ZERO);
+            new CreateDataStreamClusterStateUpdateRequest(dataStreamName, TimeValue.ZERO, TimeValue.ZERO);
         ClusterState newState = MetadataCreateDataStreamService.createDataStream(metadataCreateIndexService, cs, req);
         assertThat(newState.metadata().dataStreams().size(), equalTo(1));
         assertThat(newState.metadata().dataStreams().get(dataStreamName).getName(), equalTo(dataStreamName));
@@ -63,7 +67,7 @@ public class MetadataCreateDataStreamServiceTests extends ESTestCase {
         ClusterState cs = ClusterState.builder(new ClusterName("_name"))
             .metadata(Metadata.builder().dataStreams(Map.of(dataStreamName, existingDataStream)).build()).build();
         CreateDataStreamClusterStateUpdateRequest req =
-            new CreateDataStreamClusterStateUpdateRequest(dataStreamName, "@timestamp", TimeValue.ZERO, TimeValue.ZERO);
+            new CreateDataStreamClusterStateUpdateRequest(dataStreamName, TimeValue.ZERO, TimeValue.ZERO);
 
         IllegalArgumentException e = expectThrows(IllegalArgumentException.class,
             () -> MetadataCreateDataStreamService.createDataStream(metadataCreateIndexService, cs, req));
@@ -75,7 +79,7 @@ public class MetadataCreateDataStreamServiceTests extends ESTestCase {
         final String dataStreamName = "_My-da#ta- ,stream-";
         ClusterState cs = ClusterState.builder(new ClusterName("_name")).build();
         CreateDataStreamClusterStateUpdateRequest req =
-            new CreateDataStreamClusterStateUpdateRequest(dataStreamName, "@timestamp", TimeValue.ZERO, TimeValue.ZERO);
+            new CreateDataStreamClusterStateUpdateRequest(dataStreamName, TimeValue.ZERO, TimeValue.ZERO);
         IllegalArgumentException e = expectThrows(IllegalArgumentException.class,
             () -> MetadataCreateDataStreamService.createDataStream(metadataCreateIndexService, cs, req));
         assertThat(e.getMessage(), containsString("must not contain the following characters"));
@@ -86,7 +90,7 @@ public class MetadataCreateDataStreamServiceTests extends ESTestCase {
         final String dataStreamName = "MAY_NOT_USE_UPPERCASE";
         ClusterState cs = ClusterState.builder(new ClusterName("_name")).build();
         CreateDataStreamClusterStateUpdateRequest req =
-            new CreateDataStreamClusterStateUpdateRequest(dataStreamName, "@timestamp", TimeValue.ZERO, TimeValue.ZERO);
+            new CreateDataStreamClusterStateUpdateRequest(dataStreamName, TimeValue.ZERO, TimeValue.ZERO);
         IllegalArgumentException e = expectThrows(IllegalArgumentException.class,
             () -> MetadataCreateDataStreamService.createDataStream(metadataCreateIndexService, cs, req));
         assertThat(e.getMessage(), containsString("data_stream [" + dataStreamName + "] must be lowercase"));
@@ -97,12 +101,39 @@ public class MetadataCreateDataStreamServiceTests extends ESTestCase {
         final String dataStreamName = ".may_not_start_with_period";
         ClusterState cs = ClusterState.builder(new ClusterName("_name")).build();
         CreateDataStreamClusterStateUpdateRequest req =
-            new CreateDataStreamClusterStateUpdateRequest(dataStreamName, "@timestamp", TimeValue.ZERO, TimeValue.ZERO);
+            new CreateDataStreamClusterStateUpdateRequest(dataStreamName, TimeValue.ZERO, TimeValue.ZERO);
         IllegalArgumentException e = expectThrows(IllegalArgumentException.class,
             () -> MetadataCreateDataStreamService.createDataStream(metadataCreateIndexService, cs, req));
         assertThat(e.getMessage(), containsString("data_stream [" + dataStreamName + "] must not start with '.'"));
     }
 
+    public void testCreateDataStreamNoTemplate() throws Exception {
+        final MetadataCreateIndexService metadataCreateIndexService = getMetadataCreateIndexService();
+        final String dataStreamName = "my-data-stream";
+        ClusterState cs = ClusterState.builder(new ClusterName("_name"))
+            .build();
+        CreateDataStreamClusterStateUpdateRequest req =
+            new CreateDataStreamClusterStateUpdateRequest(dataStreamName, TimeValue.ZERO, TimeValue.ZERO);
+        Exception e = expectThrows(IllegalArgumentException.class,
+            () -> MetadataCreateDataStreamService.createDataStream(metadataCreateIndexService, cs, req));
+        assertThat(e.getMessage(), equalTo("no matching index template found for data stream [my-data-stream]"));
+    }
+
+    public void testCreateDataStreamNoValidTemplate() throws Exception {
+        final MetadataCreateIndexService metadataCreateIndexService = getMetadataCreateIndexService();
+        final String dataStreamName = "my-data-stream";
+        ComposableIndexTemplate template = new ComposableIndexTemplate(List.of(dataStreamName + "*"), null, null, null, null, null, null);
+        ClusterState cs = ClusterState.builder(new ClusterName("_name"))
+            .metadata(Metadata.builder().put("template", template).build())
+            .build();
+        CreateDataStreamClusterStateUpdateRequest req =
+            new CreateDataStreamClusterStateUpdateRequest(dataStreamName, TimeValue.ZERO, TimeValue.ZERO);
+        Exception e = expectThrows(IllegalArgumentException.class,
+            () -> MetadataCreateDataStreamService.createDataStream(metadataCreateIndexService, cs, req));
+        assertThat(e.getMessage(),
+            equalTo("matching index template [template] for data stream [my-data-stream] has no data stream template"));
+    }
+
     private static MetadataCreateIndexService getMetadataCreateIndexService() throws Exception {
         MetadataCreateIndexService s = mock(MetadataCreateIndexService.class);
         when(s.applyCreateIndexRequest(any(ClusterState.class), any(CreateIndexClusterStateUpdateRequest.class), anyBoolean()))

+ 13 - 4
x-pack/plugin/src/test/resources/rest-api-spec/test/data_stream/10_data_stream_resolvability.yml

@@ -1,14 +1,23 @@
 ---
 "Verify data stream resolvability for xpack apis":
   - skip:
-      version: " - 7.8.99"
-      reason: "data streams only supported in 7.9+"
+      version: " - 7.99.99"
+      reason: "mute bwc until backported"
+      features: allowed_warnings
+
+  - do:
+      allowed_warnings:
+        - "index template [my-template] has index patterns [logs-*] matching patterns from existing older templates [global] with patterns (global => [*]); this template [my-template] will take precedence during new index creation"
+      indices.put_index_template:
+        name: my-template
+        body:
+          index_patterns: [logs-*]
+          data_stream:
+            timestamp_field: '@timestamp'
 
   - do:
       indices.create_data_stream:
         name: logs-foobar
-        body:
-          timestamp_field: "@timestamp"
   - is_true: acknowledged
 
   - do: