Browse Source

Add data stream timestamp validation via metadata field mapper (#58582)

This commit adds a new metadata field mapper that validates,
that a document has exactly a single timestamp value in the data stream timestamp field and
that the timestamp field mapping only has `type`, `meta` or `format` attributes configured.
Other attributes can affect the guarantee that an index with this meta field mapper has a 
useable timestamp field.

The MetadataCreateIndexService inserts a data stream timestamp field mapper whenever
a new backing index of a data stream is created.

Relates to #53100
Martijn van Groningen 5 years ago
parent
commit
001b3fb440
24 changed files with 662 additions and 79 deletions
  1. 2 2
      docs/reference/indices/rollover-index.asciidoc
  2. 3 1
      modules/reindex/src/test/resources/rest-api-spec/test/delete_by_query/90_data_streams.yml
  3. 9 3
      modules/reindex/src/test/resources/rest-api-spec/test/reindex/96_data_streams.yml
  4. 3 3
      modules/reindex/src/test/resources/rest-api-spec/test/update_by_query/90_data_streams.yml
  5. 63 6
      rest-api-spec/src/main/resources/rest-api-spec/test/indices.data_stream/10_basic.yml
  6. 3 1
      rest-api-spec/src/main/resources/rest-api-spec/test/indices.data_stream/20_unsupported_apis.yml
  7. 3 1
      rest-api-spec/src/main/resources/rest-api-spec/test/indices.data_stream/30_auto_create_data_stream.yml
  8. 14 14
      server/src/internalClusterTest/java/org/elasticsearch/action/bulk/BulkIntegrationIT.java
  9. 63 26
      server/src/internalClusterTest/java/org/elasticsearch/indices/DataStreamIT.java
  10. 1 1
      server/src/internalClusterTest/java/org/elasticsearch/snapshots/SharedClusterSnapshotRestoreIT.java
  11. 12 0
      server/src/main/java/org/elasticsearch/cluster/metadata/MetadataCreateIndexService.java
  12. 8 0
      server/src/main/java/org/elasticsearch/index/mapper/DateFieldMapper.java
  13. 14 1
      server/src/main/java/org/elasticsearch/index/mapper/MapperMergeValidator.java
  14. 1 1
      server/src/main/java/org/elasticsearch/index/mapper/MapperService.java
  15. 263 0
      server/src/main/java/org/elasticsearch/index/mapper/TimestampFieldMapper.java
  16. 2 0
      server/src/main/java/org/elasticsearch/indices/IndicesModule.java
  17. 12 4
      server/src/test/java/org/elasticsearch/index/mapper/MapperMergeValidatorTests.java
  18. 5 9
      server/src/test/java/org/elasticsearch/index/mapper/SourceFieldMapperTests.java
  19. 167 0
      server/src/test/java/org/elasticsearch/index/mapper/TimestampFieldMapperTests.java
  20. 3 1
      server/src/test/java/org/elasticsearch/indices/IndicesModuleTests.java
  21. 1 1
      x-pack/plugin/ilm/qa/multi-node/src/test/java/org/elasticsearch/xpack/TimeSeriesRestDriver.java
  22. 2 0
      x-pack/plugin/ml/qa/native-multi-node-tests/src/test/java/org/elasticsearch/xpack/ml/integration/ClassificationIT.java
  23. 3 1
      x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/dataframe/extractor/ExtractedFieldsDetector.java
  24. 5 3
      x-pack/plugin/src/test/resources/rest-api-spec/test/data_stream/10_data_stream_resolvability.yml

+ 2 - 2
docs/reference/indices/rollover-index.asciidoc

@@ -233,14 +233,14 @@ PUT _index_template/template
   "template": {
     "mappings": {
       "properties": {
-        "@timestamp": {
+        "date": {
           "type": "date"
         }
       }
     }
   },
   "data_stream": {
-    "timestamp_field": "@timestamp"
+    "timestamp_field": "date"
   }
 }
 -----------------------------------

+ 3 - 1
modules/reindex/src/test/resources/rest-api-spec/test/delete_by_query/90_data_streams.yml

@@ -30,7 +30,9 @@
         index:   simple-data-stream1
         id:      1
         op_type: create
-        body:    { "text": "test" }
+        body:
+          foo: bar
+          '@timestamp': '2020-12-12'
 
   - do:
       indices.refresh:

+ 9 - 3
modules/reindex/src/test/resources/rest-api-spec/test/reindex/96_data_streams.yml

@@ -34,7 +34,9 @@ teardown:
       index:
         index:  logs-foobar
         refresh: true
-        body:   { foo: bar }
+        body:
+          foo: bar
+          timestamp: '2020-12-12'
 
   - do:
       reindex:
@@ -65,7 +67,9 @@ teardown:
       index:
         index: old-logs-index
         refresh: true
-        body:   { foo: bar }
+        body:
+          foo: bar
+          timestamp: '2020-12-12'
 
   - do:
       reindex:
@@ -96,7 +100,9 @@ teardown:
       index:
         index:  logs-foobar
         refresh: true
-        body:   { foo: bar }
+        body:
+          foo: bar
+          timestamp: '2020-12-12'
 
   - do:
       reindex:

+ 3 - 3
modules/reindex/src/test/resources/rest-api-spec/test/update_by_query/90_data_streams.yml

@@ -30,7 +30,7 @@
         index:   simple-data-stream1
         id:      1
         op_type: create
-        body:    { "number": 4 }
+        body:    { "number": 4, '@timestamp': '2020-12-12' }
 
   # rollover data stream to create new backing index
   - do:
@@ -47,7 +47,7 @@
         index:   simple-data-stream1
         id:      2
         op_type: create
-        body:    { "number": 1 }
+        body:    { "number": 1, '@timestamp': '2020-12-12' }
 
   # rollover data stream to create another new backing index
   - do:
@@ -64,7 +64,7 @@
         index:   simple-data-stream1
         id:      3
         op_type: create
-        body:    { "number": 5 }
+        body:    { "number": 5, '@timestamp': '2020-12-12' }
 
   - do:
       indices.refresh:

+ 63 - 6
rest-api-spec/src/main/resources/rest-api-spec/test/indices.data_stream/10_basic.yml

@@ -63,7 +63,9 @@ setup:
   - do:
       index:
         index:  simple-data-stream1
-        body:   { foo: bar }
+        body:
+          '@timestamp': '2020-12-12'
+          foo: bar
 
   - do:
       indices.refresh:
@@ -241,27 +243,27 @@ setup:
   - do:
       index:
         index:  logs-foobar
-        body:   { foo: bar }
+        body:   { timestamp: '2020-12-12' }
   - match: { _index: .ds-logs-foobar-000001 }
 
   - do:
       catch: bad_request
       index:
         index:  .ds-logs-foobar-000001
-        body:   { foo: bar }
+        body:   { timestamp: '2020-12-12' }
 
   - do:
       bulk:
         body:
           - create:
               _index: .ds-logs-foobar-000001
-          - foo: bar
+          - timestamp: '2020-12-12'
           - index:
               _index: .ds-logs-foobar-000001
-          - foo: bar
+          - timestamp: '2020-12-12'
           - create:
               _index: logs-foobar
-          - foo: bar
+          - timestamp: '2020-12-12'
   - match: { errors: true }
   - match: { items.0.create.status: 400 }
   - match: { items.0.create.error.type: illegal_argument_exception }
@@ -276,3 +278,58 @@ setup:
       indices.delete_data_stream:
         name: logs-foobar
   - is_true: acknowledged
+
+---
+"Indexing a document into a data stream without a timestamp field":
+  - skip:
+      version: " - 7.9.99"
+      reason: "enable in 7.9+ when backported"
+      features: allowed_warnings
+
+  - do:
+      allowed_warnings:
+        - "index template [generic_logs_template] has index patterns [logs-*] matching patterns from existing older templates [global] with patterns (global => [*]); this template [generic_logs_template] will take precedence during new index creation"
+      indices.put_index_template:
+        name: generic_logs_template
+        body:
+          index_patterns: logs-*
+          template:
+            mappings:
+              properties:
+                'timestamp':
+                  type: date
+          data_stream:
+            timestamp_field: timestamp
+
+  - do:
+      catch: bad_request
+      index:
+        index:  logs-foobar
+        body:   { foo: bar }
+
+  - do:
+      bulk:
+        body:
+          - create:
+              _index: logs-foobar
+          - foo: bar
+          - create:
+              _index: logs-foobar
+          - timestamp: '2020-12-12'
+          - create:
+              _index: logs-foobar
+          - timestamp: ['2020-12-12', '2022-12-12']
+  - match: { errors: true }
+  - match: { items.0.create.status: 400 }
+  - match: { items.0.create.error.caused_by.type: illegal_argument_exception }
+  - match: { items.0.create.error.caused_by.reason: "data stream timestamp field [timestamp] is missing" }
+  - match: { items.1.create.result: created }
+  - match: { items.1.create._index: .ds-logs-foobar-000001 }
+  - match: { items.2.create.status: 400 }
+  - match: { items.2.create.error.caused_by.type: illegal_argument_exception }
+  - match: { items.2.create.error.caused_by.reason: "data stream timestamp field [timestamp] encountered multiple values" }
+
+  - do:
+      indices.delete_data_stream:
+        name: logs-foobar
+  - is_true: acknowledged

+ 3 - 1
rest-api-spec/src/main/resources/rest-api-spec/test/indices.data_stream/20_unsupported_apis.yml

@@ -29,7 +29,9 @@
       index:
         index:  logs-foobar
         refresh: true
-        body:   { foo: bar }
+        body:
+          '@timestamp': '2020-12-12'
+          foo: bar
   - match: {_index: .ds-logs-foobar-000001}
 
   - do:

+ 3 - 1
rest-api-spec/src/main/resources/rest-api-spec/test/indices.data_stream/30_auto_create_data_stream.yml

@@ -27,7 +27,9 @@
       index:
         index:  logs-foobar
         refresh: true
-        body:   { foo: bar }
+        body:
+          'timestamp': '2020-12-12'
+          foo: bar
 
   - do:
       search:

+ 14 - 14
server/src/internalClusterTest/java/org/elasticsearch/action/bulk/BulkIntegrationIT.java

@@ -233,28 +233,28 @@ public class BulkIntegrationIT extends ESIntegTestCase {
         client().execute(PutComposableIndexTemplateAction.INSTANCE, createTemplateRequest).actionGet();
 
         BulkRequest bulkRequest = new BulkRequest();
-        bulkRequest.add(new IndexRequest("logs-foobar").opType(CREATE).source("{}", XContentType.JSON));
-        bulkRequest.add(new IndexRequest("logs-foobaz").opType(CREATE).source("{}", XContentType.JSON));
-        bulkRequest.add(new IndexRequest("logs-barbaz").opType(CREATE).source("{}", XContentType.JSON));
-        bulkRequest.add(new IndexRequest("logs-barfoo").opType(CREATE).source("{}", XContentType.JSON));
+        bulkRequest.add(new IndexRequest("logs-foobar").opType(CREATE).source("{\"@timestamp\": \"2020-12-12\"}", XContentType.JSON));
+        bulkRequest.add(new IndexRequest("logs-foobaz").opType(CREATE).source("{\"@timestamp\": \"2020-12-12\"}", XContentType.JSON));
+        bulkRequest.add(new IndexRequest("logs-barbaz").opType(CREATE).source("{\"@timestamp\": \"2020-12-12\"}", XContentType.JSON));
+        bulkRequest.add(new IndexRequest("logs-barfoo").opType(CREATE).source("{\"@timestamp\": \"2020-12-12\"}", XContentType.JSON));
         BulkResponse bulkResponse = client().bulk(bulkRequest).actionGet();
         assertThat("bulk failures: " + Strings.toString(bulkResponse), bulkResponse.hasFailures(), is(false));
 
         bulkRequest = new BulkRequest();
-        bulkRequest.add(new IndexRequest("logs-foobar").opType(CREATE).source("{}", XContentType.JSON));
-        bulkRequest.add(new IndexRequest("logs-foobaz2").opType(CREATE).source("{}", XContentType.JSON));
-        bulkRequest.add(new IndexRequest("logs-barbaz").opType(CREATE).source("{}", XContentType.JSON));
-        bulkRequest.add(new IndexRequest("logs-barfoo2").opType(CREATE).source("{}", XContentType.JSON));
+        bulkRequest.add(new IndexRequest("logs-foobar").opType(CREATE).source("{\"@timestamp\": \"2020-12-12\"}", XContentType.JSON));
+        bulkRequest.add(new IndexRequest("logs-foobaz2").opType(CREATE).source("{\"@timestamp\": \"2020-12-12\"}", XContentType.JSON));
+        bulkRequest.add(new IndexRequest("logs-barbaz").opType(CREATE).source("{\"@timestamp\": \"2020-12-12\"}", XContentType.JSON));
+        bulkRequest.add(new IndexRequest("logs-barfoo2").opType(CREATE).source("{\"@timestamp\": \"2020-12-12\"}", XContentType.JSON));
         bulkResponse = client().bulk(bulkRequest).actionGet();
         assertThat("bulk failures: " + Strings.toString(bulkResponse), bulkResponse.hasFailures(), is(false));
 
         bulkRequest = new BulkRequest();
-        bulkRequest.add(new IndexRequest("logs-foobar").opType(CREATE).source("{}", XContentType.JSON));
-        bulkRequest.add(new IndexRequest("logs-foobaz2").opType(CREATE).source("{}", XContentType.JSON));
-        bulkRequest.add(new IndexRequest("logs-foobaz3").opType(CREATE).source("{}", XContentType.JSON));
-        bulkRequest.add(new IndexRequest("logs-barbaz").opType(CREATE).source("{}", XContentType.JSON));
-        bulkRequest.add(new IndexRequest("logs-barfoo2").opType(CREATE).source("{}", XContentType.JSON));
-        bulkRequest.add(new IndexRequest("logs-barfoo3").opType(CREATE).source("{}", XContentType.JSON));
+        bulkRequest.add(new IndexRequest("logs-foobar").opType(CREATE).source("{\"@timestamp\": \"2020-12-12\"}", XContentType.JSON));
+        bulkRequest.add(new IndexRequest("logs-foobaz2").opType(CREATE).source("{\"@timestamp\": \"2020-12-12\"}", XContentType.JSON));
+        bulkRequest.add(new IndexRequest("logs-foobaz3").opType(CREATE).source("{\"@timestamp\": \"2020-12-12\"}", XContentType.JSON));
+        bulkRequest.add(new IndexRequest("logs-barbaz").opType(CREATE).source("{\"@timestamp\": \"2020-12-12\"}", XContentType.JSON));
+        bulkRequest.add(new IndexRequest("logs-barfoo2").opType(CREATE).source("{\"@timestamp\": \"2020-12-12\"}", XContentType.JSON));
+        bulkRequest.add(new IndexRequest("logs-barfoo3").opType(CREATE).source("{\"@timestamp\": \"2020-12-12\"}", XContentType.JSON));
         bulkResponse = client().bulk(bulkRequest).actionGet();
         assertThat("bulk failures: " + Strings.toString(bulkResponse), bulkResponse.hasFailures(), is(false));
 

+ 63 - 26
server/src/internalClusterTest/java/org/elasticsearch/indices/DataStreamIT.java

@@ -57,6 +57,8 @@ import org.elasticsearch.common.settings.Settings;
 import org.elasticsearch.common.xcontent.ObjectPath;
 import org.elasticsearch.common.xcontent.XContentType;
 import org.elasticsearch.index.IndexNotFoundException;
+import org.elasticsearch.index.mapper.DateFieldMapper;
+import org.elasticsearch.index.mapper.MapperParsingException;
 import org.elasticsearch.rest.RestStatus;
 import org.elasticsearch.test.ESIntegTestCase;
 import org.junit.After;
@@ -66,8 +68,9 @@ import java.util.ArrayList;
 import java.util.Arrays;
 import java.util.Comparator;
 import java.util.List;
-import java.util.Map;
+import java.util.Locale;
 import java.util.Optional;
+import java.util.Map;
 
 import static org.elasticsearch.indices.IndicesOptionsIntegrationIT._flush;
 import static org.elasticsearch.indices.IndicesOptionsIntegrationIT.clearCache;
@@ -144,9 +147,9 @@ public class DataStreamIT extends ESIntegTestCase {
         assertThat(ObjectPath.eval("properties.@timestamp1.type", mappings), is("date"));
 
         int numDocsBar = randomIntBetween(2, 16);
-        indexDocs("metrics-bar", numDocsBar);
+        indexDocs("metrics-bar", "@timestamp2", numDocsBar);
         int numDocsFoo = randomIntBetween(2, 16);
-        indexDocs("metrics-foo", numDocsFoo);
+        indexDocs("metrics-foo", "@timestamp1", numDocsFoo);
 
         verifyDocs("metrics-bar", numDocsBar, 1, 1);
         verifyDocs("metrics-foo", numDocsFoo, 1, 1);
@@ -174,9 +177,9 @@ public class DataStreamIT extends ESIntegTestCase {
         assertThat(ObjectPath.eval("properties.@timestamp2.type", mappings), is("date"));
 
         int numDocsBar2 = randomIntBetween(2, 16);
-        indexDocs("metrics-bar", numDocsBar2);
+        indexDocs("metrics-bar", "@timestamp2", numDocsBar2);
         int numDocsFoo2 = randomIntBetween(2, 16);
-        indexDocs("metrics-foo", numDocsFoo2);
+        indexDocs("metrics-foo", "@timestamp1", numDocsFoo2);
 
         verifyDocs("metrics-bar", numDocsBar + numDocsBar2, 1, 2);
         verifyDocs("metrics-foo", numDocsFoo + numDocsFoo2, 1, 2);
@@ -208,7 +211,7 @@ public class DataStreamIT extends ESIntegTestCase {
 
         {
             BulkRequest bulkRequest = new BulkRequest()
-                .add(new IndexRequest(dataStreamName).source("{}", XContentType.JSON));
+                .add(new IndexRequest(dataStreamName).source("{\"@timestamp1\": \"2020-12-12\"}", XContentType.JSON));
             expectFailure(dataStreamName, () -> client().bulk(bulkRequest).actionGet());
         }
         {
@@ -218,11 +221,12 @@ public class DataStreamIT extends ESIntegTestCase {
         }
         {
             BulkRequest bulkRequest = new BulkRequest()
-                .add(new UpdateRequest(dataStreamName, "_id").doc("{}", XContentType.JSON));
+                .add(new UpdateRequest(dataStreamName, "_id").doc("{\"@timestamp1\": \"2020-12-12\"}", XContentType.JSON));
             expectFailure(dataStreamName, () -> client().bulk(bulkRequest).actionGet());
         }
         {
-            IndexRequest indexRequest = new IndexRequest(dataStreamName).source("{}", XContentType.JSON);
+            IndexRequest indexRequest = new IndexRequest(dataStreamName)
+                .source("{\"@timestamp1\": \"2020-12-12\"}", XContentType.JSON);
             expectFailure(dataStreamName, () -> client().index(indexRequest).actionGet());
         }
         {
@@ -235,14 +239,15 @@ public class DataStreamIT extends ESIntegTestCase {
             expectFailure(dataStreamName, () -> client().delete(deleteRequest).actionGet());
         }
         {
-            IndexRequest indexRequest = new IndexRequest(dataStreamName).source("{}", XContentType.JSON)
+            IndexRequest indexRequest = new IndexRequest(dataStreamName)
+                .source("{\"@timestamp1\": \"2020-12-12\"}", XContentType.JSON)
                 .opType(DocWriteRequest.OpType.CREATE);
             IndexResponse indexResponse = client().index(indexRequest).actionGet();
             assertThat(indexResponse.getIndex(), equalTo(DataStream.getDefaultBackingIndexName(dataStreamName, 1)));
         }
         {
             BulkRequest bulkRequest = new BulkRequest()
-                .add(new IndexRequest(dataStreamName).source("{}", XContentType.JSON)
+                .add(new IndexRequest(dataStreamName).source("{\"@timestamp1\": \"2020-12-12\"}", XContentType.JSON)
                     .opType(DocWriteRequest.OpType.CREATE));
             BulkResponse bulkItemResponses  = client().bulk(bulkRequest).actionGet();
             assertThat(bulkItemResponses.getItems()[0].getIndex(), equalTo(DataStream.getDefaultBackingIndexName(dataStreamName, 1)));
@@ -279,7 +284,7 @@ public class DataStreamIT extends ESIntegTestCase {
         client().execute(PutComposableIndexTemplateAction.INSTANCE, request).actionGet();
 
         int numDocs = randomIntBetween(2, 16);
-        indexDocs(dataStreamName, numDocs);
+        indexDocs(dataStreamName, "@timestamp", numDocs);
         verifyDocs(dataStreamName, numDocs, 1, 1);
 
         String backingIndex = DataStream.getDefaultBackingIndexName(dataStreamName, 1);
@@ -310,7 +315,7 @@ public class DataStreamIT extends ESIntegTestCase {
             getIndexResponse.mappings().get(backingIndex).getSourceAsMap()), equalTo("keyword"));
 
         int numDocs2 = randomIntBetween(2, 16);
-        indexDocs(dataStreamName, numDocs2);
+        indexDocs(dataStreamName, "@timestamp", numDocs2);
         verifyDocs(dataStreamName, numDocs + numDocs2, 1, 2);
 
         DeleteDataStreamAction.Request deleteDataStreamRequest = new DeleteDataStreamAction.Request(dataStreamName);
@@ -373,7 +378,7 @@ public class DataStreamIT extends ESIntegTestCase {
         client().admin().indices().createDataStream(request).actionGet();
 
         verifyResolvability(dataStreamName, client().prepareIndex(dataStreamName)
-                .setSource("{}", XContentType.JSON)
+                .setSource("{\"ts\": \"2020-12-12\"}", XContentType.JSON)
                 .setOpType(DocWriteRequest.OpType.CREATE),
             false);
         verifyResolvability(dataStreamName, refreshBuilder(dataStreamName), false);
@@ -406,7 +411,7 @@ public class DataStreamIT extends ESIntegTestCase {
         request = new CreateDataStreamAction.Request("logs-barbaz");
         client().admin().indices().createDataStream(request).actionGet();
         verifyResolvability("logs-barbaz", client().prepareIndex("logs-barbaz")
-                .setSource("{}", XContentType.JSON)
+                .setSource("{\"ts\": \"2020-12-12\"}", XContentType.JSON)
                 .setOpType(DocWriteRequest.OpType.CREATE),
             false);
 
@@ -496,7 +501,8 @@ public class DataStreamIT extends ESIntegTestCase {
         putComposableIndexTemplate("id1", "@timestamp", List.of("logs-foo*"));
 
         // Index doc that triggers creation of a data stream
-        IndexRequest indexRequest = new IndexRequest("logs-foobar").source("{}", XContentType.JSON).opType("create");
+        IndexRequest indexRequest =
+            new IndexRequest("logs-foobar").source("{\"@timestamp\": \"2020-12-12\"}", XContentType.JSON).opType("create");
         IndexResponse indexResponse = client().index(indexRequest).actionGet();
         assertThat(indexResponse.getIndex(), equalTo(DataStream.getDefaultBackingIndexName("logs-foobar", 1)));
         assertBackingIndex(DataStream.getDefaultBackingIndexName("logs-foobar", 1), "properties.@timestamp");
@@ -508,7 +514,7 @@ public class DataStreamIT extends ESIntegTestCase {
         assertBackingIndex(DataStream.getDefaultBackingIndexName("logs-foobar", 2), "properties.@timestamp");
 
         // Index another doc into a data stream
-        indexRequest = new IndexRequest("logs-foobar").source("{}", XContentType.JSON).opType("create");
+        indexRequest = new IndexRequest("logs-foobar").source("{\"@timestamp\": \"2020-12-12\"}", XContentType.JSON).opType("create");
         indexResponse = client().index(indexRequest).actionGet();
         assertThat(indexResponse.getIndex(), equalTo(DataStream.getDefaultBackingIndexName("logs-foobar", 2)));
 
@@ -523,7 +529,7 @@ public class DataStreamIT extends ESIntegTestCase {
         assertBackingIndex(DataStream.getDefaultBackingIndexName("logs-foobar", 3), "properties.@timestamp");
 
         // Index another doc into a data stream
-        indexRequest = new IndexRequest("logs-foobar").source("{}", XContentType.JSON).opType("create");
+        indexRequest = new IndexRequest("logs-foobar").source("{\"@timestamp\": \"2020-12-12\"}", XContentType.JSON).opType("create");
         indexResponse = client().index(indexRequest).actionGet();
         assertThat(indexResponse.getIndex(), equalTo(DataStream.getDefaultBackingIndexName("logs-foobar", 3)));
 
@@ -575,8 +581,7 @@ public class DataStreamIT extends ESIntegTestCase {
             "          \"format\": \"yyyy-MM\",\n" +
             "          \"meta\": {\n" +
             "            \"x\": \"y\"\n" +
-            "          },\n" +
-            "          \"store\": true\n" +
+            "          }\n" +
             "        }\n" +
             "      }\n" +
             "    }";
@@ -589,7 +594,7 @@ public class DataStreamIT extends ESIntegTestCase {
         assertThat(getDataStreamResponse.getDataStreams().size(), equalTo(1));
         assertThat(getDataStreamResponse.getDataStreams().get(0).getName(), equalTo("logs-foobar"));
         assertThat(getDataStreamResponse.getDataStreams().get(0).getTimeStampField().getName(), equalTo("@timestamp"));
-        Map<?, ?> expectedTimestampMapping = Map.of("type", "date", "format", "yyyy-MM", "meta", Map.of("x", "y"), "store", true);
+        Map<?, ?> expectedTimestampMapping = Map.of("type", "date", "format", "yyyy-MM", "meta", Map.of("x", "y"));
         assertThat(getDataStreamResponse.getDataStreams().get(0).getTimeStampField().getFieldMapping(), equalTo(expectedTimestampMapping));
         assertBackingIndex(DataStream.getDefaultBackingIndexName("logs-foobar", 1), "properties.@timestamp", expectedTimestampMapping);
 
@@ -617,13 +622,15 @@ public class DataStreamIT extends ESIntegTestCase {
         assertThat(rolloverResponse.getNewIndex(), equalTo(backingIndex2));
         assertTrue(rolloverResponse.isRolledOver());
 
-        Map<?, ?> expectedMapping = Map.of("properties", Map.of("@timestamp", Map.of("type", "date")));
+        Map<?, ?> expectedMapping =
+            Map.of("properties", Map.of("@timestamp", Map.of("type", "date")), "_timestamp", Map.of("path", "@timestamp"));
         GetMappingsResponse getMappingsResponse = getMapping("logs-foobar").get();
         assertThat(getMappingsResponse.getMappings().size(), equalTo(2));
         assertThat(getMappingsResponse.getMappings().get(backingIndex1).getSourceAsMap(), equalTo(expectedMapping));
         assertThat(getMappingsResponse.getMappings().get(backingIndex2).getSourceAsMap(), equalTo(expectedMapping));
 
-        expectedMapping = Map.of("properties", Map.of("@timestamp", Map.of("type", "date"), "my_field", Map.of("type", "keyword")));
+        expectedMapping = Map.of("properties", Map.of("@timestamp", Map.of("type", "date"), "my_field", Map.of("type", "keyword")),
+            "_timestamp", Map.of("path", "@timestamp"));
         putMapping("{\"properties\":{\"my_field\":{\"type\":\"keyword\"}}}", "logs-foobar").get();
         // The mappings of all backing indices should be updated:
         getMappingsResponse = getMapping("logs-foobar").get();
@@ -662,7 +669,8 @@ public class DataStreamIT extends ESIntegTestCase {
 
         // Index doc that triggers creation of a data stream
         String dataStream = "logs-foobar";
-        IndexRequest indexRequest = new IndexRequest(dataStream).source("{}", XContentType.JSON).opType(DocWriteRequest.OpType.CREATE);
+        IndexRequest indexRequest = new IndexRequest(dataStream).source("{\"@timestamp\": \"2020-12-12\"}", XContentType.JSON)
+            .opType(DocWriteRequest.OpType.CREATE);
         IndexResponse indexResponse = client().index(indexRequest).actionGet();
         assertThat(indexResponse.getIndex(), equalTo(DataStream.getDefaultBackingIndexName(dataStream, 1)));
 
@@ -696,7 +704,8 @@ public class DataStreamIT extends ESIntegTestCase {
         putComposableIndexTemplate("id1", "@timestamp", List.of("logs-foo*"));
 
         // Index doc that triggers creation of a data stream
-        IndexRequest indexRequest = new IndexRequest("logs-foobar").source("{}", XContentType.JSON).opType(DocWriteRequest.OpType.CREATE);
+        IndexRequest indexRequest = new IndexRequest("logs-foobar").source("{\"@timestamp\": \"2020-12-12\"}", XContentType.JSON)
+            .opType(DocWriteRequest.OpType.CREATE);
         IndexResponse indexResponse = client().index(indexRequest).actionGet();
         assertThat(indexResponse.getIndex(), equalTo(DataStream.getDefaultBackingIndexName("logs-foobar", 1)));
 
@@ -721,6 +730,33 @@ public class DataStreamIT extends ESIntegTestCase {
         assertThat(ObjectPath.eval(timestampFieldPathInMapping, mappings), is(expectedMapping));
     }
 
+    public void testNoTimestampInDocument() throws Exception {
+        putComposableIndexTemplate("id", "@timestamp", List.of("logs-foobar*"));
+        String dataStreamName = "logs-foobar";
+        CreateDataStreamAction.Request createDataStreamRequest = new CreateDataStreamAction.Request(dataStreamName);
+        client().admin().indices().createDataStream(createDataStreamRequest).get();
+
+        IndexRequest indexRequest = new IndexRequest(dataStreamName)
+            .opType("create")
+            .source("{}", XContentType.JSON);
+        Exception e = expectThrows(MapperParsingException.class, () -> client().index(indexRequest).actionGet());
+        assertThat(e.getCause().getMessage(), equalTo("data stream timestamp field [@timestamp] is missing"));
+    }
+
+    public void testMultipleTimestampValuesInDocument() throws Exception {
+        putComposableIndexTemplate("id", "@timestamp", List.of("logs-foobar*"));
+        String dataStreamName = "logs-foobar";
+        CreateDataStreamAction.Request createDataStreamRequest = new CreateDataStreamAction.Request(dataStreamName);
+        client().admin().indices().createDataStream(createDataStreamRequest).get();
+
+        IndexRequest indexRequest = new IndexRequest(dataStreamName)
+            .opType("create")
+            .source("{\"@timestamp\": [\"2020-12-12\",\"2022-12-12\"]}", XContentType.JSON);
+        Exception e = expectThrows(MapperParsingException.class, () -> client().index(indexRequest).actionGet());
+        assertThat(e.getCause().getMessage(),
+            equalTo("data stream timestamp field [@timestamp] encountered multiple values"));
+    }
+
     private static void verifyResolvability(String dataStream, ActionRequestBuilder requestBuilder, boolean fail) {
         verifyResolvability(dataStream, requestBuilder, fail, 0);
     }
@@ -755,12 +791,13 @@ public class DataStreamIT extends ESIntegTestCase {
         }
     }
 
-    private static void indexDocs(String dataStream, int numDocs) {
+    private static void indexDocs(String dataStream, String timestampField, int numDocs) {
         BulkRequest bulkRequest = new BulkRequest();
         for (int i = 0; i < numDocs; i++) {
+            String value = DateFieldMapper.DEFAULT_DATE_TIME_FORMATTER.formatMillis(System.currentTimeMillis());
             bulkRequest.add(new IndexRequest(dataStream)
                 .opType(DocWriteRequest.OpType.CREATE)
-                .source("{}", XContentType.JSON));
+                .source(String.format(Locale.ROOT, "{\"%s\":\"%s\"}", timestampField, value), XContentType.JSON));
         }
         BulkResponse bulkResponse = client().bulk(bulkRequest).actionGet();
         assertThat(bulkResponse.getItems().length, equalTo(numDocs));

+ 1 - 1
server/src/internalClusterTest/java/org/elasticsearch/snapshots/SharedClusterSnapshotRestoreIT.java

@@ -2633,7 +2633,7 @@ public class SharedClusterSnapshotRestoreIT extends AbstractSnapshotIntegTestCas
             client.prepareIndex(dataStream)
                 .setOpType(DocWriteRequest.OpType.CREATE)
                 .setId(Integer.toString(i))
-                .setSource(Collections.singletonMap("k", "v"))
+                .setSource(Collections.singletonMap("@timestamp", "2020-12-12"))
                 .execute().actionGet();
         }
         refresh();

+ 12 - 0
server/src/main/java/org/elasticsearch/cluster/metadata/MetadataCreateIndexService.java

@@ -64,6 +64,7 @@ import org.elasticsearch.index.Index;
 import org.elasticsearch.index.IndexNotFoundException;
 import org.elasticsearch.index.IndexService;
 import org.elasticsearch.index.IndexSettings;
+import org.elasticsearch.index.mapper.TimestampFieldMapper;
 import org.elasticsearch.index.mapper.DocumentMapper;
 import org.elasticsearch.index.mapper.MapperService;
 import org.elasticsearch.index.mapper.MapperService.MergeReason;
@@ -491,10 +492,21 @@ public class MetadataCreateIndexService {
         final List<Map<String, Object>> mappings = collectV2Mappings(request.mappings(), currentState, templateName, xContentRegistry);
 
         if (request.dataStreamName() != null) {
+            String timestampField;
             DataStream dataStream = currentState.metadata().dataStreams().get(request.dataStreamName());
             if (dataStream != null) {
+                // Data stream already exists and a new backing index gets added. For example during rollover.
+                timestampField = dataStream.getTimeStampField().getName();
+                // Use the timestamp field mapping as was recorded at the time the data stream was created
                 mappings.add(dataStream.getTimeStampField().getTimestampFieldMapping());
+            } else {
+                // The data stream doesn't yet exist and the first backing index gets created. Resolve ts field from template.
+                // (next time, the data stream instance does exist)
+                ComposableIndexTemplate template = currentState.metadata().templatesV2().get(templateName);
+                timestampField = template.getDataStreamTemplate().getTimestampField();
             }
+            // Add mapping for timestamp field mapper last, so that it can't be overwritten:
+            mappings.add(Map.of("_doc", Map.of(TimestampFieldMapper.NAME, Map.of("path", timestampField))));
         }
 
         final Settings aggregatedIndexSettings =

+ 8 - 0
server/src/main/java/org/elasticsearch/index/mapper/DateFieldMapper.java

@@ -638,4 +638,12 @@ public final class DateFieldMapper extends FieldMapper {
             builder.field("locale", fieldType().dateTimeFormatter().locale());
         }
     }
+
+    public Explicit<Boolean> getIgnoreMalformed() {
+        return ignoreMalformed;
+    }
+
+    public Long getNullValue() {
+        return nullValue;
+    }
 }

+ 14 - 1
server/src/main/java/org/elasticsearch/index/mapper/MapperMergeValidator.java

@@ -108,13 +108,18 @@ class MapperMergeValidator {
      * @param fieldAliasMappers The newly added field alias mappers.
      * @param fullPathObjectMappers All object mappers, indexed by their full path.
      * @param fieldTypes All field and field alias mappers, collected into a lookup structure.
+     * @param metadataMappers the new metadata field mappers
+     * @param newMapper The newly created {@link DocumentMapper}
      */
     public static void validateFieldReferences(List<FieldMapper> fieldMappers,
                                                List<FieldAliasMapper> fieldAliasMappers,
                                                Map<String, ObjectMapper> fullPathObjectMappers,
-                                               FieldTypeLookup fieldTypes) {
+                                               FieldTypeLookup fieldTypes,
+                                               MetadataFieldMapper[] metadataMappers,
+                                               DocumentMapper newMapper) {
         validateCopyTo(fieldMappers, fullPathObjectMappers, fieldTypes);
         validateFieldAliasTargets(fieldAliasMappers, fullPathObjectMappers);
+        validateTimestampFieldMapper(metadataMappers, newMapper);
     }
 
     private static void validateCopyTo(List<FieldMapper> fieldMappers,
@@ -169,6 +174,14 @@ class MapperMergeValidator {
         }
     }
 
+    private static void validateTimestampFieldMapper(MetadataFieldMapper[] metadataMappers, DocumentMapper newMapper) {
+        for (MetadataFieldMapper metadataFieldMapper : metadataMappers) {
+            if (metadataFieldMapper instanceof TimestampFieldMapper) {
+                ((TimestampFieldMapper) metadataFieldMapper).validate(newMapper.mappers());
+            }
+        }
+    }
+
     private static String getNestedScope(String path, Map<String, ObjectMapper> fullPathObjectMappers) {
         for (String parentPath = parentObject(path); parentPath != null; parentPath = parentObject(parentPath)) {
             ObjectMapper objectMapper = fullPathObjectMappers.get(parentPath);

+ 1 - 1
server/src/main/java/org/elasticsearch/index/mapper/MapperService.java

@@ -375,7 +375,7 @@ public class MapperService extends AbstractIndexComponent implements Closeable {
         }
 
         MapperMergeValidator.validateFieldReferences(fieldMappers, fieldAliasMappers,
-            fullPathObjectMappers, newFieldTypes);
+            fullPathObjectMappers, newFieldTypes, metadataMappers, newMapper);
 
         ContextMapping.validateContextPaths(indexSettings.getIndexVersionCreated(), fieldMappers, newFieldTypes::get);
 

+ 263 - 0
server/src/main/java/org/elasticsearch/index/mapper/TimestampFieldMapper.java

@@ -0,0 +1,263 @@
+/*
+ * Licensed to Elasticsearch under one or more contributor
+ * license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright
+ * ownership. Elasticsearch licenses this file to you under
+ * the Apache License, Version 2.0 (the "License"); you may
+ * not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.elasticsearch.index.mapper;
+
+import org.apache.lucene.document.FieldType;
+import org.apache.lucene.index.DocValuesType;
+import org.apache.lucene.index.IndexOptions;
+import org.apache.lucene.index.IndexableField;
+import org.apache.lucene.search.Query;
+import org.elasticsearch.common.bytes.BytesReference;
+import org.elasticsearch.common.xcontent.XContentBuilder;
+import org.elasticsearch.common.xcontent.XContentHelper;
+import org.elasticsearch.common.xcontent.XContentType;
+import org.elasticsearch.index.query.QueryShardContext;
+
+import java.io.IOException;
+import java.io.UncheckedIOException;
+import java.util.Arrays;
+import java.util.Iterator;
+import java.util.List;
+import java.util.Map;
+import java.util.Objects;
+
+import static org.elasticsearch.common.xcontent.XContentFactory.jsonBuilder;
+
+public class TimestampFieldMapper extends MetadataFieldMapper {
+
+    public static final String NAME = "_timestamp";
+
+    public static class Defaults  {
+
+        public static final FieldType TIMESTAMP_FIELD_TYPE = new FieldType();
+
+        static {
+            TIMESTAMP_FIELD_TYPE.setIndexOptions(IndexOptions.NONE);
+            TIMESTAMP_FIELD_TYPE.freeze();
+        }
+    }
+
+    // For now the field shouldn't be useable in searches.
+    // In the future it should act as an alias to the actual data stream timestamp field.
+    public static final class TimestampFieldType extends MappedFieldType {
+
+        public TimestampFieldType() {
+            super(NAME, false, false, TextSearchInfo.NONE, Map.of());
+        }
+
+        @Override
+        public MappedFieldType clone() {
+            return new TimestampFieldType();
+        }
+
+        @Override
+        public String typeName() {
+            return NAME;
+        }
+
+        @Override
+        public Query termQuery(Object value, QueryShardContext context) {
+            throw new IllegalArgumentException("Field [" + name() + "] of type [" + typeName() + "] does not support term queries");
+        }
+
+        @Override
+        public Query existsQuery(QueryShardContext context) {
+            throw new IllegalArgumentException("Field [" + name() + "] of type [" + typeName() + "] does not support exists queries");
+        }
+
+    }
+
+    public static class Builder extends MetadataFieldMapper.Builder<Builder> {
+
+        private String path;
+
+        public Builder() {
+            super(NAME, Defaults.TIMESTAMP_FIELD_TYPE);
+        }
+
+        public void setPath(String path) {
+            this.path = path;
+        }
+
+        @Override
+        public MetadataFieldMapper build(BuilderContext context) {
+            return new TimestampFieldMapper(
+                fieldType,
+                new TimestampFieldType(),
+                path
+            );
+        }
+    }
+
+    public static class TypeParser implements MetadataFieldMapper.TypeParser {
+
+        @Override
+        public MetadataFieldMapper.Builder<?> parse(String name,
+                                                    Map<String, Object> node,
+                                                    ParserContext parserContext) throws MapperParsingException {
+            Builder builder = new Builder();
+            for (Iterator<Map.Entry<String, Object>> iterator = node.entrySet().iterator(); iterator.hasNext();) {
+                Map.Entry<String, Object> entry = iterator.next();
+                String fieldName = entry.getKey();
+                Object fieldNode = entry.getValue();
+                if (fieldName.equals("path")) {
+                    builder.setPath((String) fieldNode);
+                    iterator.remove();
+                }
+            }
+            return builder;
+        }
+
+        @Override
+        public MetadataFieldMapper getDefault(ParserContext parserContext) {
+            return new TimestampFieldMapper(Defaults.TIMESTAMP_FIELD_TYPE,
+                new TimestampFieldType(), null);
+        }
+    }
+
+    private final String path;
+
+    private TimestampFieldMapper(FieldType fieldType, MappedFieldType mappedFieldType, String path) {
+        super(fieldType, mappedFieldType);
+        this.path = path;
+    }
+
+    public void validate(DocumentFieldMappers lookup) {
+        if (path == null) {
+            // not configured, so skip the validation
+            return;
+        }
+
+        Mapper mapper = lookup.getMapper(path);
+        if (mapper == null) {
+            throw new IllegalArgumentException("the configured timestamp field [" + path + "] does not exist");
+        }
+
+        if (DateFieldMapper.CONTENT_TYPE.equals(mapper.typeName()) == false &&
+            DateFieldMapper.DATE_NANOS_CONTENT_TYPE.equals(mapper.typeName()) == false) {
+            throw new IllegalArgumentException("the configured timestamp field [" + path + "] is of type [" +
+                mapper.typeName() + "], but [" + DateFieldMapper.CONTENT_TYPE + "," + DateFieldMapper.DATE_NANOS_CONTENT_TYPE +
+                "] is expected");
+        }
+
+        DateFieldMapper dateFieldMapper = (DateFieldMapper) mapper;
+        if (dateFieldMapper.fieldType().isSearchable() == false) {
+            throw new IllegalArgumentException("the configured timestamp field [" + path + "] is not indexed");
+        }
+        if (dateFieldMapper.fieldType().hasDocValues() == false) {
+            throw new IllegalArgumentException("the configured timestamp field [" + path + "] doesn't have doc values");
+        }
+        if (dateFieldMapper.getNullValue() != null) {
+            throw new IllegalArgumentException("the configured timestamp field [" + path +
+                "] has disallowed [null_value] attribute specified");
+        }
+        if (dateFieldMapper.getIgnoreMalformed().explicit()) {
+            throw new IllegalArgumentException("the configured timestamp field [" + path +
+                "] has disallowed [ignore_malformed] attribute specified");
+        }
+
+        // Catch all validation that validates whether disallowed mapping attributes have been specified
+        // on the field this meta field refers to:
+        try (XContentBuilder builder = jsonBuilder()) {
+            builder.startObject();
+            dateFieldMapper.doXContentBody(builder, false, EMPTY_PARAMS);
+            builder.endObject();
+            Map<String, Object> configuredSettings =
+                XContentHelper.convertToMap(BytesReference.bytes(builder), false, XContentType.JSON).v2();
+
+            // Only type, meta and format attributes are allowed:
+            configuredSettings.remove("type");
+            configuredSettings.remove("meta");
+            configuredSettings.remove("format");
+            // All other configured attributes are not allowed:
+            if (configuredSettings.isEmpty() == false) {
+                throw new IllegalArgumentException("the configured timestamp field [@timestamp] has disallowed attributes: " +
+                    configuredSettings.keySet());
+            }
+        } catch (IOException e) {
+            throw new UncheckedIOException(e);
+        }
+    }
+
+    @Override
+    public void preParse(ParseContext context) throws IOException {
+    }
+
+    @Override
+    protected void parseCreateField(ParseContext context) throws IOException {
+        // Meta field doesn't create any fields, so this shouldn't happen.
+        throw new IllegalStateException(NAME + " field mapper cannot create fields");
+    }
+
+    @Override
+    public void postParse(ParseContext context) throws IOException {
+        if (path == null) {
+            // not configured, so skip the validation
+            return;
+        }
+
+        IndexableField[] fields = context.rootDoc().getFields(path);
+        if (fields.length == 0) {
+            throw new IllegalArgumentException("data stream timestamp field [" + path + "] is missing");
+        }
+
+        long numberOfValues =
+            Arrays.stream(fields)
+                .filter(indexableField -> indexableField.fieldType().docValuesType() == DocValuesType.SORTED_NUMERIC)
+                .count();
+        if (numberOfValues > 1) {
+            throw new IllegalArgumentException("data stream timestamp field [" + path + "] encountered multiple values");
+        }
+    }
+
+    @Override
+    public XContentBuilder toXContent(XContentBuilder builder, Params params) throws IOException {
+        if (path == null) {
+            return builder;
+        }
+
+        builder.startObject(simpleName());
+        builder.field("path", path);
+        return builder.endObject();
+    }
+
+    @Override
+    protected String contentType() {
+        return NAME;
+    }
+
+    @Override
+    protected boolean indexedByDefault() {
+        return false;
+    }
+
+    @Override
+    protected boolean docValuesByDefault() {
+        return false;
+    }
+
+    @Override
+    protected void mergeOptions(FieldMapper other, List<String> conflicts) {
+       TimestampFieldMapper otherTimestampFieldMapper = (TimestampFieldMapper) other;
+       if (Objects.equals(path, otherTimestampFieldMapper.path) == false) {
+           conflicts.add("cannot update path setting for [_timestamp]");
+       }
+    }
+}

+ 2 - 0
server/src/main/java/org/elasticsearch/indices/IndicesModule.java

@@ -33,6 +33,7 @@ import org.elasticsearch.index.engine.EngineFactory;
 import org.elasticsearch.index.mapper.BinaryFieldMapper;
 import org.elasticsearch.index.mapper.BooleanFieldMapper;
 import org.elasticsearch.index.mapper.CompletionFieldMapper;
+import org.elasticsearch.index.mapper.TimestampFieldMapper;
 import org.elasticsearch.index.mapper.DateFieldMapper;
 import org.elasticsearch.index.mapper.FieldAliasMapper;
 import org.elasticsearch.index.mapper.FieldNamesFieldMapper;
@@ -159,6 +160,7 @@ public class IndicesModule extends AbstractModule {
         builtInMetadataMappers.put(NestedPathFieldMapper.NAME, new NestedPathFieldMapper.TypeParser());
         builtInMetadataMappers.put(VersionFieldMapper.NAME, new VersionFieldMapper.TypeParser());
         builtInMetadataMappers.put(SeqNoFieldMapper.NAME, new SeqNoFieldMapper.TypeParser());
+        builtInMetadataMappers.put(TimestampFieldMapper.NAME, new TimestampFieldMapper.TypeParser());
         //_field_names must be added last so that it has a chance to see all the other mappers
         builtInMetadataMappers.put(FieldNamesFieldMapper.NAME, new FieldNamesFieldMapper.TypeParser());
         return Collections.unmodifiableMap(builtInMetadataMappers);

+ 12 - 4
server/src/test/java/org/elasticsearch/index/mapper/MapperMergeValidatorTests.java

@@ -109,7 +109,9 @@ public class MapperMergeValidatorTests extends ESTestCase {
         MapperMergeValidator.validateFieldReferences(emptyList(),
             singletonList(aliasMapper),
             Collections.singletonMap("nested", objectMapper),
-            new FieldTypeLookup());
+            new FieldTypeLookup(),
+            new MetadataFieldMapper[0],
+            null);
     }
 
     public void testFieldAliasWithDifferentObjectScopes() {
@@ -122,7 +124,9 @@ public class MapperMergeValidatorTests extends ESTestCase {
         MapperMergeValidator.validateFieldReferences(emptyList(),
             singletonList(aliasMapper),
             fullPathObjectMappers,
-            new FieldTypeLookup());
+            new FieldTypeLookup(),
+            new MetadataFieldMapper[0],
+            null);
     }
 
     public void testFieldAliasWithNestedTarget() {
@@ -133,7 +137,9 @@ public class MapperMergeValidatorTests extends ESTestCase {
             MapperMergeValidator.validateFieldReferences(emptyList(),
                 singletonList(aliasMapper),
                 Collections.singletonMap("nested", objectMapper),
-                new FieldTypeLookup()));
+                new FieldTypeLookup(),
+                new MetadataFieldMapper[0],
+                null));
 
         String expectedMessage = "Invalid [path] value [nested.field] for field alias [alias]: " +
             "an alias must have the same nested scope as its target. The alias is not nested, " +
@@ -152,7 +158,9 @@ public class MapperMergeValidatorTests extends ESTestCase {
             MapperMergeValidator.validateFieldReferences(emptyList(),
                 singletonList(aliasMapper),
                 fullPathObjectMappers,
-                new FieldTypeLookup()));
+                new FieldTypeLookup(),
+                new MetadataFieldMapper[0],
+                null));
 
 
         String expectedMessage = "Invalid [path] value [nested1.field] for field alias [nested2.alias]: " +

+ 5 - 9
server/src/test/java/org/elasticsearch/index/mapper/SourceFieldMapperTests.java

@@ -121,19 +121,15 @@ public class SourceFieldMapperTests extends ESSingleNodeTestCase {
         assertThat(sourceAsMap.containsKey("path2"), equalTo(true));
     }
 
-    private void assertConflicts(String mapping1, String mapping2, DocumentMapperParser parser, String... conflicts) throws IOException {
+    static void assertConflicts(String mapping1, String mapping2, DocumentMapperParser parser, String... conflicts) throws IOException {
         DocumentMapper docMapper = parser.parse("type", new CompressedXContent(mapping1));
-        docMapper = parser.parse("type", docMapper.mappingSource());
         if (conflicts.length == 0) {
             docMapper.merge(parser.parse("type", new CompressedXContent(mapping2)).mapping(), MergeReason.MAPPING_UPDATE);
         } else {
-            try {
-                docMapper.merge(parser.parse("type", new CompressedXContent(mapping2)).mapping(), MergeReason.MAPPING_UPDATE);
-                fail();
-            } catch (IllegalArgumentException e) {
-                for (String conflict : conflicts) {
-                    assertThat(e.getMessage(), containsString(conflict));
-                }
+            Exception e = expectThrows(IllegalArgumentException.class,
+                () -> docMapper.merge(parser.parse("type", new CompressedXContent(mapping2)).mapping(), MergeReason.MAPPING_UPDATE));
+            for (String conflict : conflicts) {
+                assertThat(e.getMessage(), containsString(conflict));
             }
         }
     }

+ 167 - 0
server/src/test/java/org/elasticsearch/index/mapper/TimestampFieldMapperTests.java

@@ -0,0 +1,167 @@
+/*
+ * Licensed to Elasticsearch under one or more contributor
+ * license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright
+ * ownership. Elasticsearch licenses this file to you under
+ * the Apache License, Version 2.0 (the "License"); you may
+ * not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.elasticsearch.index.mapper;
+
+import org.elasticsearch.common.Strings;
+import org.elasticsearch.common.bytes.BytesReference;
+import org.elasticsearch.common.compress.CompressedXContent;
+import org.elasticsearch.common.xcontent.XContentFactory;
+import org.elasticsearch.common.xcontent.XContentType;
+import org.elasticsearch.test.ESSingleNodeTestCase;
+
+import java.io.IOException;
+
+import static org.elasticsearch.index.mapper.SourceFieldMapperTests.assertConflicts;
+import static org.hamcrest.Matchers.equalTo;
+
+public class TimestampFieldMapperTests extends ESSingleNodeTestCase {
+
+    public void testPostParse() throws IOException {
+        String mapping = Strings.toString(XContentFactory.jsonBuilder().startObject().startObject("type")
+            .startObject("_timestamp").field("path", "@timestamp").endObject()
+            .startObject("properties").startObject("@timestamp").field("type",
+                randomBoolean() ? "date" : "date_nanos").endObject().endObject()
+            .endObject().endObject());
+        DocumentMapper docMapper = createIndex("test").mapperService()
+            .merge("type", new CompressedXContent(mapping), MapperService.MergeReason.MAPPING_UPDATE);
+
+        ParsedDocument doc = docMapper.parse(new SourceToParse("test", "1", BytesReference
+            .bytes(XContentFactory.jsonBuilder()
+                .startObject()
+                .field("@timestamp", "2020-12-12")
+                .endObject()),
+            XContentType.JSON));
+        assertThat(doc.rootDoc().getFields("@timestamp").length, equalTo(2));
+
+        Exception e = expectThrows(MapperException.class, () -> docMapper.parse(new SourceToParse("test", "1", BytesReference
+            .bytes(XContentFactory.jsonBuilder()
+                .startObject()
+                .field("@timestamp1", "2020-12-12")
+                .endObject()),
+            XContentType.JSON)));
+        assertThat(e.getCause().getMessage(), equalTo("data stream timestamp field [@timestamp] is missing"));
+
+        e = expectThrows(MapperException.class, () -> docMapper.parse(new SourceToParse("test", "1", BytesReference
+            .bytes(XContentFactory.jsonBuilder()
+                .startObject()
+                .array("@timestamp", "2020-12-12", "2020-12-13")
+                .endObject()),
+            XContentType.JSON)));
+        assertThat(e.getCause().getMessage(), equalTo("data stream timestamp field [@timestamp] encountered multiple values"));
+    }
+
+    public void testValidateNonExistingField() throws IOException {
+        String mapping = Strings.toString(XContentFactory.jsonBuilder().startObject().startObject("type")
+            .startObject("_timestamp").field("path", "non-existing-field").endObject()
+            .startObject("properties").startObject("@timestamp").field("type", "date").endObject().endObject()
+            .endObject().endObject());
+
+        Exception e = expectThrows(IllegalArgumentException.class, () -> createIndex("test").mapperService()
+            .merge("type", new CompressedXContent(mapping), MapperService.MergeReason.MAPPING_UPDATE));
+        assertThat(e.getMessage(), equalTo("the configured timestamp field [non-existing-field] does not exist"));
+    }
+
+    public void testValidateInvalidFieldType() throws IOException {
+        String mapping = Strings.toString(XContentFactory.jsonBuilder().startObject().startObject("type")
+            .startObject("_timestamp").field("path", "@timestamp").endObject()
+            .startObject("properties").startObject("@timestamp").field("type", "keyword").endObject().endObject()
+            .endObject().endObject());
+
+        Exception e = expectThrows(IllegalArgumentException.class, () -> createIndex("test").mapperService()
+            .merge("type", new CompressedXContent(mapping), MapperService.MergeReason.MAPPING_UPDATE));
+        assertThat(e.getMessage(),
+            equalTo("the configured timestamp field [@timestamp] is of type [keyword], but [date,date_nanos] is expected"));
+    }
+
+    public void testValidateNotIndexed() throws IOException {
+        String mapping = Strings.toString(XContentFactory.jsonBuilder().startObject().startObject("type")
+            .startObject("_timestamp").field("path", "@timestamp").endObject()
+            .startObject("properties").startObject("@timestamp").field("type", "date").field("index", "false").endObject().endObject()
+            .endObject().endObject());
+
+        Exception e = expectThrows(IllegalArgumentException.class, () -> createIndex("test").mapperService()
+            .merge("type", new CompressedXContent(mapping), MapperService.MergeReason.MAPPING_UPDATE));
+        assertThat(e.getMessage(), equalTo("the configured timestamp field [@timestamp] is not indexed"));
+    }
+
+    public void testValidateNotDocValues() throws IOException {
+        String mapping = Strings.toString(XContentFactory.jsonBuilder().startObject().startObject("type")
+            .startObject("_timestamp").field("path", "@timestamp").endObject()
+            .startObject("properties").startObject("@timestamp").field("type", "date").field("doc_values", "false").endObject().endObject()
+            .endObject().endObject());
+
+        Exception e = expectThrows(IllegalArgumentException.class, () -> createIndex("test").mapperService()
+            .merge("type", new CompressedXContent(mapping), MapperService.MergeReason.MAPPING_UPDATE));
+        assertThat(e.getMessage(), equalTo("the configured timestamp field [@timestamp] doesn't have doc values"));
+    }
+
+    public void testValidateNullValue() throws IOException {
+        String mapping = Strings.toString(XContentFactory.jsonBuilder().startObject().startObject("type")
+            .startObject("_timestamp").field("path", "@timestamp").endObject()
+            .startObject("properties").startObject("@timestamp").field("type", "date")
+            .field("null_value", "2020-12-12").endObject().endObject()
+            .endObject().endObject());
+
+        Exception e = expectThrows(IllegalArgumentException.class, () -> createIndex("test").mapperService()
+            .merge("type", new CompressedXContent(mapping), MapperService.MergeReason.MAPPING_UPDATE));
+        assertThat(e.getMessage(),
+            equalTo("the configured timestamp field [@timestamp] has disallowed [null_value] attribute specified"));
+    }
+
+    public void testValidateIgnoreMalformed() throws IOException {
+        String mapping = Strings.toString(XContentFactory.jsonBuilder().startObject().startObject("type")
+            .startObject("_timestamp").field("path", "@timestamp").endObject()
+            .startObject("properties").startObject("@timestamp").field("type", "date").field("ignore_malformed", "true")
+            .endObject().endObject()
+            .endObject().endObject());
+
+        Exception e = expectThrows(IllegalArgumentException.class, () -> createIndex("test").mapperService()
+            .merge("type", new CompressedXContent(mapping), MapperService.MergeReason.MAPPING_UPDATE));
+        assertThat(e.getMessage(),
+            equalTo("the configured timestamp field [@timestamp] has disallowed [ignore_malformed] attribute specified"));
+    }
+
+    public void testValidateNotDisallowedAttribute() throws IOException {
+        String mapping = Strings.toString(XContentFactory.jsonBuilder().startObject().startObject("type")
+            .startObject("_timestamp").field("path", "@timestamp").endObject()
+            .startObject("properties").startObject("@timestamp").field("type", "date").field("store", "true")
+                .endObject().endObject()
+            .endObject().endObject());
+
+        Exception e = expectThrows(IllegalArgumentException.class, () -> createIndex("test").mapperService()
+            .merge("type", new CompressedXContent(mapping), MapperService.MergeReason.MAPPING_UPDATE));
+        assertThat(e.getMessage(),
+            equalTo("the configured timestamp field [@timestamp] has disallowed attributes: [store]"));
+    }
+
+    public void testCannotUpdateTimestampField() throws IOException {
+        DocumentMapperParser parser = createIndex("test").mapperService().documentMapperParser();
+        String mapping1 = "{\"type\":{\"_timestamp\":{\"path\":\"@timestamp\"}, \"properties\": {\"@timestamp\": {\"type\": \"date\"}}}}}";
+        String mapping2 = "{\"type\":{\"_timestamp\":{\"path\":\"@timestamp2\"}, \"properties\": {\"@timestamp2\": {\"type\": \"date\"}," +
+            "\"@timestamp\": {\"type\": \"date\"}}}})";
+        assertConflicts(mapping1, mapping2, parser, "cannot update path setting for [_timestamp]");
+
+        mapping1 = "{\"type\":{\"properties\":{\"@timestamp\": {\"type\": \"date\"}}}}}";
+        mapping2 = "{\"type\":{\"_timestamp\":{\"path\":\"@timestamp2\"}, \"properties\": {\"@timestamp2\": {\"type\": \"date\"}," +
+            "\"@timestamp\": {\"type\": \"date\"}}}})";
+        assertConflicts(mapping1, mapping2, parser, "cannot update path setting for [_timestamp]");
+    }
+
+}

+ 3 - 1
server/src/test/java/org/elasticsearch/indices/IndicesModuleTests.java

@@ -20,6 +20,7 @@
 package org.elasticsearch.indices;
 
 import org.elasticsearch.Version;
+import org.elasticsearch.index.mapper.TimestampFieldMapper;
 import org.elasticsearch.index.mapper.FieldNamesFieldMapper;
 import org.elasticsearch.index.mapper.IdFieldMapper;
 import org.elasticsearch.index.mapper.IgnoredFieldMapper;
@@ -87,7 +88,8 @@ public class IndicesModuleTests extends ESTestCase {
 
     private static final String[] EXPECTED_METADATA_FIELDS = new String[]{ IgnoredFieldMapper.NAME, IdFieldMapper.NAME,
             RoutingFieldMapper.NAME, IndexFieldMapper.NAME, SourceFieldMapper.NAME, TypeFieldMapper.NAME,
-            NestedPathFieldMapper.NAME, VersionFieldMapper.NAME, SeqNoFieldMapper.NAME, FieldNamesFieldMapper.NAME };
+            NestedPathFieldMapper.NAME, VersionFieldMapper.NAME, SeqNoFieldMapper.NAME, TimestampFieldMapper.NAME,
+            FieldNamesFieldMapper.NAME };
 
     public void testBuiltinMappers() {
         IndicesModule module = new IndicesModule(Collections.emptyList());

+ 1 - 1
x-pack/plugin/ilm/qa/multi-node/src/test/java/org/elasticsearch/xpack/TimeSeriesRestDriver.java

@@ -95,7 +95,7 @@ public final class TimeSeriesRestDriver {
 
     public static void indexDocument(RestClient client, String indexAbstractionName, boolean refresh) throws IOException {
         Request indexRequest = new Request("POST", indexAbstractionName + "/_doc" + (refresh ? "?refresh" : ""));
-        indexRequest.setEntity(new StringEntity("{\"a\": \"test\"}", ContentType.APPLICATION_JSON));
+        indexRequest.setEntity(new StringEntity("{\"@timestamp\": \"2020-12-12\"}", ContentType.APPLICATION_JSON));
         Response response = client.performRequest(indexRequest);
         logger.info(response.getStatusLine());
     }

+ 2 - 0
x-pack/plugin/ml/qa/native-multi-node-tests/src/test/java/org/elasticsearch/xpack/ml/integration/ClassificationIT.java

@@ -743,6 +743,7 @@ public class ClassificationIT extends MlNativeDataFrameAnalyticsIntegTestCase {
             .setRefreshPolicy(WriteRequest.RefreshPolicy.IMMEDIATE);
         for (int i = 0; i < numTrainingRows; i++) {
             List<Object> source = List.of(
+                "time", "2020-12-12",
                 BOOLEAN_FIELD, BOOLEAN_FIELD_VALUES.get(i % BOOLEAN_FIELD_VALUES.size()),
                 NUMERICAL_FIELD, NUMERICAL_FIELD_VALUES.get(i % NUMERICAL_FIELD_VALUES.size()),
                 DISCRETE_NUMERICAL_FIELD, DISCRETE_NUMERICAL_FIELD_VALUES.get(i % DISCRETE_NUMERICAL_FIELD_VALUES.size()),
@@ -773,6 +774,7 @@ public class ClassificationIT extends MlNativeDataFrameAnalyticsIntegTestCase {
             if (NESTED_FIELD.equals(dependentVariable) == false) {
                 source.addAll(List.of(NESTED_FIELD, KEYWORD_FIELD_VALUES.get(i % KEYWORD_FIELD_VALUES.size())));
             }
+            source.addAll(List.of("time", "2020-12-12"));
             IndexRequest indexRequest = new IndexRequest(sourceIndex).source(source.toArray()).opType(DocWriteRequest.OpType.CREATE);
             bulkRequestBuilder.add(indexRequest);
         }

+ 3 - 1
x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/dataframe/extractor/ExtractedFieldsDetector.java

@@ -15,6 +15,7 @@ import org.elasticsearch.common.collect.Tuple;
 import org.elasticsearch.common.regex.Regex;
 import org.elasticsearch.index.IndexSettings;
 import org.elasticsearch.index.mapper.BooleanFieldMapper;
+import org.elasticsearch.index.mapper.TimestampFieldMapper;
 import org.elasticsearch.index.mapper.ObjectMapper;
 import org.elasticsearch.search.fetch.subphase.FetchSourceContext;
 import org.elasticsearch.xpack.core.ml.dataframe.DataFrameAnalyticsConfig;
@@ -52,7 +53,8 @@ public class ExtractedFieldsDetector {
      * Fields to ignore. These are mostly internal meta fields.
      */
     private static final List<String> IGNORE_FIELDS = Arrays.asList("_id", "_field_names", "_index", "_parent", "_routing", "_seq_no",
-        "_source", "_type", "_uid", "_version", "_feature", "_ignored", "_nested_path", DestinationIndex.ID_COPY);
+        "_source", "_type", "_uid", "_version", "_feature", "_ignored", "_nested_path", DestinationIndex.ID_COPY,
+        TimestampFieldMapper.NAME);
 
     private final String[] index;
     private final DataFrameAnalyticsConfig config;

+ 5 - 3
x-pack/plugin/src/test/resources/rest-api-spec/test/data_stream/10_data_stream_resolvability.yml

@@ -102,7 +102,9 @@
       index:
         index:  logs-foobar
         refresh: true
-        body:   { foo: bar }
+        body:
+          foo: bar
+          '@timestamp': '2020-12-12'
 
   - do:
       ilm.explain_lifecycle:
@@ -272,14 +274,14 @@
         index:   simple-data-stream1
         id:      1
         op_type: create
-        body:    { keys: [1,2,3] }
+        body:    { keys: [1,2,3], '@timestamp': '2020-12-12' }
 
   - do:
       index:
         index:   simple-data-stream1
         id:      2
         op_type: create
-        body:    { keys: [4,5,6] }
+        body:    { keys: [4,5,6], '@timestamp': '2020-12-12' }
 
   - do:
       indices.refresh: