소스 검색

Keep track of timestamp_field mapping as part of a data stream (#58096)

Relates to #53100

* use mapping source direcly instead of using mapper service to extract the relevant mapping details
* moved assertion to TimestampField class and added helper method for tests
* Improved logic that inserts timestamp field mapping into an mapping.
If the timestamp field path consisted out of object fields and
if the final mapping did not contain the parent field then an error
occurred, because the prior logic assumed that the object field existed.
Martijn van Groningen 5 년 전
부모
커밋
085ba99fba
35개의 변경된 파일503개의 추가작업 그리고 119개의 파일을 삭제
  1. 5 17
      client/rest-high-level/src/main/java/org/elasticsearch/client/indices/DataStream.java
  2. 3 7
      client/rest-high-level/src/test/java/org/elasticsearch/client/indices/GetDataStreamResponseTests.java
  3. 13 8
      rest-api-spec/src/main/resources/rest-api-spec/test/indices.data_stream/10_basic.yml
  4. 1 1
      rest-api-spec/src/main/resources/rest-api-spec/test/indices.data_stream/30_auto_create_data_stream.yml
  5. 1 1
      rest-api-spec/src/main/resources/rest-api-spec/test/indices.delete/20_backing_indices.yml
  6. 1 1
      rest-api-spec/src/main/resources/rest-api-spec/test/indices.rollover/50_data_streams.yml
  7. 155 14
      server/src/internalClusterTest/java/org/elasticsearch/indices/DataStreamIT.java
  8. 1 1
      server/src/internalClusterTest/java/org/elasticsearch/snapshots/DataStreamsSnapshotsIT.java
  9. 1 1
      server/src/internalClusterTest/java/org/elasticsearch/snapshots/SharedClusterSnapshotRestoreIT.java
  10. 1 1
      server/src/main/java/org/elasticsearch/action/admin/indices/resolve/ResolveIndexAction.java
  11. 124 9
      server/src/main/java/org/elasticsearch/cluster/metadata/DataStream.java
  12. 34 6
      server/src/main/java/org/elasticsearch/cluster/metadata/MetadataCreateDataStreamService.java
  13. 7 0
      server/src/main/java/org/elasticsearch/cluster/metadata/MetadataCreateIndexService.java
  14. 2 1
      server/src/test/java/org/elasticsearch/action/admin/indices/datastream/DeleteDataStreamRequestTests.java
  15. 5 3
      server/src/test/java/org/elasticsearch/action/admin/indices/datastream/GetDataStreamsRequestTests.java
  16. 2 1
      server/src/test/java/org/elasticsearch/action/admin/indices/resolve/ResolveIndexTests.java
  17. 1 1
      server/src/test/java/org/elasticsearch/action/admin/indices/rollover/MetadataRolloverServiceTests.java
  18. 3 1
      server/src/test/java/org/elasticsearch/cluster/coordination/ElasticsearchNodeCommandTests.java
  19. 72 5
      server/src/test/java/org/elasticsearch/cluster/metadata/DataStreamTests.java
  20. 9 7
      server/src/test/java/org/elasticsearch/cluster/metadata/IndexNameExpressionResolverTests.java
  21. 3 1
      server/src/test/java/org/elasticsearch/cluster/metadata/MetadataCreateDataStreamServiceTests.java
  22. 2 1
      server/src/test/java/org/elasticsearch/cluster/metadata/MetadataIndexAliasesServiceTests.java
  23. 10 9
      server/src/test/java/org/elasticsearch/cluster/metadata/MetadataTests.java
  24. 5 4
      server/src/test/java/org/elasticsearch/cluster/metadata/ToAndFromJsonMetadataTests.java
  25. 4 3
      server/src/test/java/org/elasticsearch/snapshots/RestoreServiceTests.java
  26. 6 0
      test/framework/src/main/java/org/elasticsearch/cluster/DataStreamTestHelper.java
  27. 4 3
      x-pack/plugin/core/src/test/java/org/elasticsearch/xpack/core/ilm/CheckNoDataStreamWriteIndexStepTests.java
  28. 3 1
      x-pack/plugin/core/src/test/java/org/elasticsearch/xpack/core/ilm/DeleteStepTests.java
  29. 6 4
      x-pack/plugin/core/src/test/java/org/elasticsearch/xpack/core/ilm/ReplaceDataStreamBackingIndexStepTests.java
  30. 3 1
      x-pack/plugin/core/src/test/java/org/elasticsearch/xpack/core/ilm/RolloverStepTests.java
  31. 3 1
      x-pack/plugin/core/src/test/java/org/elasticsearch/xpack/core/ilm/UpdateRolloverLifecycleDateStepTests.java
  32. 3 1
      x-pack/plugin/core/src/test/java/org/elasticsearch/xpack/core/ilm/WaitForActiveShardsTests.java
  33. 3 1
      x-pack/plugin/core/src/test/java/org/elasticsearch/xpack/core/ilm/WaitForRolloverReadyStepTests.java
  34. 2 1
      x-pack/plugin/ml/src/test/java/org/elasticsearch/xpack/ml/datafeed/DatafeedNodeSelectorTests.java
  35. 5 2
      x-pack/plugin/security/src/test/java/org/elasticsearch/xpack/security/authz/IndicesAndAliasesResolverTests.java

+ 5 - 17
client/rest-high-level/src/main/java/org/elasticsearch/client/indices/DataStream.java

@@ -20,8 +20,6 @@ package org.elasticsearch.client.indices;
 
 import org.elasticsearch.common.ParseField;
 import org.elasticsearch.common.xcontent.ConstructingObjectParser;
-import org.elasticsearch.common.xcontent.ToXContentObject;
-import org.elasticsearch.common.xcontent.XContentBuilder;
 import org.elasticsearch.common.xcontent.XContentParser;
 
 import java.io.IOException;
@@ -30,12 +28,12 @@ import java.util.Map;
 import java.util.Objects;
 import java.util.stream.Collectors;
 
-public final class DataStream implements ToXContentObject {
+public final class DataStream {
 
     private final String name;
     private final String timeStampField;
     private final List<String> indices;
-    private long generation;
+    private final long generation;
 
     public DataStream(String name, String timeStampField, List<String> indices, long generation) {
         this.name = name;
@@ -68,14 +66,15 @@ public final class DataStream implements ToXContentObject {
     @SuppressWarnings("unchecked")
     private static final ConstructingObjectParser<DataStream, Void> PARSER = new ConstructingObjectParser<>("data_stream",
         args -> {
+            String timeStampField = (String) ((Map<?, ?>) args[1]).get("name");
             List<String> indices =
                 ((List<Map<String, String>>) args[2]).stream().map(m -> m.get("index_name")).collect(Collectors.toList());
-            return new DataStream((String) args[0], (String) args[1], indices, (Long) args[3]);
+            return new DataStream((String) args[0], timeStampField, indices, (Long) args[3]);
         });
 
     static {
         PARSER.declareString(ConstructingObjectParser.constructorArg(), NAME_FIELD);
-        PARSER.declareString(ConstructingObjectParser.constructorArg(), TIMESTAMP_FIELD_FIELD);
+        PARSER.declareObject(ConstructingObjectParser.constructorArg(), (p, c) -> p.map(), TIMESTAMP_FIELD_FIELD);
         PARSER.declareObjectArray(ConstructingObjectParser.constructorArg(), (p, c) -> p.mapStrings(), INDICES_FIELD);
         PARSER.declareLong(ConstructingObjectParser.constructorArg(), GENERATION_FIELD);
     }
@@ -84,17 +83,6 @@ public final class DataStream implements ToXContentObject {
         return PARSER.parse(parser, null);
     }
 
-    @Override
-    public XContentBuilder toXContent(XContentBuilder builder, Params params) throws IOException {
-        builder.startObject();
-        builder.field(NAME_FIELD.getPreferredName(), name);
-        builder.field(TIMESTAMP_FIELD_FIELD.getPreferredName(), timeStampField);
-        builder.field(INDICES_FIELD.getPreferredName(), indices);
-        builder.field(GENERATION_FIELD.getPreferredName(), generation);
-        builder.endObject();
-        return builder;
-    }
-
     @Override
     public boolean equals(Object o) {
         if (this == o) return true;

+ 3 - 7
client/rest-high-level/src/test/java/org/elasticsearch/client/indices/GetDataStreamResponseTests.java

@@ -34,6 +34,7 @@ import java.util.List;
 import java.util.Locale;
 import java.util.stream.Collectors;
 
+import static org.elasticsearch.cluster.DataStreamTestHelper.createTimestampField;
 import static org.elasticsearch.cluster.metadata.DataStream.getDefaultBackingIndexName;
 
 public class GetDataStreamResponseTests extends AbstractResponseTestCase<GetDataStreamAction.Response, GetDataStreamResponse> {
@@ -52,12 +53,7 @@ public class GetDataStreamResponseTests extends AbstractResponseTestCase<GetData
         long generation = indices.size() + randomLongBetween(1, 128);
         String dataStreamName = randomAlphaOfLength(10).toLowerCase(Locale.ROOT);
         indices.add(new Index(getDefaultBackingIndexName(dataStreamName, generation), UUIDs.randomBase64UUID(random())));
-        return new DataStream(dataStreamName, randomAlphaOfLength(10), indices, generation);
-    }
-
-    private static GetDataStreamResponse fromXContent(XContentParser parser) throws IOException {
-        parser.nextToken();
-        return GetDataStreamResponse.fromXContent(parser);
+        return new DataStream(dataStreamName, createTimestampField(randomAlphaOfLength(10)), indices, generation);
     }
 
     @Override
@@ -86,7 +82,7 @@ public class GetDataStreamResponseTests extends AbstractResponseTestCase<GetData
             DataStream server = serverIt.next();
             assertEquals(server.getName(), client.getName());
             assertEquals(server.getIndices().stream().map(Index::getName).collect(Collectors.toList()), client.getIndices());
-            assertEquals(server.getTimeStampField(), client.getTimeStampField());
+            assertEquals(server.getTimeStampField().getName(), client.getTimeStampField());
             assertEquals(server.getGeneration(), client.getGeneration());
         }
     }

+ 13 - 8
rest-api-spec/src/main/resources/rest-api-spec/test/indices.data_stream/10_basic.yml

@@ -50,12 +50,12 @@ setup:
       indices.get_data_stream:
         name: "*"
   - match: { 0.name: simple-data-stream1 }
-  - match: { 0.timestamp_field: '@timestamp' }
+  - match: { 0.timestamp_field.name: '@timestamp' }
   - match: { 0.generation: 1 }
   - length: { 0.indices: 1 }
   - match: { 0.indices.0.index_name: '.ds-simple-data-stream1-000001' }
   - match: { 1.name: simple-data-stream2 }
-  - match: { 1.timestamp_field: '@timestamp2' }
+  - match: { 1.timestamp_field.name: '@timestamp2' }
   - match: { 0.generation: 1 }
   - length: { 1.indices: 1 }
   - match: { 1.indices.0.index_name: '.ds-simple-data-stream2-000001' }
@@ -121,27 +121,32 @@ setup:
   - do:
       indices.get_data_stream: {}
   - match: { 0.name: simple-data-stream1 }
-  - match: { 0.timestamp_field: '@timestamp' }
+  - match: { 0.timestamp_field.name: '@timestamp' }
+  - match: { 0.timestamp_field.mapping: {type: date} }
   - match: { 0.generation: 1 }
   - match: { 1.name: simple-data-stream2 }
-  - match: { 1.timestamp_field: '@timestamp2' }
+  - match: { 1.timestamp_field.name: '@timestamp2' }
+  - match: { 1.timestamp_field.mapping: {type: date} }
   - match: { 1.generation: 1 }
 
   - do:
       indices.get_data_stream:
         name: simple-data-stream1
   - match: { 0.name: simple-data-stream1 }
-  - match: { 0.timestamp_field: '@timestamp' }
+  - match: { 0.timestamp_field.name: '@timestamp' }
+  - match: { 0.timestamp_field.mapping: {type: date} }
   - match: { 0.generation: 1 }
 
   - do:
       indices.get_data_stream:
         name: simple-data-stream*
   - match: { 0.name: simple-data-stream1 }
-  - match: { 0.timestamp_field: '@timestamp' }
+  - match: { 0.timestamp_field.name: '@timestamp' }
+  - match: { 0.timestamp_field.mapping: {type: date} }
   - match: { 0.generation: 1 }
   - match: { 1.name: simple-data-stream2 }
-  - match: { 1.timestamp_field: '@timestamp2' }
+  - match: { 1.timestamp_field.name: '@timestamp2' }
+  - match: { 1.timestamp_field.mapping: {type: date} }
   - match: { 1.generation: 1 }
 
   - do:
@@ -196,7 +201,7 @@ setup:
   - do:
       indices.get_data_stream: {}
   - match: { 0.name: simple-data-stream1 }
-  - match: { 0.timestamp_field: '@timestamp' }
+  - match: { 0.timestamp_field.name: '@timestamp' }
   - match: { 0.generation: 1 }
   - length: { 0.indices: 1 }
   - match: { 0.indices.0.index_name: '.ds-simple-data-stream1-000001' }

+ 1 - 1
rest-api-spec/src/main/resources/rest-api-spec/test/indices.data_stream/30_auto_create_data_stream.yml

@@ -41,7 +41,7 @@
       indices.get_data_stream:
         name: logs-foobar
   - match: { 0.name: logs-foobar }
-  - match: { 0.timestamp_field: 'timestamp' }
+  - match: { 0.timestamp_field.name: 'timestamp' }
   - length: { 0.indices: 1 }
   - match: { 0.indices.0.index_name: '.ds-logs-foobar-000001' }
 

+ 1 - 1
rest-api-spec/src/main/resources/rest-api-spec/test/indices.delete/20_backing_indices.yml

@@ -58,7 +58,7 @@ setup:
       indices.get_data_stream:
         name: "*"
   - match: { 0.name: simple-data-stream }
-  - match: { 0.timestamp_field: '@timestamp' }
+  - match: { 0.timestamp_field.name: '@timestamp' }
   - match: { 0.generation: 2 }
   - length: { 0.indices: 1 }
   - match: { 0.indices.0.index_name: '.ds-simple-data-stream-000002' }

+ 1 - 1
rest-api-spec/src/main/resources/rest-api-spec/test/indices.rollover/50_data_streams.yml

@@ -46,7 +46,7 @@
       indices.get_data_stream:
         name: "*"
   - match: { 0.name: data-stream-for-rollover }
-  - match: { 0.timestamp_field: '@timestamp' }
+  - match: { 0.timestamp_field.name: '@timestamp' }
   - match: { 0.generation: 2 }
   - length: { 0.indices: 2 }
   - match: { 0.indices.0.index_name: '.ds-data-stream-for-rollover-000001' }

+ 155 - 14
server/src/internalClusterTest/java/org/elasticsearch/indices/DataStreamIT.java

@@ -64,6 +64,7 @@ import java.util.Arrays;
 import java.util.Comparator;
 import java.util.List;
 import java.util.Optional;
+import java.util.Map;
 
 import static org.elasticsearch.indices.IndicesOptionsIntegrationIT._flush;
 import static org.elasticsearch.indices.IndicesOptionsIntegrationIT.clearCache;
@@ -97,11 +98,11 @@ public class DataStreamIT extends ESIntegTestCase {
     }
 
     public void testBasicScenario() throws Exception {
-        createIndexTemplate("id1", "@timestamp1", "metrics-foo*");
+        putComposableIndexTemplate("id1", "@timestamp1", List.of("metrics-foo*"));
         CreateDataStreamAction.Request createDataStreamRequest = new CreateDataStreamAction.Request("metrics-foo");
         client().admin().indices().createDataStream(createDataStreamRequest).get();
 
-        createIndexTemplate("id2", "@timestamp2", "metrics-bar*");
+        putComposableIndexTemplate("id2", "@timestamp2", List.of("metrics-bar*"));
         createDataStreamRequest = new CreateDataStreamAction.Request("metrics-bar");
         client().admin().indices().createDataStream(createDataStreamRequest).get();
 
@@ -110,12 +111,14 @@ public class DataStreamIT extends ESIntegTestCase {
         getDataStreamResponse.getDataStreams().sort(Comparator.comparing(DataStream::getName));
         assertThat(getDataStreamResponse.getDataStreams().size(), equalTo(2));
         assertThat(getDataStreamResponse.getDataStreams().get(0).getName(), equalTo("metrics-bar"));
-        assertThat(getDataStreamResponse.getDataStreams().get(0).getTimeStampField(), equalTo("@timestamp2"));
+        assertThat(getDataStreamResponse.getDataStreams().get(0).getTimeStampField().getName(), equalTo("@timestamp2"));
+        assertThat(getDataStreamResponse.getDataStreams().get(0).getTimeStampField().getFieldMapping(), equalTo(Map.of("type", "date")));
         assertThat(getDataStreamResponse.getDataStreams().get(0).getIndices().size(), equalTo(1));
         assertThat(getDataStreamResponse.getDataStreams().get(0).getIndices().get(0).getName(),
             equalTo(DataStream.getDefaultBackingIndexName("metrics-bar", 1)));
         assertThat(getDataStreamResponse.getDataStreams().get(1).getName(), equalTo("metrics-foo"));
-        assertThat(getDataStreamResponse.getDataStreams().get(1).getTimeStampField(), equalTo("@timestamp1"));
+        assertThat(getDataStreamResponse.getDataStreams().get(1).getTimeStampField().getName(), equalTo("@timestamp1"));
+        assertThat(getDataStreamResponse.getDataStreams().get(1).getTimeStampField().getFieldMapping(), equalTo(Map.of("type", "date")));
         assertThat(getDataStreamResponse.getDataStreams().get(1).getIndices().size(), equalTo(1));
         assertThat(getDataStreamResponse.getDataStreams().get(1).getIndices().get(0).getName(),
             equalTo(DataStream.getDefaultBackingIndexName("metrics-foo", 1)));
@@ -125,11 +128,15 @@ public class DataStreamIT extends ESIntegTestCase {
             client().admin().indices().getIndex(new GetIndexRequest().indices(backingIndex)).actionGet();
         assertThat(getIndexResponse.getSettings().get(backingIndex), notNullValue());
         assertThat(getIndexResponse.getSettings().get(backingIndex).getAsBoolean("index.hidden", null), is(true));
+        Map<?, ?> mappings = getIndexResponse.getMappings().get(backingIndex).getSourceAsMap();
+        assertThat(ObjectPath.eval("properties.@timestamp2.type", mappings), is("date"));
 
         backingIndex = DataStream.getDefaultBackingIndexName("metrics-foo", 1);
         getIndexResponse = client().admin().indices().getIndex(new GetIndexRequest().indices(backingIndex)).actionGet();
         assertThat(getIndexResponse.getSettings().get(backingIndex), notNullValue());
         assertThat(getIndexResponse.getSettings().get(backingIndex).getAsBoolean("index.hidden", null), is(true));
+        mappings = getIndexResponse.getMappings().get(backingIndex).getSourceAsMap();
+        assertThat(ObjectPath.eval("properties.@timestamp1.type", mappings), is("date"));
 
         int numDocsBar = randomIntBetween(2, 16);
         indexDocs("metrics-bar", numDocsBar);
@@ -151,11 +158,15 @@ public class DataStreamIT extends ESIntegTestCase {
         getIndexResponse = client().admin().indices().getIndex(new GetIndexRequest().indices(backingIndex)).actionGet();
         assertThat(getIndexResponse.getSettings().get(backingIndex), notNullValue());
         assertThat(getIndexResponse.getSettings().get(backingIndex).getAsBoolean("index.hidden", null), is(true));
+        mappings = getIndexResponse.getMappings().get(backingIndex).getSourceAsMap();
+        assertThat(ObjectPath.eval("properties.@timestamp1.type", mappings), is("date"));
 
         backingIndex = DataStream.getDefaultBackingIndexName("metrics-bar", 2);
         getIndexResponse = client().admin().indices().getIndex(new GetIndexRequest().indices(backingIndex)).actionGet();
         assertThat(getIndexResponse.getSettings().get(backingIndex), notNullValue());
         assertThat(getIndexResponse.getSettings().get(backingIndex).getAsBoolean("index.hidden", null), is(true));
+        mappings = getIndexResponse.getMappings().get(backingIndex).getSourceAsMap();
+        assertThat(ObjectPath.eval("properties.@timestamp2.type", mappings), is("date"));
 
         int numDocsBar2 = randomIntBetween(2, 16);
         indexDocs("metrics-bar", numDocsBar2);
@@ -185,7 +196,7 @@ public class DataStreamIT extends ESIntegTestCase {
     }
 
     public void testOtherWriteOps() throws Exception {
-        createIndexTemplate("id", "@timestamp1", "metrics-foobar*");
+        putComposableIndexTemplate("id", "@timestamp1", List.of("metrics-foobar*"));
         String dataStreamName = "metrics-foobar";
         CreateDataStreamAction.Request createDataStreamRequest = new CreateDataStreamAction.Request(dataStreamName);
         client().admin().indices().createDataStream(createDataStreamRequest).get();
@@ -271,7 +282,7 @@ public class DataStreamIT extends ESIntegTestCase {
         GetDataStreamAction.Response getDataStreamResponse = client().admin().indices().getDataStreams(getDataStreamRequest).actionGet();
         assertThat(getDataStreamResponse.getDataStreams().size(), equalTo(1));
         assertThat(getDataStreamResponse.getDataStreams().get(0).getName(), equalTo(dataStreamName));
-        assertThat(getDataStreamResponse.getDataStreams().get(0).getTimeStampField(), equalTo("@timestamp"));
+        assertThat(getDataStreamResponse.getDataStreams().get(0).getTimeStampField().getName(), equalTo("@timestamp"));
         assertThat(getDataStreamResponse.getDataStreams().get(0).getIndices().size(), equalTo(1));
         assertThat(getDataStreamResponse.getDataStreams().get(0).getIndices().get(0).getName(), equalTo(backingIndex));
 
@@ -351,7 +362,7 @@ public class DataStreamIT extends ESIntegTestCase {
     }
 
     public void testResolvabilityOfDataStreamsInAPIs() throws Exception {
-        createIndexTemplate("id", "ts", "logs-*");
+        putComposableIndexTemplate("id", "ts", List.of("logs-*"));
         String dataStreamName = "logs-foobar";
         CreateDataStreamAction.Request request = new CreateDataStreamAction.Request(dataStreamName);
         client().admin().indices().createDataStream(request).actionGet();
@@ -412,7 +423,7 @@ public class DataStreamIT extends ESIntegTestCase {
     }
 
     public void testCannotDeleteComposableTemplateUsedByDataStream() throws Exception {
-        createIndexTemplate("id", "@timestamp1", "metrics-foobar*");
+        putComposableIndexTemplate("id", "@timestamp1", List.of("metrics-foobar*"));
         String dataStreamName = "metrics-foobar-baz";
         CreateDataStreamAction.Request createDataStreamRequest = new CreateDataStreamAction.Request(dataStreamName);
         client().admin().indices().createDataStream(createDataStreamRequest).get();
@@ -435,7 +446,7 @@ public class DataStreamIT extends ESIntegTestCase {
     }
 
     public void testAliasActionsFailOnDataStreams() throws Exception {
-        createIndexTemplate("id1", "@timestamp1", "metrics-foo*");
+        putComposableIndexTemplate("id1", "@timestamp1", List.of("metrics-foo*"));
         String dataStreamName = "metrics-foo";
         CreateDataStreamAction.Request createDataStreamRequest = new CreateDataStreamAction.Request(dataStreamName);
         client().admin().indices().createDataStream(createDataStreamRequest).get();
@@ -448,7 +459,7 @@ public class DataStreamIT extends ESIntegTestCase {
     }
 
     public void testAliasActionsFailOnDataStreamBackingIndices() throws Exception {
-        createIndexTemplate("id1", "@timestamp1", "metrics-foo*");
+        putComposableIndexTemplate("id1", "@timestamp1", List.of("metrics-foo*"));
         String dataStreamName = "metrics-foo";
         CreateDataStreamAction.Request createDataStreamRequest = new CreateDataStreamAction.Request(dataStreamName);
         client().admin().indices().createDataStream(createDataStreamRequest).get();
@@ -464,6 +475,132 @@ public class DataStreamIT extends ESIntegTestCase {
             "support aliases."));
     }
 
+    public void testChangeTimestampFieldInComposableTemplatePriorToRollOver() throws Exception {
+        putComposableIndexTemplate("id1", "@timestamp", List.of("logs-foo*"));
+
+        // Index doc that triggers creation of a data stream
+        IndexRequest indexRequest = new IndexRequest("logs-foobar").source("{}", XContentType.JSON).opType("create");
+        IndexResponse indexResponse = client().index(indexRequest).actionGet();
+        assertThat(indexResponse.getIndex(), equalTo(DataStream.getDefaultBackingIndexName("logs-foobar", 1)));
+        assertBackingIndex(DataStream.getDefaultBackingIndexName("logs-foobar", 1), "properties.@timestamp");
+
+        // Rollover data stream
+        RolloverResponse rolloverResponse = client().admin().indices().rolloverIndex(new RolloverRequest("logs-foobar", null)).get();
+        assertThat(rolloverResponse.getNewIndex(), equalTo(DataStream.getDefaultBackingIndexName("logs-foobar", 2)));
+        assertTrue(rolloverResponse.isRolledOver());
+        assertBackingIndex(DataStream.getDefaultBackingIndexName("logs-foobar", 2), "properties.@timestamp");
+
+        // Index another doc into a data stream
+        indexRequest = new IndexRequest("logs-foobar").source("{}", XContentType.JSON).opType("create");
+        indexResponse = client().index(indexRequest).actionGet();
+        assertThat(indexResponse.getIndex(), equalTo(DataStream.getDefaultBackingIndexName("logs-foobar", 2)));
+
+        // Change the template to have a different timestamp field
+        putComposableIndexTemplate("id1", "@timestamp2", List.of("logs-foo*"));
+
+        // Rollover again, eventhough there is no mapping in the template, the timestamp field mapping in data stream
+        // should be applied in the new backing index
+        rolloverResponse = client().admin().indices().rolloverIndex(new RolloverRequest("logs-foobar", null)).get();
+        assertThat(rolloverResponse.getNewIndex(), equalTo(DataStream.getDefaultBackingIndexName("logs-foobar", 3)));
+        assertTrue(rolloverResponse.isRolledOver());
+        assertBackingIndex(DataStream.getDefaultBackingIndexName("logs-foobar", 3), "properties.@timestamp");
+
+        // Index another doc into a data stream
+        indexRequest = new IndexRequest("logs-foobar").source("{}", XContentType.JSON).opType("create");
+        indexResponse = client().index(indexRequest).actionGet();
+        assertThat(indexResponse.getIndex(), equalTo(DataStream.getDefaultBackingIndexName("logs-foobar", 3)));
+
+        DeleteDataStreamAction.Request deleteDataStreamRequest = new DeleteDataStreamAction.Request("logs-foobar");
+        client().admin().indices().deleteDataStream(deleteDataStreamRequest).actionGet();
+    }
+
+    public void testNestedTimestampField() throws Exception {
+        String mapping = "{\n" +
+            "      \"properties\": {\n" +
+            "        \"event\": {\n" +
+            "          \"properties\": {\n" +
+            "            \"@timestamp\": {\n" +
+            "              \"type\": \"date\"" +
+            "            }\n" +
+            "          }\n" +
+            "        }\n" +
+            "      }\n" +
+            "    }";;
+        putComposableIndexTemplate("id1", "event.@timestamp", mapping, List.of("logs-foo*"));
+
+        CreateDataStreamAction.Request createDataStreamRequest = new CreateDataStreamAction.Request("logs-foobar");
+        client().admin().indices().createDataStream(createDataStreamRequest).get();
+        GetDataStreamAction.Request getDataStreamRequest = new GetDataStreamAction.Request("logs-foobar");
+        GetDataStreamAction.Response getDataStreamResponse = client().admin().indices().getDataStreams(getDataStreamRequest).actionGet();
+        assertThat(getDataStreamResponse.getDataStreams().size(), equalTo(1));
+        assertThat(getDataStreamResponse.getDataStreams().get(0).getName(), equalTo("logs-foobar"));
+        assertThat(getDataStreamResponse.getDataStreams().get(0).getTimeStampField().getName(), equalTo("event.@timestamp"));
+        assertThat(getDataStreamResponse.getDataStreams().get(0).getTimeStampField().getFieldMapping(), equalTo(Map.of("type", "date")));
+        assertBackingIndex(DataStream.getDefaultBackingIndexName("logs-foobar", 1), "properties.event.properties.@timestamp");
+
+        // Change the template to have a different timestamp field
+        putComposableIndexTemplate("id1", "@timestamp2", List.of("logs-foo*"));
+
+        RolloverResponse rolloverResponse = client().admin().indices().rolloverIndex(new RolloverRequest("logs-foobar", null)).actionGet();
+        assertThat(rolloverResponse.getNewIndex(), equalTo(DataStream.getDefaultBackingIndexName("logs-foobar", 2)));
+        assertTrue(rolloverResponse.isRolledOver());
+        assertBackingIndex(DataStream.getDefaultBackingIndexName("logs-foobar", 2), "properties.event.properties.@timestamp");
+
+        DeleteDataStreamAction.Request deleteDataStreamRequest = new DeleteDataStreamAction.Request("logs-foobar");
+        client().admin().indices().deleteDataStream(deleteDataStreamRequest).actionGet();
+    }
+
+    public void testTimestampFieldCustomAttributes() throws Exception {
+        String mapping = "{\n" +
+            "      \"properties\": {\n" +
+            "        \"@timestamp\": {\n" +
+            "          \"type\": \"date\",\n" +
+            "          \"format\": \"yyyy-MM\",\n" +
+            "          \"meta\": {\n" +
+            "            \"x\": \"y\"\n" +
+            "          },\n" +
+            "          \"store\": true\n" +
+            "        }\n" +
+            "      }\n" +
+            "    }";
+        putComposableIndexTemplate("id1", "@timestamp", mapping, List.of("logs-foo*"));
+
+        CreateDataStreamAction.Request createDataStreamRequest = new CreateDataStreamAction.Request("logs-foobar");
+        client().admin().indices().createDataStream(createDataStreamRequest).get();
+        GetDataStreamAction.Request getDataStreamRequest = new GetDataStreamAction.Request("logs-foobar");
+        GetDataStreamAction.Response getDataStreamResponse = client().admin().indices().getDataStreams(getDataStreamRequest).actionGet();
+        assertThat(getDataStreamResponse.getDataStreams().size(), equalTo(1));
+        assertThat(getDataStreamResponse.getDataStreams().get(0).getName(), equalTo("logs-foobar"));
+        assertThat(getDataStreamResponse.getDataStreams().get(0).getTimeStampField().getName(), equalTo("@timestamp"));
+        Map<?, ?> expectedTimestampMapping = Map.of("type", "date", "format", "yyyy-MM", "meta", Map.of("x", "y"), "store", true);
+        assertThat(getDataStreamResponse.getDataStreams().get(0).getTimeStampField().getFieldMapping(), equalTo(expectedTimestampMapping));
+        assertBackingIndex(DataStream.getDefaultBackingIndexName("logs-foobar", 1), "properties.@timestamp", expectedTimestampMapping);
+
+        // Change the template to have a different timestamp field
+        putComposableIndexTemplate("id1", "@timestamp2", List.of("logs-foo*"));
+
+        RolloverResponse rolloverResponse = client().admin().indices().rolloverIndex(new RolloverRequest("logs-foobar", null)).actionGet();
+        assertThat(rolloverResponse.getNewIndex(), equalTo(DataStream.getDefaultBackingIndexName("logs-foobar", 2)));
+        assertTrue(rolloverResponse.isRolledOver());
+        assertBackingIndex(DataStream.getDefaultBackingIndexName("logs-foobar", 2), "properties.@timestamp", expectedTimestampMapping);
+
+        DeleteDataStreamAction.Request deleteDataStreamRequest = new DeleteDataStreamAction.Request("logs-foobar");
+        client().admin().indices().deleteDataStream(deleteDataStreamRequest).actionGet();
+    }
+
+    private static void assertBackingIndex(String backingIndex, String timestampFieldPathInMapping) {
+        assertBackingIndex(backingIndex, timestampFieldPathInMapping, Map.of("type", "date"));
+    }
+
+    private static void assertBackingIndex(String backingIndex, String timestampFieldPathInMapping, Map<?, ?> expectedMapping) {
+        GetIndexResponse getIndexResponse =
+            client().admin().indices().getIndex(new GetIndexRequest().indices(backingIndex)).actionGet();
+        assertThat(getIndexResponse.getSettings().get(backingIndex), notNullValue());
+        assertThat(getIndexResponse.getSettings().get(backingIndex).getAsBoolean("index.hidden", null), is(true));
+        Map<?, ?> mappings = getIndexResponse.getMappings().get(backingIndex).getSourceAsMap();
+        assertThat(ObjectPath.eval(timestampFieldPathInMapping, mappings), is(expectedMapping));
+    }
+
     private static void verifyResolvability(String dataStream, ActionRequestBuilder requestBuilder, boolean fail) {
         verifyResolvability(dataStream, requestBuilder, fail, 0);
     }
@@ -538,13 +675,17 @@ public class DataStreamIT extends ESIntegTestCase {
             "] matches a data stream, specify the corresponding concrete indices instead."));
     }
 
-    public static void createIndexTemplate(String id, String timestampFieldName, String... pattern) throws IOException {
+    public static void putComposableIndexTemplate(String id, String timestampFieldName, List<String> patterns) throws IOException {
+        String mapping = MetadataCreateDataStreamServiceTests.generateMapping(timestampFieldName);
+        putComposableIndexTemplate(id, timestampFieldName, mapping, patterns);
+    }
+
+    static void putComposableIndexTemplate(String id, String timestampFieldName, String mapping, List<String> patterns) throws IOException {
         PutComposableIndexTemplateAction.Request request = new PutComposableIndexTemplateAction.Request(id);
         request.indexTemplate(
             new ComposableIndexTemplate(
-                Arrays.asList(pattern),
-                new Template(null,
-                    new CompressedXContent(MetadataCreateDataStreamServiceTests.generateMapping(timestampFieldName)), null),
+                patterns,
+                new Template(null, new CompressedXContent(mapping), null),
                 null, null, null, null,
                 new ComposableIndexTemplate.DataStreamTemplate(timestampFieldName))
         );

+ 1 - 1
server/src/internalClusterTest/java/org/elasticsearch/snapshots/DataStreamsSnapshotsIT.java

@@ -59,7 +59,7 @@ public class DataStreamsSnapshotsIT extends AbstractSnapshotIntegTestCase {
         Path location = randomRepoPath();
         createRepository(REPO, "fs", location);
 
-        DataStreamIT.createIndexTemplate("t1", "@timestamp", "ds", "other-ds");
+        DataStreamIT.putComposableIndexTemplate("t1", "@timestamp", List.of("ds", "other-ds"));
 
         CreateDataStreamAction.Request request = new CreateDataStreamAction.Request("ds");
         AcknowledgedResponse response = client.admin().indices().createDataStream(request).get();

+ 1 - 1
server/src/internalClusterTest/java/org/elasticsearch/snapshots/SharedClusterSnapshotRestoreIT.java

@@ -2522,7 +2522,7 @@ public class SharedClusterSnapshotRestoreIT extends AbstractSnapshotIntegTestCas
 
 
         String dataStream = "datastream";
-        DataStreamIT.createIndexTemplate("dst", "@timestamp", dataStream);
+        DataStreamIT.putComposableIndexTemplate("dst", "@timestamp", List.of(dataStream));
 
         logger.info("--> indexing some data");
         for (int i = 0; i < 100; i++) {

+ 1 - 1
server/src/main/java/org/elasticsearch/action/admin/indices/resolve/ResolveIndexAction.java

@@ -718,7 +718,7 @@ public class ResolveIndexAction extends ActionType<ResolveIndexAction.Response>
                         dataStreams.add(new ResolvedDataStream(
                             dataStream.getName(),
                             backingIndices,
-                            dataStream.getDataStream().getTimeStampField()));
+                            dataStream.getDataStream().getTimeStampField().getName()));
                         break;
                     default:
                         throw new IllegalStateException("unknown index abstraction type: " + ia.getType());

+ 124 - 9
server/src/main/java/org/elasticsearch/cluster/metadata/DataStream.java

@@ -18,34 +18,42 @@
  */
 package org.elasticsearch.cluster.metadata;
 
+import org.elasticsearch.Version;
 import org.elasticsearch.cluster.AbstractDiffable;
 import org.elasticsearch.cluster.Diff;
 import org.elasticsearch.common.ParseField;
 import org.elasticsearch.common.io.stream.StreamInput;
 import org.elasticsearch.common.io.stream.StreamOutput;
+import org.elasticsearch.common.io.stream.Writeable;
 import org.elasticsearch.common.xcontent.ConstructingObjectParser;
 import org.elasticsearch.common.xcontent.ToXContentObject;
 import org.elasticsearch.common.xcontent.XContentBuilder;
+import org.elasticsearch.common.xcontent.XContentHelper;
 import org.elasticsearch.common.xcontent.XContentParser;
 import org.elasticsearch.index.Index;
 
 import java.io.IOException;
 import java.util.ArrayList;
+import java.util.HashMap;
 import java.util.List;
 import java.util.Locale;
+import java.util.Map;
 import java.util.Objects;
 
+import static org.elasticsearch.cluster.metadata.MetadataCreateDataStreamService.ALLOWED_TIMESTAMPFIELD_TYPES;
+import static org.elasticsearch.cluster.metadata.MetadataCreateDataStreamService.convertFieldPathToMappingPath;
+
 public final class DataStream extends AbstractDiffable<DataStream> implements ToXContentObject {
 
     public static final String BACKING_INDEX_PREFIX = ".ds-";
     public static final String DATA_STREAMS_METADATA_FIELD = "data-streams";
 
     private final String name;
-    private final String timeStampField;
+    private final TimestampField timeStampField;
     private final List<Index> indices;
-    private long generation;
+    private final long generation;
 
-    public DataStream(String name, String timeStampField, List<Index> indices, long generation) {
+    public DataStream(String name, TimestampField timeStampField, List<Index> indices, long generation) {
         this.name = name;
         this.timeStampField = timeStampField;
         this.indices = indices;
@@ -54,7 +62,7 @@ public final class DataStream extends AbstractDiffable<DataStream> implements To
         assert indices.get(indices.size() - 1).getName().equals(getDefaultBackingIndexName(name, generation));
     }
 
-    public DataStream(String name, String timeStampField, List<Index> indices) {
+    public DataStream(String name, TimestampField timeStampField, List<Index> indices) {
         this(name, timeStampField, indices, indices.size());
     }
 
@@ -62,7 +70,7 @@ public final class DataStream extends AbstractDiffable<DataStream> implements To
         return name;
     }
 
-    public String getTimeStampField() {
+    public TimestampField getTimeStampField() {
         return timeStampField;
     }
 
@@ -141,7 +149,7 @@ public final class DataStream extends AbstractDiffable<DataStream> implements To
     }
 
     public DataStream(StreamInput in) throws IOException {
-        this(in.readString(), in.readString(), in.readList(Index::new), in.readVLong());
+        this(in.readString(), new TimestampField(in), in.readList(Index::new), in.readVLong());
     }
 
     public static Diff<DataStream> readDiffFrom(StreamInput in) throws IOException {
@@ -151,7 +159,7 @@ public final class DataStream extends AbstractDiffable<DataStream> implements To
     @Override
     public void writeTo(StreamOutput out) throws IOException {
         out.writeString(name);
-        out.writeString(timeStampField);
+        timeStampField.writeTo(out);
         out.writeList(indices);
         out.writeVLong(generation);
     }
@@ -163,11 +171,11 @@ public final class DataStream extends AbstractDiffable<DataStream> implements To
 
     @SuppressWarnings("unchecked")
     private static final ConstructingObjectParser<DataStream, Void> PARSER = new ConstructingObjectParser<>("data_stream",
-        args -> new DataStream((String) args[0], (String) args[1], (List<Index>) args[2], (Long) args[3]));
+        args -> new DataStream((String) args[0], (TimestampField) args[1], (List<Index>) args[2], (Long) args[3]));
 
     static {
         PARSER.declareString(ConstructingObjectParser.constructorArg(), NAME_FIELD);
-        PARSER.declareString(ConstructingObjectParser.constructorArg(), TIMESTAMP_FIELD_FIELD);
+        PARSER.declareObject(ConstructingObjectParser.constructorArg(), TimestampField.PARSER, TIMESTAMP_FIELD_FIELD);
         PARSER.declareObjectArray(ConstructingObjectParser.constructorArg(), (p, c) -> Index.fromXContent(p), INDICES_FIELD);
         PARSER.declareLong(ConstructingObjectParser.constructorArg(), GENERATION_FIELD);
     }
@@ -202,4 +210,111 @@ public final class DataStream extends AbstractDiffable<DataStream> implements To
     public int hashCode() {
         return Objects.hash(name, timeStampField, indices, generation);
     }
+
+    public static final class TimestampField implements Writeable, ToXContentObject {
+
+        static ParseField NAME_FIELD = new ParseField("name");
+        static ParseField FIELD_MAPPING_FIELD = new ParseField("mapping");
+
+        @SuppressWarnings("unchecked")
+        private static final ConstructingObjectParser<TimestampField, Void> PARSER = new ConstructingObjectParser<>(
+            "timestamp_field",
+            args -> new TimestampField((String) args[0], (Map<String, Object>) args[1])
+        );
+
+        static {
+            PARSER.declareString(ConstructingObjectParser.constructorArg(), NAME_FIELD);
+            PARSER.declareObject(ConstructingObjectParser.optionalConstructorArg(), (p, c) -> p.mapOrdered(), FIELD_MAPPING_FIELD);
+        }
+
+        private final String name;
+        private final Map<String, Object> fieldMapping;
+
+        public TimestampField(String name, Map<String, Object> fieldMapping) {
+            assert fieldMapping.containsKey("type") : "no type defined for mapping of timestamp_field";
+            assert ALLOWED_TIMESTAMPFIELD_TYPES.contains(fieldMapping.get("type")) :
+                "invalid type defined for mapping of timestamp_field";
+
+            this.name = name;
+            this.fieldMapping = fieldMapping;
+        }
+
+        public TimestampField(StreamInput in) throws IOException {
+            // TODO: remove bwc logic when backporting:
+            if (in.getVersion().before(Version.V_8_0_0)) {
+                this.name = in.readString();
+                this.fieldMapping = null;
+            } else {
+                this.name = in.readString();
+                this.fieldMapping = in.readMap();
+            }
+        }
+
+        /**
+         * Force fully inserts the timestamp field mapping into the provided mapping.
+         * Existing mapping definitions for the timestamp field will be completely overwritten.
+         * Takes into account if the name of the timestamp field is nested.
+         *
+         * @param mappings The mapping to update
+         */
+        public void insertTimestampFieldMapping(Map<String, Object> mappings) {
+            assert mappings.containsKey("_doc");
+
+            String mappingPath = convertFieldPathToMappingPath(name);
+            String parentObjectFieldPath = "_doc." + mappingPath.substring(0, mappingPath.lastIndexOf('.'));
+            String leafFieldName = mappingPath.substring(mappingPath.lastIndexOf('.') + 1);
+
+            Map<String, Object> changes = new HashMap<>();
+            Map<String, Object> current = changes;
+            for (String key : parentObjectFieldPath.split("\\.")) {
+                Map<String, Object> map = new HashMap<>();
+                current.put(key, map);
+                current = map;
+            }
+            current.put(leafFieldName, fieldMapping);
+            XContentHelper.update(mappings, changes, false);
+        }
+
+        @Override
+        public void writeTo(StreamOutput out) throws IOException {
+            // TODO: remove bwc logic when backporting:
+            if (out.getVersion().before(Version.V_8_0_0)) {
+                out.writeString(name);
+            } else {
+                out.writeString(name);
+                out.writeMap(fieldMapping);
+            }
+        }
+
+        @Override
+        public XContentBuilder toXContent(XContentBuilder builder, Params params) throws IOException {
+            builder.startObject();
+            builder.field(NAME_FIELD.getPreferredName(), name);
+            builder.field(FIELD_MAPPING_FIELD.getPreferredName(), fieldMapping);
+            builder.endObject();
+            return builder;
+        }
+
+        public String getName() {
+            return name;
+        }
+
+        public Map<String, Object> getFieldMapping() {
+            return fieldMapping;
+        }
+
+        @Override
+        public boolean equals(Object o) {
+            if (this == o) return true;
+            if (o == null || getClass() != o.getClass()) return false;
+            TimestampField that = (TimestampField) o;
+            return name.equals(that.name) &&
+                fieldMapping.equals(that.fieldMapping);
+        }
+
+        @Override
+        public int hashCode() {
+            return Objects.hash(name, fieldMapping);
+        }
+    }
 }

+ 34 - 6
server/src/main/java/org/elasticsearch/cluster/metadata/MetadataCreateDataStreamService.java

@@ -34,21 +34,24 @@ import org.elasticsearch.cluster.service.ClusterService;
 import org.elasticsearch.common.Priority;
 import org.elasticsearch.common.settings.Settings;
 import org.elasticsearch.common.unit.TimeValue;
+import org.elasticsearch.common.xcontent.ObjectPath;
 import org.elasticsearch.index.mapper.DateFieldMapper;
 import org.elasticsearch.index.mapper.MappedFieldType;
 import org.elasticsearch.index.mapper.MapperService;
 import org.elasticsearch.threadpool.ThreadPool;
 
+import java.util.Arrays;
 import java.util.LinkedHashSet;
 import java.util.List;
 import java.util.Locale;
+import java.util.Map;
 import java.util.Set;
 import java.util.concurrent.atomic.AtomicReference;
 
 public class MetadataCreateDataStreamService {
 
     private static final Logger logger = LogManager.getLogger(MetadataCreateDataStreamService.class);
-    private static final Set<String> ALLOWED_TIMESTAMPFIELD_TYPES =
+    public static final Set<String> ALLOWED_TIMESTAMPFIELD_TYPES =
         new LinkedHashSet<>(List.of(DateFieldMapper.CONTENT_TYPE, DateFieldMapper.DATE_NANOS_CONTENT_TYPE));
 
     private final ClusterService clusterService;
@@ -121,11 +124,11 @@ public class MetadataCreateDataStreamService {
 
     static ClusterState createDataStream(MetadataCreateIndexService metadataCreateIndexService,
                                          ClusterState currentState,
-                                         CreateDataStreamClusterStateUpdateRequest request) throws Exception {
-        if (currentState.nodes().getMinNodeVersion().before(Version.V_8_0_0)) {
+                                         CreateDataStreamClusterStateUpdateRequest request)
+        throws Exception {
+if (currentState.nodes().getMinNodeVersion().before(Version.V_8_0_0)) {
             throw new IllegalStateException("data streams require minimum node version of " + Version.V_8_0_0);
         }
-
         if (currentState.metadata().dataStreams().containsKey(request.name)) {
             throw new IllegalArgumentException("data_stream [" + request.name + "] already exists");
         }
@@ -150,9 +153,18 @@ public class MetadataCreateDataStreamService {
         currentState = metadataCreateIndexService.applyCreateIndexRequest(currentState, createIndexRequest, false);
         IndexMetadata firstBackingIndex = currentState.metadata().index(firstBackingIndexName);
         assert firstBackingIndex != null;
+        assert firstBackingIndex.mapping() != null : "no mapping found for backing index [" + firstBackingIndexName + "]";
+
+        String fieldName = template.getDataStreamTemplate().getTimestampField();
+        Map<String, Object> mapping = firstBackingIndex.mapping().getSourceAsMap();
+        Map<String, Object> timeStampFieldMapping = ObjectPath.eval(convertFieldPathToMappingPath(fieldName), mapping);
 
-        Metadata.Builder builder = Metadata.builder(currentState.metadata()).put(
-            new DataStream(request.name, template.getDataStreamTemplate().getTimestampField(), List.of(firstBackingIndex.getIndex())));
+        DataStream.TimestampField timestampField = new DataStream.TimestampField(
+            fieldName,
+            timeStampFieldMapping
+        );
+        DataStream newDataStream = new DataStream(request.name, timestampField, List.of(firstBackingIndex.getIndex()));
+        Metadata.Builder builder = Metadata.builder(currentState.metadata()).put(newDataStream);
         logger.info("adding data stream [{}]", request.name);
         return ClusterState.builder(currentState).metadata(builder).build();
     }
@@ -182,4 +194,20 @@ public class MetadataCreateDataStreamService {
         }
     }
 
+    public static String convertFieldPathToMappingPath(String fieldPath) {
+        // The mapping won't allow such fields, so this is a sanity check:
+        assert Arrays.stream(fieldPath.split("\\.")).filter(String::isEmpty).count() == 0L ||
+            fieldPath.startsWith(".") ||
+            fieldPath.endsWith(".") : "illegal field path [" + fieldPath + "]";
+
+        String mappingPath;
+        if (fieldPath.indexOf('.') == -1) {
+            mappingPath = "properties." + fieldPath;
+        } else {
+            mappingPath = "properties." + fieldPath.replace(".", ".properties.");
+        }
+
+        return mappingPath;
+    }
+
 }

+ 7 - 0
server/src/main/java/org/elasticsearch/cluster/metadata/MetadataCreateIndexService.java

@@ -493,6 +493,13 @@ public class MetadataCreateIndexService {
 
         final Map<String, Object> mappings = resolveV2Mappings(request.mappings(), currentState, templateName, xContentRegistry);
 
+        if (request.dataStreamName() != null) {
+            DataStream dataStream = currentState.metadata().dataStreams().get(request.dataStreamName());
+            if (dataStream != null) {
+                dataStream.getTimeStampField().insertTimestampFieldMapping(mappings);
+            }
+        }
+
         final Settings aggregatedIndexSettings =
             aggregateIndexSettings(currentState, request,
                 MetadataIndexTemplateService.resolveSettings(currentState.metadata(), templateName),

+ 2 - 1
server/src/test/java/org/elasticsearch/action/admin/indices/datastream/DeleteDataStreamRequestTests.java

@@ -44,6 +44,7 @@ import java.util.List;
 import java.util.Set;
 import java.util.stream.Collectors;
 
+import static org.elasticsearch.cluster.DataStreamTestHelper.createTimestampField;
 import static org.hamcrest.Matchers.containsString;
 import static org.hamcrest.Matchers.equalTo;
 import static org.mockito.Matchers.any;
@@ -160,7 +161,7 @@ public class DeleteDataStreamRequestTests extends AbstractWireSerializingTestCas
             }
             allIndices.addAll(backingIndices);
 
-            DataStream ds = new DataStream(dsTuple.v1(), "@timestamp",
+            DataStream ds = new DataStream(dsTuple.v1(), createTimestampField("@timestamp"),
                 backingIndices.stream().map(IndexMetadata::getIndex).collect(Collectors.toList()), dsTuple.v2());
             builder.put(ds);
         }

+ 5 - 3
server/src/test/java/org/elasticsearch/action/admin/indices/datastream/GetDataStreamsRequestTests.java

@@ -32,6 +32,7 @@ import org.elasticsearch.test.AbstractWireSerializingTestCase;
 import java.util.List;
 import java.util.Map;
 
+import static org.elasticsearch.cluster.DataStreamTestHelper.createTimestampField;
 import static org.hamcrest.Matchers.containsString;
 import static org.hamcrest.Matchers.equalTo;
 
@@ -65,7 +66,8 @@ public class GetDataStreamsRequestTests extends AbstractWireSerializingTestCase<
     public void testGetDataStream() {
         final String dataStreamName = "my-data-stream";
         IndexMetadata idx = DataStreamTestHelper.createFirstBackingIndex(dataStreamName).build();
-        DataStream existingDataStream = new DataStream(dataStreamName, "timestamp", List.of(idx.getIndex()));
+        DataStream existingDataStream =
+            new DataStream(dataStreamName, createTimestampField("@timestamp"), List.of(idx.getIndex()));
         ClusterState cs = ClusterState.builder(new ClusterName("_name"))
             .metadata(Metadata.builder().dataStreams(Map.of(dataStreamName, existingDataStream)).build()).build();
         GetDataStreamAction.Request req = new GetDataStreamAction.Request(dataStreamName);
@@ -79,8 +81,8 @@ public class GetDataStreamsRequestTests extends AbstractWireSerializingTestCase<
         IndexMetadata idx1 = DataStreamTestHelper.createFirstBackingIndex(dataStreamNames[0]).build();
         IndexMetadata idx2 = DataStreamTestHelper.createFirstBackingIndex(dataStreamNames[1]).build();
 
-        DataStream ds1 = new DataStream(dataStreamNames[0], "timestamp", List.of(idx1.getIndex()));
-        DataStream ds2 = new DataStream(dataStreamNames[1], "timestamp", List.of(idx2.getIndex()));
+        DataStream ds1 = new DataStream(dataStreamNames[0], createTimestampField("@timestamp"), List.of(idx1.getIndex()));
+        DataStream ds2 = new DataStream(dataStreamNames[1], createTimestampField("@timestamp"), List.of(idx2.getIndex()));
         ClusterState cs = ClusterState.builder(new ClusterName("_name"))
             .metadata(Metadata.builder().dataStreams(
                 Map.of(dataStreamNames[0], ds1, dataStreamNames[1], ds2)).build())

+ 2 - 1
server/src/test/java/org/elasticsearch/action/admin/indices/resolve/ResolveIndexTests.java

@@ -44,6 +44,7 @@ import java.util.Map;
 import java.util.Set;
 import java.util.stream.Collectors;
 
+import static org.elasticsearch.cluster.DataStreamTestHelper.createTimestampField;
 import static org.hamcrest.Matchers.equalTo;
 import static org.hamcrest.Matchers.is;
 import static org.hamcrest.core.IsNull.notNullValue;
@@ -238,7 +239,7 @@ public class ResolveIndexTests extends ESTestCase {
             }
             allIndices.addAll(backingIndices);
 
-            DataStream ds = new DataStream(dataStreamName, timestampField,
+            DataStream ds = new DataStream(dataStreamName, createTimestampField(timestampField),
                 backingIndices.stream().map(IndexMetadata::getIndex).collect(Collectors.toList()));
             builder.put(ds);
         }

+ 1 - 1
server/src/test/java/org/elasticsearch/action/admin/indices/rollover/MetadataRolloverServiceTests.java

@@ -550,7 +550,7 @@ public class MetadataRolloverServiceTests extends ESTestCase {
             when(allocationService.reroute(any(ClusterState.class), any(String.class))).then(i -> i.getArguments()[0]);
             DocumentMapper documentMapper = mock(DocumentMapper.class);
             when(documentMapper.type()).thenReturn("_doc");
-            CompressedXContent mapping = new CompressedXContent(generateMapping(dataStream.getTimeStampField()));
+            CompressedXContent mapping = new CompressedXContent(generateMapping(dataStream.getTimeStampField().getName()));
             when(documentMapper.mappingSource()).thenReturn(mapping);
             RoutingFieldMapper routingFieldMapper = mock(RoutingFieldMapper.class);
             when(routingFieldMapper.required()).thenReturn(false);

+ 3 - 1
server/src/test/java/org/elasticsearch/cluster/coordination/ElasticsearchNodeCommandTests.java

@@ -42,6 +42,7 @@ import java.util.stream.Collectors;
 import java.util.stream.Stream;
 
 import static org.elasticsearch.cluster.DataStreamTestHelper.createFirstBackingIndex;
+import static org.elasticsearch.cluster.DataStreamTestHelper.createTimestampField;
 import static org.hamcrest.Matchers.equalTo;
 import static org.hamcrest.Matchers.instanceOf;
 import static org.hamcrest.Matchers.not;
@@ -110,7 +111,8 @@ public class ElasticsearchNodeCommandTests extends ESTestCase {
             for (int i = 0; i < numDataStreams; i++) {
                 String dataStreamName = "name" + 1;
                 IndexMetadata backingIndex = createFirstBackingIndex(dataStreamName).build();
-                mdBuilder.put(new DataStream(dataStreamName, "ts", List.of(backingIndex.getIndex())));
+                mdBuilder.put(new DataStream(dataStreamName, createTimestampField("@timestamp"),
+                    List.of(backingIndex.getIndex())));
             }
         }
         mdBuilder.indexGraveyard(graveyard.build());

+ 72 - 5
server/src/test/java/org/elasticsearch/cluster/metadata/DataStreamTests.java

@@ -18,6 +18,7 @@
  */
 package org.elasticsearch.cluster.metadata;
 
+import org.elasticsearch.cluster.metadata.DataStream.TimestampField;
 import org.elasticsearch.common.UUIDs;
 import org.elasticsearch.common.io.stream.Writeable;
 import org.elasticsearch.common.xcontent.XContentParser;
@@ -26,9 +27,12 @@ import org.elasticsearch.test.AbstractSerializingTestCase;
 
 import java.io.IOException;
 import java.util.ArrayList;
+import java.util.HashMap;
 import java.util.List;
 import java.util.Locale;
+import java.util.Map;
 
+import static org.elasticsearch.cluster.DataStreamTestHelper.createTimestampField;
 import static org.elasticsearch.cluster.metadata.DataStream.getDefaultBackingIndexName;
 import static org.hamcrest.Matchers.equalTo;
 
@@ -48,7 +52,7 @@ public class DataStreamTests extends AbstractSerializingTestCase<DataStream> {
         long generation = indices.size() + randomLongBetween(1, 128);
         String dataStreamName = randomAlphaOfLength(10).toLowerCase(Locale.ROOT);
         indices.add(new Index(getDefaultBackingIndexName(dataStreamName, generation), UUIDs.randomBase64UUID(random())));
-        return new DataStream(dataStreamName, randomAlphaOfLength(10), indices, generation);
+        return new DataStream(dataStreamName, createTimestampField(randomAlphaOfLength(10)), indices, generation);
     }
 
     @Override
@@ -88,7 +92,7 @@ public class DataStreamTests extends AbstractSerializingTestCase<DataStream> {
         for (int k = 1; k <= numBackingIndices; k++) {
             indices.add(new Index(DataStream.getDefaultBackingIndexName(dataStreamName, k), UUIDs.randomBase64UUID(random())));
         }
-        DataStream original = new DataStream(dataStreamName, "@timestamp", indices);
+        DataStream original = new DataStream(dataStreamName, createTimestampField("@timestamp"), indices);
         DataStream updated = original.removeBackingIndex(indices.get(indexToRemove - 1));
         assertThat(updated.getName(), equalTo(original.getName()));
         assertThat(updated.getGeneration(), equalTo(original.getGeneration()));
@@ -118,7 +122,7 @@ public class DataStreamTests extends AbstractSerializingTestCase<DataStream> {
         for (int i = 1; i <= numBackingIndices; i++) {
             indices.add(new Index(DataStream.getDefaultBackingIndexName(dataStreamName, i), UUIDs.randomBase64UUID(random())));
         }
-        DataStream original = new DataStream(dataStreamName, "@timestamp", indices);
+        DataStream original = new DataStream(dataStreamName, createTimestampField("@timestamp"), indices);
 
         Index newBackingIndex = new Index("replacement-index", UUIDs.randomBase64UUID(random()));
         DataStream updated = original.replaceBackingIndex(indices.get(indexToReplace), newBackingIndex);
@@ -143,7 +147,7 @@ public class DataStreamTests extends AbstractSerializingTestCase<DataStream> {
         for (int i = 1; i <= numBackingIndices; i++) {
             indices.add(new Index(DataStream.getDefaultBackingIndexName(dataStreamName, i), UUIDs.randomBase64UUID(random())));
         }
-        DataStream original = new DataStream(dataStreamName, "@timestamp", indices);
+        DataStream original = new DataStream(dataStreamName, createTimestampField("@timestamp"), indices);
 
         Index standaloneIndex = new Index("index-foo", UUIDs.randomBase64UUID(random()));
         Index newBackingIndex = new Index("replacement-index", UUIDs.randomBase64UUID(random()));
@@ -159,9 +163,72 @@ public class DataStreamTests extends AbstractSerializingTestCase<DataStream> {
         for (int i = 1; i <= numBackingIndices; i++) {
             indices.add(new Index(DataStream.getDefaultBackingIndexName(dataStreamName, i), UUIDs.randomBase64UUID(random())));
         }
-        DataStream original = new DataStream(dataStreamName, "@timestamp", indices);
+        DataStream original = new DataStream(dataStreamName, createTimestampField("@timestamp"), indices);
 
         Index newBackingIndex = new Index("replacement-index", UUIDs.randomBase64UUID(random()));
         expectThrows(IllegalArgumentException.class, () -> original.replaceBackingIndex(indices.get(writeIndexPosition), newBackingIndex));
     }
+
+    public void testInsertTimestampFieldMapping() {
+        TimestampField timestampField = new TimestampField("@timestamp", Map.of("type", "date", "meta", Map.of("x", "y")));
+
+        Map<String, Object> mappings = Map.of("_doc", Map.of("properties", new HashMap<>(Map.of("my_field", Map.of("type", "keyword")))));
+        timestampField.insertTimestampFieldMapping(mappings);
+        Map<String, Object> expectedMapping = Map.of("_doc", Map.of("properties", Map.of("my_field", Map.of("type", "keyword"),
+            "@timestamp", Map.of("type", "date", "meta", Map.of("x", "y")))));
+        assertThat(mappings, equalTo(expectedMapping));
+
+        // ensure that existing @timestamp definitions get overwritten:
+        mappings = Map.of("_doc", Map.of("properties", new HashMap<>(Map.of("my_field", Map.of("type", "keyword"),
+            "@timestamp", new HashMap<>(Map.of("type", "keyword")) ))));
+        timestampField.insertTimestampFieldMapping(mappings);
+        expectedMapping = Map.of("_doc", Map.of("properties", Map.of("my_field", Map.of("type", "keyword"), "@timestamp",
+            Map.of("type", "date", "meta", Map.of("x", "y")))));
+        assertThat(mappings, equalTo(expectedMapping));
+    }
+
+    public void testInsertNestedTimestampFieldMapping() {
+        TimestampField timestampField = new TimestampField("event.attr.@timestamp", Map.of("type", "date", "meta", Map.of("x", "y")));
+
+        Map<String, Object> mappings = Map.of("_doc", Map.of("properties", Map.of("event", Map.of("properties", Map.of("attr",
+            Map.of("properties", new HashMap<>(Map.of("my_field", Map.of("type", "keyword")))))))));
+        timestampField.insertTimestampFieldMapping(mappings);
+        Map<String, Object> expectedMapping = Map.of("_doc", Map.of("properties", Map.of("event", Map.of("properties", Map.of("attr",
+            Map.of("properties", new HashMap<>(Map.of("my_field", Map.of("type", "keyword"),
+                "@timestamp", Map.of("type", "date", "meta", Map.of("x", "y"))))))))));
+        assertThat(mappings, equalTo(expectedMapping));
+
+        // ensure that existing @timestamp definitions get overwritten:
+        mappings = Map.of("_doc", Map.of("properties", Map.of("event", Map.of("properties", Map.of("attr",
+            Map.of("properties", new HashMap<>(Map.of("my_field", Map.of("type", "keyword"),
+                "@timestamp", new HashMap<>(Map.of("type", "keyword")) ))))))));
+        timestampField.insertTimestampFieldMapping(mappings);
+        expectedMapping = Map.of("_doc", Map.of("properties", Map.of("event", Map.of("properties", Map.of("attr",
+            Map.of("properties", new HashMap<>(Map.of("my_field", Map.of("type", "keyword"),
+                "@timestamp", Map.of("type", "date", "meta", Map.of("x", "y"))))))))));
+        assertThat(mappings, equalTo(expectedMapping));
+
+        // no event and attr parent objects
+        mappings = Map.of("_doc", Map.of("properties", new HashMap<>()));
+        timestampField.insertTimestampFieldMapping(mappings);
+        expectedMapping = Map.of("_doc", Map.of("properties", Map.of("event", Map.of("properties", Map.of("attr",
+            Map.of("properties", new HashMap<>(Map.of("@timestamp", Map.of("type", "date", "meta", Map.of("x", "y"))))))))));
+        assertThat(mappings, equalTo(expectedMapping));
+
+        // no attr parent object
+        mappings = Map.of("_doc", Map.of("properties", Map.of("event", Map.of("properties", new HashMap<>()))));
+        timestampField.insertTimestampFieldMapping(mappings);
+        expectedMapping = Map.of("_doc", Map.of("properties", Map.of("event", Map.of("properties", Map.of("attr",
+            Map.of("properties", new HashMap<>(Map.of("@timestamp", Map.of("type", "date", "meta", Map.of("x", "y"))))))))));
+        assertThat(mappings, equalTo(expectedMapping));
+
+        // Empty attr parent object
+        mappings = Map.of("_doc", Map.of("properties", Map.of("event", Map.of("properties",
+            Map.of("attr", Map.of("properties", new HashMap<>()))))));
+        timestampField.insertTimestampFieldMapping(mappings);
+        expectedMapping = Map.of("_doc", Map.of("properties", Map.of("event", Map.of("properties", Map.of("attr",
+            Map.of("properties", new HashMap<>(Map.of("@timestamp", Map.of("type", "date", "meta", Map.of("x", "y"))))))))));
+        assertThat(mappings, equalTo(expectedMapping));
+    }
+
 }

+ 9 - 7
server/src/test/java/org/elasticsearch/cluster/metadata/IndexNameExpressionResolverTests.java

@@ -50,6 +50,7 @@ import java.util.Set;
 import java.util.function.Function;
 
 import static org.elasticsearch.cluster.DataStreamTestHelper.createBackingIndex;
+import static org.elasticsearch.cluster.DataStreamTestHelper.createTimestampField;
 import static org.elasticsearch.cluster.metadata.IndexMetadata.INDEX_HIDDEN_SETTING;
 import static org.elasticsearch.common.util.set.Sets.newHashSet;
 import static org.hamcrest.Matchers.arrayContaining;
@@ -1694,7 +1695,7 @@ public class IndexNameExpressionResolverTests extends ESTestCase {
 
         Metadata.Builder mdBuilder = Metadata.builder()
             .put(backingIndex, false)
-            .put(new DataStream(dataStreamName, "ts", List.of(backingIndex.getIndex()), 1));
+            .put(new DataStream(dataStreamName, createTimestampField("ts"), List.of(backingIndex.getIndex()), 1));
         ClusterState state = ClusterState.builder(new ClusterName("_name")).metadata(mdBuilder).build();
 
         {
@@ -1804,7 +1805,7 @@ public class IndexNameExpressionResolverTests extends ESTestCase {
         Metadata.Builder mdBuilder = Metadata.builder()
             .put(index1, false)
             .put(index2, false)
-            .put(new DataStream(dataStreamName, "ts", List.of(index1.getIndex(), index2.getIndex()), 2));
+            .put(new DataStream(dataStreamName, createTimestampField("ts"), List.of(index1.getIndex(), index2.getIndex()), 2));
         ClusterState state = ClusterState.builder(new ClusterName("_name")).metadata(mdBuilder).build();
 
         {
@@ -1886,8 +1887,8 @@ public class IndexNameExpressionResolverTests extends ESTestCase {
             .put(index2, false)
             .put(index3, false)
             .put(index4, false)
-            .put(new DataStream(dataStream1, "ts", List.of(index1.getIndex(), index2.getIndex())))
-            .put(new DataStream(dataStream2, "ts", List.of(index3.getIndex(), index4.getIndex())));
+            .put(new DataStream(dataStream1, createTimestampField("@timestamp"), List.of(index1.getIndex(), index2.getIndex())))
+            .put(new DataStream(dataStream2, createTimestampField("@timestamp"), List.of(index3.getIndex(), index4.getIndex())));
 
         ClusterState state = ClusterState.builder(new ClusterName("_name")).metadata(mdBuilder).build();
         {
@@ -1933,7 +1934,8 @@ public class IndexNameExpressionResolverTests extends ESTestCase {
                 .put(index1, false)
                 .put(index2, false)
                 .put(justAnIndex, false)
-                .put(new DataStream(dataStream1, "ts", List.of(index1.getIndex(), index2.getIndex())))).build();
+                .put(new DataStream(dataStream1, createTimestampField("@timestamp"),
+                    List.of(index1.getIndex(), index2.getIndex())))).build();
 
         IndicesOptions indicesOptions = IndicesOptions.strictExpandOpenAndForbidClosedIgnoreThrottled();
         Index[] result = indexNameExpressionResolver.concreteIndices(state, indicesOptions, true, "logs-*");
@@ -1966,8 +1968,8 @@ public class IndexNameExpressionResolverTests extends ESTestCase {
                 .put(index3, false)
                 .put(index4, false)
                 .put(justAnIndex, false)
-                .put(new DataStream(dataStream1, "ts", List.of(index1.getIndex(), index2.getIndex())))
-                .put(new DataStream(dataStream2, "ts", List.of(index3.getIndex(), index4.getIndex())))).build();
+                .put(new DataStream(dataStream1, createTimestampField("ts"), List.of(index1.getIndex(), index2.getIndex())))
+                .put(new DataStream(dataStream2, createTimestampField("ts"), List.of(index3.getIndex(), index4.getIndex())))).build();
 
         List<String> names = indexNameExpressionResolver.dataStreamNames(state, IndicesOptions.lenientExpand(), "log*");
         assertEquals(Collections.singletonList(dataStream1), names);

+ 3 - 1
server/src/test/java/org/elasticsearch/cluster/metadata/MetadataCreateDataStreamServiceTests.java

@@ -34,6 +34,7 @@ import java.util.List;
 import java.util.Map;
 
 import static org.elasticsearch.cluster.DataStreamTestHelper.createFirstBackingIndex;
+import static org.elasticsearch.cluster.DataStreamTestHelper.createTimestampField;
 import static org.elasticsearch.cluster.metadata.MetadataCreateDataStreamService.validateTimestampFieldMapping;
 import static org.hamcrest.Matchers.containsString;
 import static org.hamcrest.Matchers.equalTo;
@@ -67,7 +68,8 @@ public class MetadataCreateDataStreamServiceTests extends ESTestCase {
         final MetadataCreateIndexService metadataCreateIndexService = getMetadataCreateIndexService();
         final String dataStreamName = "my-data-stream";
         IndexMetadata idx = createFirstBackingIndex(dataStreamName).build();
-        DataStream existingDataStream = new DataStream(dataStreamName, "timestamp", List.of(idx.getIndex()));
+        DataStream existingDataStream =
+            new DataStream(dataStreamName, createTimestampField("@timestamp"), List.of(idx.getIndex()));
         ClusterState cs = ClusterState.builder(new ClusterName("_name"))
             .metadata(Metadata.builder().dataStreams(Map.of(dataStreamName, existingDataStream)).build()).build();
         CreateDataStreamClusterStateUpdateRequest req =

+ 2 - 1
server/src/test/java/org/elasticsearch/cluster/metadata/MetadataIndexAliasesServiceTests.java

@@ -37,6 +37,7 @@ import java.util.HashSet;
 import java.util.List;
 
 import static java.util.Collections.singletonList;
+import static org.elasticsearch.cluster.DataStreamTestHelper.createTimestampField;
 import static org.hamcrest.Matchers.contains;
 import static org.hamcrest.Matchers.containsString;
 import static org.hamcrest.Matchers.equalTo;
@@ -497,7 +498,7 @@ public class MetadataIndexAliasesServiceTests extends ESTestCase {
             .metadata(
                 Metadata.builder()
                     .put(indexMetadata, true)
-                    .put(new DataStream(dataStreamName, "@timestamp", singletonList(indexMetadata.getIndex()))))
+                    .put(new DataStream(dataStreamName, createTimestampField("@timestamp"), singletonList(indexMetadata.getIndex()))))
             .build();
 
         IllegalArgumentException exception = expectThrows(IllegalArgumentException.class, () -> service.applyAliasActions(state,

+ 10 - 9
server/src/test/java/org/elasticsearch/cluster/metadata/MetadataTests.java

@@ -57,6 +57,7 @@ import java.util.stream.Collectors;
 
 import static org.elasticsearch.cluster.DataStreamTestHelper.createBackingIndex;
 import static org.elasticsearch.cluster.DataStreamTestHelper.createFirstBackingIndex;
+import static org.elasticsearch.cluster.DataStreamTestHelper.createTimestampField;
 import static org.elasticsearch.cluster.metadata.Metadata.Builder.validateDataStreams;
 import static org.hamcrest.Matchers.containsString;
 import static org.hamcrest.Matchers.equalTo;
@@ -944,7 +945,7 @@ public class MetadataTests extends ESTestCase {
                 .numberOfShards(1)
                 .numberOfReplicas(1)
                 .build(), false)
-            .put(new DataStream(dataStreamName, "ts", List.of(idx.getIndex())));
+            .put(new DataStream(dataStreamName, createTimestampField("@timestamp"), List.of(idx.getIndex())));
 
         IllegalStateException e = expectThrows(IllegalStateException.class, b::build);
         assertThat(e.getMessage(),
@@ -959,7 +960,7 @@ public class MetadataTests extends ESTestCase {
             .build();
         Metadata.Builder b = Metadata.builder()
             .put(idx, false)
-            .put(new DataStream(dataStreamName, "ts", List.of(idx.getIndex())));
+            .put(new DataStream(dataStreamName, createTimestampField("@timestamp"), List.of(idx.getIndex())));
 
         IllegalStateException e = expectThrows(IllegalStateException.class, b::build);
         assertThat(e.getMessage(),
@@ -976,7 +977,7 @@ public class MetadataTests extends ESTestCase {
         Metadata.Builder b = Metadata.builder()
             .put(validIdx, false)
             .put(invalidIdx, false)
-            .put(new DataStream(dataStreamName, "ts", List.of(validIdx.getIndex())));
+            .put(new DataStream(dataStreamName, createTimestampField("@timestamp"), List.of(validIdx.getIndex())));
 
         IllegalStateException e = expectThrows(IllegalStateException.class, b::build);
         assertThat(e.getMessage(), containsString("data stream [" + dataStreamName +
@@ -991,7 +992,7 @@ public class MetadataTests extends ESTestCase {
             .build();
         Metadata.Builder b = Metadata.builder()
             .put(idx, false)
-            .put(new DataStream(dataStreamName, "ts", List.of(idx.getIndex())));
+            .put(new DataStream(dataStreamName, createTimestampField("@timestamp"), List.of(idx.getIndex())));
 
         IllegalStateException e = expectThrows(IllegalStateException.class, b::build);
         assertThat(e.getMessage(), containsString("data stream [" + dataStreamName +
@@ -1015,7 +1016,7 @@ public class MetadataTests extends ESTestCase {
             backingIndices.add(im.getIndex());
         }
 
-        b.put(new DataStream(dataStreamName, "ts", backingIndices, lastBackingIndexNum));
+        b.put(new DataStream(dataStreamName, createTimestampField("ts"), backingIndices, lastBackingIndexNum));
         Metadata metadata = b.build();
         assertThat(metadata.dataStreams().size(), equalTo(1));
         assertThat(metadata.dataStreams().get(dataStreamName).getName(), equalTo(dataStreamName));
@@ -1033,7 +1034,7 @@ public class MetadataTests extends ESTestCase {
                 indices.add(idx.getIndex());
                 b.put(idx, true);
             }
-            b.put(new DataStream(name, "ts", indices, indices.size()));
+            b.put(new DataStream(name, createTimestampField("ts"), indices, indices.size()));
         }
 
         Metadata metadata = b.build();
@@ -1098,7 +1099,7 @@ public class MetadataTests extends ESTestCase {
         }
         DataStream dataStream = new DataStream(
             dataStreamName,
-            "ts",
+            createTimestampField("ts"),
             backingIndices.stream().map(IndexMetadata::getIndex).collect(Collectors.toList()),
             backingIndices.size()
         );
@@ -1172,7 +1173,7 @@ public class MetadataTests extends ESTestCase {
         }
         DataStream dataStream = new DataStream(
             dataStreamName,
-            "ts",
+            createTimestampField("ts"),
             backingIndices.stream().map(IndexMetadata::getIndex).collect(Collectors.toList()),
             backingIndices.size()
         );
@@ -1274,7 +1275,7 @@ public class MetadataTests extends ESTestCase {
             b.put(im, false);
             backingIndices.add(im.getIndex());
         }
-        b.put(new DataStream(dataStreamName, "ts", backingIndices, lastBackingIndexNum));
+        b.put(new DataStream(dataStreamName, createTimestampField("ts"), backingIndices, lastBackingIndexNum));
         return new CreateIndexResult(indices, backingIndices, b.build());
     }
 

+ 5 - 4
server/src/test/java/org/elasticsearch/cluster/metadata/ToAndFromJsonMetadataTests.java

@@ -42,6 +42,7 @@ import java.util.List;
 import java.util.Map;
 
 import static org.elasticsearch.cluster.DataStreamTestHelper.createFirstBackingIndex;
+import static org.elasticsearch.cluster.DataStreamTestHelper.createTimestampField;
 import static org.elasticsearch.cluster.metadata.AliasMetadata.newAliasMetadataBuilder;
 import static org.elasticsearch.cluster.metadata.IndexMetadata.SETTING_VERSION_CREATED;
 import static org.elasticsearch.cluster.metadata.Metadata.CONTEXT_MODE_API;
@@ -101,8 +102,8 @@ public class ToAndFromJsonMetadataTests extends ESTestCase {
                         .putAlias(newAliasMetadataBuilder("alias-bar3").routing("routing-bar")))
                 .put(idx1, false)
                 .put(idx2, false)
-                .put(new DataStream("data-stream1", "@timestamp", List.of(idx1.getIndex())))
-                .put(new DataStream("data-stream2", "@timestamp2", List.of(idx2.getIndex())))
+                .put(new DataStream("data-stream1", createTimestampField("@timestamp"), List.of(idx1.getIndex())))
+                .put(new DataStream("data-stream2", createTimestampField("@timestamp2"), List.of(idx2.getIndex())))
                 .build();
 
         XContentBuilder builder = JsonXContent.contentBuilder();
@@ -152,11 +153,11 @@ public class ToAndFromJsonMetadataTests extends ESTestCase {
         // data streams
         assertNotNull(parsedMetadata.dataStreams().get("data-stream1"));
         assertThat(parsedMetadata.dataStreams().get("data-stream1").getName(), is("data-stream1"));
-        assertThat(parsedMetadata.dataStreams().get("data-stream1").getTimeStampField(), is("@timestamp"));
+        assertThat(parsedMetadata.dataStreams().get("data-stream1").getTimeStampField().getName(), is("@timestamp"));
         assertThat(parsedMetadata.dataStreams().get("data-stream1").getIndices(), contains(idx1.getIndex()));
         assertNotNull(parsedMetadata.dataStreams().get("data-stream2"));
         assertThat(parsedMetadata.dataStreams().get("data-stream2").getName(), is("data-stream2"));
-        assertThat(parsedMetadata.dataStreams().get("data-stream2").getTimeStampField(), is("@timestamp2"));
+        assertThat(parsedMetadata.dataStreams().get("data-stream2").getTimeStampField().getName(), is("@timestamp2"));
         assertThat(parsedMetadata.dataStreams().get("data-stream2").getIndices(), contains(idx2.getIndex()));
     }
 

+ 4 - 3
server/src/test/java/org/elasticsearch/snapshots/RestoreServiceTests.java

@@ -29,6 +29,7 @@ import org.elasticsearch.test.ESTestCase;
 import java.util.Collections;
 import java.util.List;
 
+import static org.elasticsearch.cluster.DataStreamTestHelper.createTimestampField;
 import static org.mockito.Matchers.eq;
 import static org.mockito.Mockito.mock;
 import static org.mockito.Mockito.when;
@@ -40,7 +41,7 @@ public class RestoreServiceTests extends ESTestCase {
         String backingIndexName = DataStream.getDefaultBackingIndexName(dataStreamName, 1);
         List<Index> indices = Collections.singletonList(new Index(backingIndexName, "uuid"));
 
-        DataStream dataStream = new DataStream(dataStreamName, "@timestamp", indices);
+        DataStream dataStream = new DataStream(dataStreamName, createTimestampField("@timestamp"), indices);
 
         Metadata.Builder metadata = mock(Metadata.Builder.class);
         IndexMetadata indexMetadata = mock(IndexMetadata.class);
@@ -63,7 +64,7 @@ public class RestoreServiceTests extends ESTestCase {
         String renamedBackingIndexName = DataStream.getDefaultBackingIndexName(renamedDataStreamName, 1);
         List<Index> indices = Collections.singletonList(new Index(backingIndexName, "uuid"));
 
-        DataStream dataStream = new DataStream(dataStreamName, "@timestamp", indices);
+        DataStream dataStream = new DataStream(dataStreamName, createTimestampField("@timestamp"), indices);
 
         Metadata.Builder metadata = mock(Metadata.Builder.class);
         IndexMetadata indexMetadata = mock(IndexMetadata.class);
@@ -86,7 +87,7 @@ public class RestoreServiceTests extends ESTestCase {
         String renamedBackingIndexName = DataStream.getDefaultBackingIndexName(renamedDataStreamName, 1);
         List<Index> indices = Collections.singletonList(new Index(backingIndexName, "uuid"));
 
-        DataStream dataStream = new DataStream(dataStreamName, "@timestamp", indices);
+        DataStream dataStream = new DataStream(dataStreamName, createTimestampField("@timestamp"), indices);
 
         Metadata.Builder metadata = mock(Metadata.Builder.class);
         IndexMetadata indexMetadata = mock(IndexMetadata.class);

+ 6 - 0
test/framework/src/main/java/org/elasticsearch/cluster/DataStreamTestHelper.java

@@ -26,6 +26,8 @@ import org.elasticsearch.common.settings.Settings;
 import org.elasticsearch.index.Index;
 import org.elasticsearch.test.ESTestCase;
 
+import java.util.Map;
+
 import static org.elasticsearch.cluster.metadata.IndexMetadata.SETTING_INDEX_UUID;
 
 public final class DataStreamTestHelper {
@@ -51,4 +53,8 @@ public final class DataStreamTestHelper {
             .numberOfShards(NUMBER_OF_SHARDS)
             .numberOfReplicas(NUMBER_OF_REPLICAS);
     }
+
+    public static DataStream.TimestampField createTimestampField(String fieldName) {
+        return new DataStream.TimestampField(fieldName, Map.of("type", "date"));
+    }
 }

+ 4 - 3
x-pack/plugin/core/src/test/java/org/elasticsearch/xpack/core/ilm/CheckNoDataStreamWriteIndexStepTests.java

@@ -14,6 +14,7 @@ import org.elasticsearch.index.Index;
 
 import java.util.List;
 
+import static org.elasticsearch.cluster.DataStreamTestHelper.createTimestampField;
 import static org.elasticsearch.xpack.core.ilm.AbstractStepMasterTimeoutTestCase.emptyClusterState;
 import static org.hamcrest.Matchers.is;
 import static org.hamcrest.Matchers.nullValue;
@@ -73,8 +74,8 @@ public class CheckNoDataStreamWriteIndexStepTests extends AbstractStepTestCase<C
                 .numberOfShards(randomIntBetween(1, 5)).numberOfReplicas(randomIntBetween(0, 5)).build();
 
         ClusterState clusterState = ClusterState.builder(emptyClusterState()).metadata(
-            Metadata.builder().put(indexMetadata, true).put(new DataStream(dataStreamName, "timestamp",
-                List.of(indexMetadata.getIndex()))).build()
+            Metadata.builder().put(indexMetadata, true).put(new DataStream(dataStreamName,
+                createTimestampField("@timestamp"), List.of(indexMetadata.getIndex()))).build()
         ).build();
 
         ClusterStateWaitStep.Result result = createRandomInstance().isConditionMet(indexMetadata.getIndex(), clusterState);
@@ -105,7 +106,7 @@ public class CheckNoDataStreamWriteIndexStepTests extends AbstractStepTestCase<C
             Metadata.builder()
                 .put(indexMetadata, true)
                 .put(writeIndexMetadata, true)
-                .put(new DataStream(dataStreamName, "timestamp", backingIndices))
+                .put(new DataStream(dataStreamName, createTimestampField("@timestamp"), backingIndices))
                 .build()
         ).build();
 

+ 3 - 1
x-pack/plugin/core/src/test/java/org/elasticsearch/xpack/core/ilm/DeleteStepTests.java

@@ -20,6 +20,7 @@ import org.mockito.Mockito;
 
 import java.util.List;
 
+import static org.elasticsearch.cluster.DataStreamTestHelper.createTimestampField;
 import static org.hamcrest.Matchers.equalTo;
 import static org.hamcrest.Matchers.is;
 
@@ -150,7 +151,8 @@ public class DeleteStepTests extends AbstractStepMasterTimeoutTestCase<DeleteSte
             IndexMetadata.builder(indexName).settings(settings(Version.CURRENT).put(LifecycleSettings.LIFECYCLE_NAME, policyName))
                 .numberOfShards(randomIntBetween(1, 5)).numberOfReplicas(randomIntBetween(0, 5)).build();
 
-        DataStream dataStream = new DataStream(dataStreamName, "timestamp", List.of(sourceIndexMetadata.getIndex()));
+        DataStream dataStream =
+            new DataStream(dataStreamName, createTimestampField("@timestamp"), List.of(sourceIndexMetadata.getIndex()));
         ClusterState clusterState = ClusterState.builder(emptyClusterState()).metadata(
             Metadata.builder().put(sourceIndexMetadata, true).put(dataStream).build()
         ).build();

+ 6 - 4
x-pack/plugin/core/src/test/java/org/elasticsearch/xpack/core/ilm/ReplaceDataStreamBackingIndexStepTests.java

@@ -16,6 +16,7 @@ import org.elasticsearch.index.Index;
 import java.util.List;
 import java.util.UUID;
 
+import static org.elasticsearch.cluster.DataStreamTestHelper.createTimestampField;
 import static org.elasticsearch.xpack.core.ilm.AbstractStepMasterTimeoutTestCase.emptyClusterState;
 import static org.hamcrest.Matchers.is;
 
@@ -77,8 +78,9 @@ public class ReplaceDataStreamBackingIndexStepTests extends AbstractStepTestCase
                 .numberOfShards(randomIntBetween(1, 5)).numberOfReplicas(randomIntBetween(0, 5)).build();
 
         ClusterState clusterState = ClusterState.builder(emptyClusterState()).metadata(
-            Metadata.builder().put(sourceIndexMetadata, true).put(new DataStream(dataStreamName, "timestamp",
-                List.of(sourceIndexMetadata.getIndex()))).build()
+            Metadata.builder().put(sourceIndexMetadata, true)
+                .put(new DataStream(dataStreamName, createTimestampField("timestamp"),
+                    List.of(sourceIndexMetadata.getIndex()))).build()
         ).build();
 
         expectThrows(IllegalStateException.class,
@@ -105,7 +107,7 @@ public class ReplaceDataStreamBackingIndexStepTests extends AbstractStepTestCase
             Metadata.builder()
                 .put(sourceIndexMetadata, true)
                 .put(writeIndexMetadata, true)
-                .put(new DataStream(dataStreamName, "timestamp", backingIndices))
+                .put(new DataStream(dataStreamName, createTimestampField("timestamp"), backingIndices))
                 .build()
         ).build();
 
@@ -147,7 +149,7 @@ public class ReplaceDataStreamBackingIndexStepTests extends AbstractStepTestCase
             Metadata.builder()
                 .put(sourceIndexMetadata, true)
                 .put(writeIndexMetadata, true)
-                .put(new DataStream(dataStreamName, "timestamp", backingIndices))
+                .put(new DataStream(dataStreamName, createTimestampField("timestamp"), backingIndices))
                 .put(targetIndexMetadata, true)
                 .build()
         ).build();

+ 3 - 1
x-pack/plugin/core/src/test/java/org/elasticsearch/xpack/core/ilm/RolloverStepTests.java

@@ -26,6 +26,7 @@ import java.util.Collections;
 import java.util.List;
 import java.util.Locale;
 
+import static org.elasticsearch.cluster.DataStreamTestHelper.createTimestampField;
 import static org.hamcrest.Matchers.equalTo;
 import static org.hamcrest.core.Is.is;
 
@@ -134,7 +135,8 @@ public class RolloverStepTests extends AbstractStepMasterTimeoutTestCase<Rollove
         ClusterState clusterState = ClusterState.builder(ClusterName.DEFAULT)
             .metadata(
                 Metadata.builder()
-                    .put(new DataStream(dataStreamName, "timestamp", List.of(indexMetadata.getIndex()), 1L))
+                    .put(new DataStream(dataStreamName, createTimestampField("timestamp"),
+                        List.of(indexMetadata.getIndex()), 1L))
                     .put(indexMetadata, true)
             )
             .build();

+ 3 - 1
x-pack/plugin/core/src/test/java/org/elasticsearch/xpack/core/ilm/UpdateRolloverLifecycleDateStepTests.java

@@ -20,6 +20,7 @@ import java.util.Collections;
 import java.util.List;
 import java.util.function.LongSupplier;
 
+import static org.elasticsearch.cluster.DataStreamTestHelper.createTimestampField;
 import static org.hamcrest.Matchers.equalTo;
 
 public class UpdateRolloverLifecycleDateStepTests extends AbstractStepTestCase<UpdateRolloverLifecycleDateStep> {
@@ -96,7 +97,8 @@ public class UpdateRolloverLifecycleDateStepTests extends AbstractStepTestCase<U
         ClusterState clusterState = ClusterState.builder(ClusterName.DEFAULT)
             .metadata(
                 Metadata.builder()
-                    .put(new DataStream(dataStreamName, "timestamp", List.of(originalIndexMeta.getIndex(), rolledIndexMeta.getIndex()), 2L))
+                    .put(new DataStream(dataStreamName, createTimestampField("timestamp"),
+                        List.of(originalIndexMeta.getIndex(), rolledIndexMeta.getIndex()), 2L))
                     .put(originalIndexMeta, true)
                     .put(rolledIndexMeta, true)
             ).build();

+ 3 - 1
x-pack/plugin/core/src/test/java/org/elasticsearch/xpack/core/ilm/WaitForActiveShardsTests.java

@@ -27,6 +27,7 @@ import java.io.IOException;
 import java.util.List;
 import java.util.UUID;
 
+import static org.elasticsearch.cluster.DataStreamTestHelper.createTimestampField;
 import static org.hamcrest.Matchers.containsString;
 import static org.hamcrest.Matchers.is;
 
@@ -167,7 +168,8 @@ public class WaitForActiveShardsTests extends AbstractStepTestCase<WaitForActive
         ClusterState clusterState = ClusterState.builder(ClusterName.DEFAULT)
             .metadata(
                 Metadata.builder()
-                    .put(new DataStream(dataStreamName, "timestamp", List.of(originalIndexMeta.getIndex(), rolledIndexMeta.getIndex()), 2L))
+                    .put(new DataStream(dataStreamName, createTimestampField("timestamp"),
+                        List.of(originalIndexMeta.getIndex(), rolledIndexMeta.getIndex()), 2L))
                     .put(originalIndexMeta, true)
                     .put(rolledIndexMeta, true)
             )

+ 3 - 1
x-pack/plugin/core/src/test/java/org/elasticsearch/xpack/core/ilm/WaitForRolloverReadyStepTests.java

@@ -34,6 +34,7 @@ import java.util.Map;
 import java.util.Set;
 import java.util.stream.Collectors;
 
+import static org.elasticsearch.cluster.DataStreamTestHelper.createTimestampField;
 import static org.hamcrest.Matchers.equalTo;
 import static org.hamcrest.Matchers.is;
 import static org.mockito.Mockito.times;
@@ -150,7 +151,8 @@ public class WaitForRolloverReadyStepTests extends AbstractStepTestCase<WaitForR
 
         SetOnce<Boolean> conditionsMet = new SetOnce<>();
         Metadata metadata = Metadata.builder().put(indexMetadata, true)
-            .put(new DataStream(dataStreamName, "timestamp", List.of(indexMetadata.getIndex()), 1L))
+            .put(new DataStream(dataStreamName, createTimestampField("timestamp"),
+                List.of(indexMetadata.getIndex()), 1L))
             .build();
         step.evaluateCondition(metadata, indexMetadata.getIndex(), new AsyncWaitStep.Listener() {
 

+ 2 - 1
x-pack/plugin/ml/src/test/java/org/elasticsearch/xpack/ml/datafeed/DatafeedNodeSelectorTests.java

@@ -44,6 +44,7 @@ import java.util.Collections;
 import java.util.Date;
 import java.util.List;
 
+import static org.elasticsearch.cluster.DataStreamTestHelper.createTimestampField;
 import static org.elasticsearch.cluster.metadata.DataStream.getDefaultBackingIndexName;
 import static org.elasticsearch.cluster.metadata.IndexMetadata.INDEX_UUID_NA_VALUE;
 import static org.elasticsearch.xpack.ml.action.TransportOpenJobActionTests.addJobTask;
@@ -494,7 +495,7 @@ public class DatafeedNodeSelectorTests extends ESTestCase {
 
         clusterState = ClusterState.builder(new ClusterName("cluster_name"))
             .metadata(new Metadata.Builder()
-                .put(new DataStream(dataStreamName, "@timestamp", Collections.singletonList(index), 1L))
+                .put(new DataStream(dataStreamName, createTimestampField("@timestamp"), Collections.singletonList(index), 1L))
                 .putCustom(PersistentTasksCustomMetadata.TYPE, tasks)
                 .putCustom(MlMetadata.TYPE, mlMetadata)
                 .put(indexMetadata, false))

+ 5 - 2
x-pack/plugin/security/src/test/java/org/elasticsearch/xpack/security/authz/IndicesAndAliasesResolverTests.java

@@ -79,6 +79,7 @@ import java.util.List;
 import java.util.Map;
 import java.util.Set;
 
+import static org.elasticsearch.cluster.DataStreamTestHelper.createTimestampField;
 import static org.elasticsearch.xpack.core.security.index.RestrictedIndicesNames.SECURITY_MAIN_ALIAS;
 import static org.hamcrest.Matchers.arrayContaining;
 import static org.hamcrest.Matchers.arrayContainingInAnyOrder;
@@ -165,8 +166,10 @@ public class IndicesAndAliasesResolverTests extends ESTestCase {
                 .put(dataStreamIndex1, true)
                 .put(dataStreamIndex2, true)
                 .put(dataStreamIndex3, true)
-                .put(new DataStream(dataStreamName, "ts", List.of(dataStreamIndex1.getIndex(), dataStreamIndex2.getIndex())))
-                .put(new DataStream(otherDataStreamName, "ts", List.of(dataStreamIndex3.getIndex())))
+                .put(new DataStream(dataStreamName, createTimestampField("@timestamp"),
+                    List.of(dataStreamIndex1.getIndex(), dataStreamIndex2.getIndex())))
+                .put(new DataStream(otherDataStreamName, createTimestampField("@timestamp"),
+                    List.of(dataStreamIndex3.getIndex())))
                 .put(indexBuilder(securityIndexName).settings(settings)).build();
 
         if (withAlias) {