Browse Source

Lazily create the failure store (#109289)

Rather than initializing the failure store right away when a new
data stream is created, we leave it empty and mark it for lazy
rollover. This results in the failure store only being initialized
(i.e. an index created) when a failure has actually occurred.

The exception to the rule is when a failure occurs while the data
stream is being auto-created. In that case, we do want to initialize
the failure store right away.
Niels Bauman 1 year ago
parent
commit
ba91bfdc94
32 changed files with 702 additions and 209 deletions
  1. 1 1
      docs/reference/data-streams/change-mappings-and-settings.asciidoc
  2. 1 1
      docs/reference/data-streams/downsampling-manual.asciidoc
  3. 4 4
      docs/reference/data-streams/lifecycle/tutorial-migrate-data-stream-from-ilm-to-dsl.asciidoc
  4. 1 1
      docs/reference/indices/get-data-stream.asciidoc
  5. 11 0
      modules/data-streams/src/internalClusterTest/java/org/elasticsearch/datastreams/DataStreamsSnapshotsIT.java
  6. 2 0
      modules/data-streams/src/javaRestTest/java/org/elasticsearch/datastreams/FailureStoreQueryParamIT.java
  7. 1 1
      modules/data-streams/src/test/java/org/elasticsearch/datastreams/DataStreamGetWriteIndexTests.java
  8. 114 7
      modules/data-streams/src/yamlRestTest/resources/rest-api-spec/test/data_stream/10_basic.yml
  9. 125 13
      modules/data-streams/src/yamlRestTest/resources/rest-api-spec/test/data_stream/170_modify_data_stream.yml
  10. 2 2
      modules/data-streams/src/yamlRestTest/resources/rest-api-spec/test/data_stream/190_failure_store_redirection.yml
  11. 181 34
      modules/data-streams/src/yamlRestTest/resources/rest-api-spec/test/data_stream/200_rollover_failure_store.yml
  12. 3 4
      modules/data-streams/src/yamlRestTest/resources/rest-api-spec/test/data_stream/30_auto_create_data_stream.yml
  13. 1 0
      server/src/main/java/org/elasticsearch/TransportVersions.java
  14. 2 1
      server/src/main/java/org/elasticsearch/action/admin/indices/create/AutoCreateAction.java
  15. 24 1
      server/src/main/java/org/elasticsearch/action/admin/indices/create/CreateIndexRequest.java
  16. 15 18
      server/src/main/java/org/elasticsearch/action/admin/indices/rollover/MetadataRolloverService.java
  17. 45 1
      server/src/main/java/org/elasticsearch/action/admin/indices/rollover/TransportRolloverAction.java
  18. 31 20
      server/src/main/java/org/elasticsearch/action/bulk/TransportBulkAction.java
  19. 2 1
      server/src/main/java/org/elasticsearch/action/bulk/TransportSimulateBulkAction.java
  20. 4 14
      server/src/main/java/org/elasticsearch/cluster/metadata/DataStream.java
  21. 7 9
      server/src/main/java/org/elasticsearch/cluster/metadata/IndexNameExpressionResolver.java
  22. 34 24
      server/src/main/java/org/elasticsearch/cluster/metadata/MetadataCreateDataStreamService.java
  23. 3 1
      server/src/main/java/org/elasticsearch/cluster/metadata/MetadataMigrateToDataStreamService.java
  24. 0 11
      server/src/test/java/org/elasticsearch/action/admin/indices/rollover/MetadataRolloverServiceTests.java
  25. 3 2
      server/src/test/java/org/elasticsearch/action/bulk/TransportBulkActionIndicesThatCannotBeCreatedTests.java
  26. 2 2
      server/src/test/java/org/elasticsearch/action/bulk/TransportBulkActionIngestTests.java
  27. 2 2
      server/src/test/java/org/elasticsearch/action/bulk/TransportBulkActionTests.java
  28. 3 3
      server/src/test/java/org/elasticsearch/action/bulk/TransportSimulateBulkActionTests.java
  29. 10 17
      server/src/test/java/org/elasticsearch/cluster/metadata/DataStreamTests.java
  30. 62 14
      server/src/test/java/org/elasticsearch/cluster/metadata/MetadataCreateDataStreamServiceTests.java
  31. 3 0
      x-pack/plugin/ccr/src/main/java/org/elasticsearch/xpack/ccr/action/TransportPutFollowAction.java
  32. 3 0
      x-pack/plugin/core/src/javaRestTest/java/org/elasticsearch/xpack/core/DataStreamRestIT.java

+ 1 - 1
docs/reference/data-streams/change-mappings-and-settings.asciidoc

@@ -602,7 +602,7 @@ stream's oldest backing index.
 // TESTRESPONSE[s/"index_uuid": "_eEfRrFHS9OyhqWntkgHAQ"/"index_uuid": $body.data_streams.0.indices.1.index_uuid/]
 // TESTRESPONSE[s/"index_name": ".ds-my-data-stream-2099.03.07-000001"/"index_name": $body.data_streams.0.indices.0.index_name/]
 // TESTRESPONSE[s/"index_name": ".ds-my-data-stream-2099.03.08-000002"/"index_name": $body.data_streams.0.indices.1.index_name/]
-// TESTRESPONSE[s/"status": "GREEN"/"status": "YELLOW","failure_store":{"enabled": false, "indices": [], "rollover_on_write": false}/]
+// TESTRESPONSE[s/"status": "GREEN"/"status": "YELLOW","failure_store":{"enabled": false, "indices": [], "rollover_on_write": true}/]
 
 <1> First item in the `indices` array for `my-data-stream`. This item contains
 information about the stream's oldest backing index,

+ 1 - 1
docs/reference/data-streams/downsampling-manual.asciidoc

@@ -389,7 +389,7 @@ This returns:
 // TESTRESPONSE[s/"ltOJGmqgTVm4T-Buoe7Acg"/$body.data_streams.0.indices.0.index_uuid/]
 // TESTRESPONSE[s/"2023-07-26T09:26:42.000Z"/$body.data_streams.0.time_series.temporal_ranges.0.start/]
 // TESTRESPONSE[s/"2023-07-26T13:26:42.000Z"/$body.data_streams.0.time_series.temporal_ranges.0.end/]
-// TESTRESPONSE[s/"replicated": false/"replicated": false,"failure_store":{"enabled": false, "indices": [], "rollover_on_write": false}/]
+// TESTRESPONSE[s/"replicated": false/"replicated": false,"failure_store":{"enabled": false, "indices": [], "rollover_on_write": true}/]
 <1> The backing index for this data stream.
 
 Before a backing index can be downsampled, the TSDS needs to be rolled over and

+ 4 - 4
docs/reference/data-streams/lifecycle/tutorial-migrate-data-stream-from-ilm-to-dsl.asciidoc

@@ -147,7 +147,7 @@ and that the next generation index will also be managed by {ilm-init}:
 // TESTRESPONSE[s/"index_uuid": "xCEhwsp8Tey0-FLNFYVwSg"/"index_uuid": $body.data_streams.0.indices.0.index_uuid/]
 // TESTRESPONSE[s/"index_name": ".ds-dsl-data-stream-2023.10.19-000002"/"index_name": $body.data_streams.0.indices.1.index_name/]
 // TESTRESPONSE[s/"index_uuid": "PA_JquKGSiKcAKBA8DJ5gw"/"index_uuid": $body.data_streams.0.indices.1.index_uuid/]
-// TESTRESPONSE[s/"status": "GREEN"/"status": "YELLOW","failure_store":{"enabled": false, "indices": [], "rollover_on_write": false}/]
+// TESTRESPONSE[s/"status": "GREEN"/"status": "YELLOW","failure_store":{"enabled": false, "indices": [], "rollover_on_write": true}/]
 
 <1> The name of the backing index.
 <2> For each backing index we display the value of the <<index-lifecycle-prefer-ilm, prefer_ilm>>
@@ -284,7 +284,7 @@ GET _data_stream/dsl-data-stream
 // TESTRESPONSE[s/"index_uuid": "xCEhwsp8Tey0-FLNFYVwSg"/"index_uuid": $body.data_streams.0.indices.0.index_uuid/]
 // TESTRESPONSE[s/"index_name": ".ds-dsl-data-stream-2023.10.19-000002"/"index_name": $body.data_streams.0.indices.1.index_name/]
 // TESTRESPONSE[s/"index_uuid": "PA_JquKGSiKcAKBA8DJ5gw"/"index_uuid": $body.data_streams.0.indices.1.index_uuid/]
-// TESTRESPONSE[s/"status": "GREEN"/"status": "YELLOW","failure_store":{"enabled": false, "indices": [], "rollover_on_write": false}/]
+// TESTRESPONSE[s/"status": "GREEN"/"status": "YELLOW","failure_store":{"enabled": false, "indices": [], "rollover_on_write": true}/]
 
 <1> The existing backing index will continue to be managed by {ilm-init}
 <2> The existing backing index will continue to be managed by {ilm-init}
@@ -364,7 +364,7 @@ GET _data_stream/dsl-data-stream
 // TESTRESPONSE[s/"index_uuid": "PA_JquKGSiKcAKBA8DJ5gw"/"index_uuid": $body.data_streams.0.indices.1.index_uuid/]
 // TESTRESPONSE[s/"index_name": ".ds-dsl-data-stream-2023.10.19-000003"/"index_name": $body.data_streams.0.indices.2.index_name/]
 // TESTRESPONSE[s/"index_uuid": "PA_JquKGSiKcAKBA8abcd1"/"index_uuid": $body.data_streams.0.indices.2.index_uuid/]
-// TESTRESPONSE[s/"status": "GREEN"/"status": "YELLOW","failure_store":{"enabled": false, "indices": [], "rollover_on_write": false}/]
+// TESTRESPONSE[s/"status": "GREEN"/"status": "YELLOW","failure_store":{"enabled": false, "indices": [], "rollover_on_write": true}/]
 
 <1> The backing indices that existed before rollover will continue to be managed by {ilm-init}
 <2> The backing indices that existed before rollover will continue to be managed by {ilm-init}
@@ -462,7 +462,7 @@ GET _data_stream/dsl-data-stream
 // TESTRESPONSE[s/"index_uuid": "PA_JquKGSiKcAKBA8DJ5gw"/"index_uuid": $body.data_streams.0.indices.1.index_uuid/]
 // TESTRESPONSE[s/"index_name": ".ds-dsl-data-stream-2023.10.19-000003"/"index_name": $body.data_streams.0.indices.2.index_name/]
 // TESTRESPONSE[s/"index_uuid": "PA_JquKGSiKcAKBA8abcd1"/"index_uuid": $body.data_streams.0.indices.2.index_uuid/]
-// TESTRESPONSE[s/"status": "GREEN"/"status": "YELLOW","failure_store":{"enabled": false, "indices": [], "rollover_on_write": false}/]
+// TESTRESPONSE[s/"status": "GREEN"/"status": "YELLOW","failure_store":{"enabled": false, "indices": [], "rollover_on_write": true}/]
 <1> The write index is now managed by {ilm-init}
 <2> The `lifecycle` configured on the data stream is now disabled.
 <3> The next write index will be managed by {ilm-init}

+ 1 - 1
docs/reference/indices/get-data-stream.asciidoc

@@ -358,4 +358,4 @@ The API returns the following response:
 // TESTRESPONSE[s/"index_name": ".ds-my-data-stream-two-2099.03.08-000001"/"index_name": $body.data_streams.1.indices.0.index_name/]
 // TESTRESPONSE[s/"index_uuid": "3liBu2SYS5axasRt6fUIpA"/"index_uuid": $body.data_streams.1.indices.0.index_uuid/]
 // TESTRESPONSE[s/"status": "GREEN"/"status": "YELLOW"/]
-// TESTRESPONSE[s/"replicated": false/"replicated": false,"failure_store":{"enabled": false, "indices": [], "rollover_on_write": false}/]
+// TESTRESPONSE[s/"replicated": false/"replicated": false,"failure_store":{"enabled": false, "indices": [], "rollover_on_write": true}/]

+ 11 - 0
modules/data-streams/src/internalClusterTest/java/org/elasticsearch/datastreams/DataStreamsSnapshotsIT.java

@@ -22,6 +22,7 @@ import org.elasticsearch.action.admin.indices.alias.get.GetAliasesRequest;
 import org.elasticsearch.action.admin.indices.alias.get.GetAliasesResponse;
 import org.elasticsearch.action.admin.indices.close.CloseIndexRequest;
 import org.elasticsearch.action.admin.indices.create.CreateIndexRequest;
+import org.elasticsearch.action.admin.indices.rollover.RolloverAction;
 import org.elasticsearch.action.admin.indices.rollover.RolloverRequest;
 import org.elasticsearch.action.admin.indices.rollover.RolloverResponse;
 import org.elasticsearch.action.admin.indices.template.delete.TransportDeleteComposableIndexTemplateAction;
@@ -127,6 +128,16 @@ public class DataStreamsSnapshotsIT extends AbstractSnapshotIntegTestCase {
         response = client.execute(CreateDataStreamAction.INSTANCE, request).get();
         assertTrue(response.isAcknowledged());
 
+        // Initialize the failure store.
+        RolloverRequest rolloverRequest = new RolloverRequest("with-fs", null);
+        rolloverRequest.setIndicesOptions(
+            IndicesOptions.builder(rolloverRequest.indicesOptions())
+                .failureStoreOptions(b -> b.includeRegularIndices(false).includeFailureIndices(true))
+                .build()
+        );
+        response = client.execute(RolloverAction.INSTANCE, rolloverRequest).get();
+        assertTrue(response.isAcknowledged());
+
         // Resolve backing index names after data streams have been created:
         // (these names have a date component, and running around midnight could lead to test failures otherwise)
         GetDataStreamAction.Request getDataStreamRequest = new GetDataStreamAction.Request(new String[] { "*" });

+ 2 - 0
modules/data-streams/src/javaRestTest/java/org/elasticsearch/datastreams/FailureStoreQueryParamIT.java

@@ -51,6 +51,8 @@ public class FailureStoreQueryParamIT extends DisabledSecurityDataStreamTestCase
         assertOK(client().performRequest(putComposableIndexTemplateRequest));
 
         assertOK(client().performRequest(new Request("PUT", "/_data_stream/" + DATA_STREAM_NAME)));
+        // Initialize the failure store.
+        assertOK(client().performRequest(new Request("POST", DATA_STREAM_NAME + "/_rollover?target_failure_store")));
         ensureGreen(DATA_STREAM_NAME);
 
         final Response dataStreamResponse = client().performRequest(new Request("GET", "/_data_stream/" + DATA_STREAM_NAME));

+ 1 - 1
modules/data-streams/src/test/java/org/elasticsearch/datastreams/DataStreamGetWriteIndexTests.java

@@ -315,7 +315,7 @@ public class DataStreamGetWriteIndexTests extends ESTestCase {
             TimeValue.ZERO,
             false
         );
-        return createDataStreamService.createDataStream(request, state, ActionListener.noop());
+        return createDataStreamService.createDataStream(request, state, ActionListener.noop(), false);
     }
 
     private MetadataRolloverService.RolloverResult rolloverOver(ClusterState state, String name, Instant time) throws Exception {

+ 114 - 7
modules/data-streams/src/yamlRestTest/resources/rest-api-spec/test/data_stream/10_basic.yml

@@ -261,6 +261,12 @@ setup:
               index:
                 default_pipeline: "data_stream_pipeline"
                 final_pipeline: "data_stream_final_pipeline"
+            mappings:
+              properties:
+                '@timestamp':
+                  type: date
+                count:
+                  type: long
 
   - do:
       indices.create_data_stream:
@@ -272,6 +278,23 @@ setup:
         name: failure-data-stream2
   - is_true: acknowledged
 
+  # Initialize failure store
+  - do:
+      index:
+        index: failure-data-stream1
+        refresh: true
+        body:
+          '@timestamp': '2020-12-12'
+          count: 'invalid value'
+  # Initialize failure store
+  - do:
+      index:
+        index: failure-data-stream2
+        refresh: true
+        body:
+          '@timestamp': '2020-12-12'
+          count: 'invalid value'
+
   - do:
       cluster.health:
         wait_for_status: green
@@ -281,7 +304,7 @@ setup:
         name: "*"
   - match: { data_streams.0.name: failure-data-stream1 }
   - match: { data_streams.0.timestamp_field.name: '@timestamp' }
-  - match: { data_streams.0.generation: 1 }
+  - match: { data_streams.0.generation: 2 }
   - length: { data_streams.0.indices: 1 }
   - match: { data_streams.0.indices.0.index_name: '/\.ds-failure-data-stream1-(\d{4}\.\d{2}\.\d{2}-)?000001/' }
   - match: { data_streams.0.status: 'GREEN' }
@@ -289,18 +312,18 @@ setup:
   - match: { data_streams.0.hidden: false }
   - match: { data_streams.0.failure_store.enabled: true }
   - length: { data_streams.0.failure_store.indices: 1 }
-  - match: { data_streams.0.failure_store.indices.0.index_name: '/\.fs-failure-data-stream1-(\d{4}\.\d{2}\.\d{2}-)?000001/'}
+  - match: { data_streams.0.failure_store.indices.0.index_name: '/\.fs-failure-data-stream1-(\d{4}\.\d{2}\.\d{2}-)?000002/'}
 
   - match: { data_streams.1.name: failure-data-stream2 }
   - match: { data_streams.1.timestamp_field.name: '@timestamp' }
-  - match: { data_streams.1.generation: 1 }
+  - match: { data_streams.1.generation: 2 }
   - length: { data_streams.1.indices: 1 }
   - match: { data_streams.1.indices.0.index_name: '/\.ds-failure-data-stream2-(\d{4}\.\d{2}\.\d{2}-)?000001/' }
   - match: { data_streams.1.template: 'my-template4' }
   - match: { data_streams.1.hidden: false }
   - match: { data_streams.1.failure_store.enabled: true }
   - length: { data_streams.1.failure_store.indices: 1 }
-  - match: { data_streams.1.failure_store.indices.0.index_name: '/\.fs-failure-data-stream2-(\d{4}\.\d{2}\.\d{2}-)?000001/' }
+  - match: { data_streams.1.failure_store.indices.0.index_name: '/\.fs-failure-data-stream2-(\d{4}\.\d{2}\.\d{2}-)?000002/' }
 
   # save the backing index names for later use
   - set: { data_streams.0.indices.0.index_name: idx0name }
@@ -603,7 +626,7 @@ setup:
         index: $idx0name
 
 ---
-"Delete data stream with failure stores":
+"Delete data stream with failure store":
   - requires:
       cluster_features: ["gte_v8.15.0"]
       reason: "data stream failure stores REST structure changed in 8.15+"
@@ -617,12 +640,28 @@ setup:
           index_patterns: [ failure-data-stream1 ]
           data_stream:
             failure_store: true
+          template:
+            mappings:
+              properties:
+                '@timestamp':
+                  type: date
+                count:
+                  type: long
 
   - do:
       indices.create_data_stream:
         name: failure-data-stream1
   - is_true: acknowledged
 
+  # Initialize failure store
+  - do:
+      index:
+        index: failure-data-stream1
+        refresh: true
+        body:
+          '@timestamp': '2020-12-12'
+          count: 'invalid value'
+
   - do:
       indices.create:
         index: test_index
@@ -650,11 +689,11 @@ setup:
       indices.get_data_stream: {}
   - match: { data_streams.0.name: failure-data-stream1 }
   - match: { data_streams.0.timestamp_field.name: '@timestamp' }
-  - match: { data_streams.0.generation: 1 }
+  - match: { data_streams.0.generation: 2 }
   - length: { data_streams.0.indices: 1 }
   - match: { data_streams.0.indices.0.index_name: '/\.ds-failure-data-stream1-(\d{4}\.\d{2}\.\d{2}-)?000001/' }
   - length: { data_streams.0.failure_store.indices: 1 }
-  - match: { data_streams.0.failure_store.indices.0.index_name: '/\.fs-failure-data-stream1-(\d{4}\.\d{2}\.\d{2}-)?000001/' }
+  - match: { data_streams.0.failure_store.indices.0.index_name: '/\.fs-failure-data-stream1-(\d{4}\.\d{2}\.\d{2}-)?000002/' }
 
   - do:
       indices.delete_data_stream:
@@ -676,6 +715,74 @@ setup:
         name: my-template4
   - is_true: acknowledged
 
+---
+"Delete data stream with failure store uninitialized":
+  - requires:
+      cluster_features: ["gte_v8.15.0"]
+      reason: "data stream failure stores REST structure changed in 8.15+"
+
+  - do:
+      allowed_warnings:
+        - "index template [my-template4] has index patterns [failure-data-stream1] matching patterns from existing older templates [global] with patterns (global => [*]); this template [my-template4] will take precedence during new index creation"
+      indices.put_index_template:
+        name: my-template4
+        body:
+          index_patterns: [ failure-data-stream1 ]
+          data_stream:
+            failure_store: true
+
+  - do:
+      indices.create_data_stream:
+        name: failure-data-stream1
+  - is_true: acknowledged
+
+  - do:
+      indices.create:
+        index: test_index
+        body:
+          settings:
+            number_of_shards:   1
+            number_of_replicas: 1
+
+  # save the backing index names for later use
+  - do:
+      indices.get_data_stream:
+        name: failure-data-stream1
+
+  - set: { data_streams.0.indices.0.index_name: idx0name }
+  - length: { data_streams.0.failure_store.indices: 0 }
+
+  - do:
+      indices.get:
+        index: ['.ds-failure-data-stream1-*000001', 'test_index']
+
+  - is_true: test_index.settings
+  - is_true: .$idx0name.settings
+
+  - do:
+      indices.get_data_stream: {}
+  - match: { data_streams.0.name: failure-data-stream1 }
+  - match: { data_streams.0.timestamp_field.name: '@timestamp' }
+  - match: { data_streams.0.generation: 1 }
+  - length: { data_streams.0.indices: 1 }
+  - match: { data_streams.0.indices.0.index_name: '/\.ds-failure-data-stream1-(\d{4}\.\d{2}\.\d{2}-)?000001/' }
+  - length: { data_streams.0.failure_store.indices: 0 }
+
+  - do:
+      indices.delete_data_stream:
+        name: failure-data-stream1
+  - is_true: acknowledged
+
+  - do:
+      catch: missing
+      indices.get:
+        index: $idx0name
+
+  - do:
+      indices.delete_index_template:
+        name: my-template4
+  - is_true: acknowledged
+
 ---
 "Delete data stream missing behaviour":
   - requires:

+ 125 - 13
modules/data-streams/src/yamlRestTest/resources/rest-api-spec/test/data_stream/170_modify_data_stream.yml

@@ -105,6 +105,13 @@
           index_patterns: [data-*]
           data_stream:
             failure_store: true
+          template:
+            mappings:
+              properties:
+                '@timestamp':
+                  type: date
+                count:
+                  type: long
 
   - do:
       indices.create_data_stream:
@@ -116,6 +123,23 @@
         name: data-stream-for-modification2
   - is_true: acknowledged
 
+  # Initialize failure store
+  - do:
+      index:
+        index: data-stream-for-modification
+        refresh: true
+        body:
+          '@timestamp': '2020-12-12'
+          count: 'invalid value'
+  # Initialize failure store
+  - do:
+      index:
+        index: data-stream-for-modification2
+        refresh: true
+        body:
+          '@timestamp': '2020-12-12'
+          count: 'invalid value'
+
   # rollover data stream to create new failure store index
   - do:
       indices.rollover:
@@ -168,7 +192,7 @@
         name: "data-stream-for-modification"
   - match: { data_streams.0.name: data-stream-for-modification }
   - match: { data_streams.0.timestamp_field.name: '@timestamp' }
-  - match: { data_streams.0.generation: 3 }
+  - match: { data_streams.0.generation: 4 }
   - length: { data_streams.0.indices: 1 }
   - length: { data_streams.0.failure_store.indices: 3 }
   - match: { data_streams.0.indices.0.index_name: $write_index }
@@ -187,17 +211,6 @@
                 index: test_index2
                 failure_store: true
 
-  # We are not allowed to remove the write index for the failure store
-  - do:
-      catch: /cannot remove backing index \[.*\] of data stream \[data-stream-for-modification\] because it is the write index/
-      indices.modify_data_stream:
-        body:
-          actions:
-            - remove_backing_index:
-                data_stream: "data-stream-for-modification"
-                index: $write_failure_index
-                failure_store: true
-
   # We will not accept an index that is already part of the data stream's backing indices
   - do:
       catch: /cannot add index \[.*\] to data stream \[data-stream-for-modification\] because it is already a backing index on data stream \[data-stream-for-modification\]/
@@ -267,13 +280,112 @@
         name: "data-stream-for-modification"
   - match: { data_streams.0.name: data-stream-for-modification }
   - match: { data_streams.0.timestamp_field.name: '@timestamp' }
-  - match: { data_streams.0.generation: 4 }
+  - match: { data_streams.0.generation: 5 }
   - length: { data_streams.0.indices: 1 }
   - length: { data_streams.0.failure_store.indices: 2 }
   - match: { data_streams.0.indices.0.index_name: $write_index }
   - match: { data_streams.0.failure_store.indices.0.index_name: $first_failure_index }
   - match: { data_streams.0.failure_store.indices.1.index_name: $write_failure_index }
 
+  # Remove write index of the failure store
+  - do:
+      indices.modify_data_stream:
+        body:
+          actions:
+            - remove_backing_index:
+                data_stream: "data-stream-for-modification"
+                index: $write_failure_index
+                failure_store: true
+  - is_true: acknowledged
+
+  - do:
+      indices.get_data_stream:
+        name: "data-stream-for-modification"
+  - match: { data_streams.0.name: data-stream-for-modification }
+  - match: { data_streams.0.timestamp_field.name: '@timestamp' }
+  - match: { data_streams.0.generation: 6 }
+  - length: { data_streams.0.indices: 1 }
+  - length: { data_streams.0.failure_store.indices: 1 }
+  - match: { data_streams.0.indices.0.index_name: $write_index }
+  - match: { data_streams.0.failure_store.indices.0.index_name: $first_failure_index }
+
+  # Remove the last write index of the failure store
+  - do:
+      indices.modify_data_stream:
+        body:
+          actions:
+            - remove_backing_index:
+                data_stream: "data-stream-for-modification"
+                index: $first_failure_index
+                failure_store: true
+  - is_true: acknowledged
+
+  - do:
+      indices.get_data_stream:
+        name: "data-stream-for-modification"
+  - match: { data_streams.0.name: data-stream-for-modification }
+  - match: { data_streams.0.timestamp_field.name: '@timestamp' }
+  - match: { data_streams.0.generation: 7 }
+  - length: { data_streams.0.indices: 1 }
+  - length: { data_streams.0.failure_store.indices: 0 }
+  - match: { data_streams.0.indices.0.index_name: $write_index }
+
+  # Doing these checks again to make sure we still return the same error with an empty failure store
+  # We will not accept an index that is already part of the data stream's backing indices
+  - do:
+      catch: /cannot add index \[.*\] to data stream \[data-stream-for-modification\] because it is already a backing index on data stream \[data-stream-for-modification\]/
+      indices.modify_data_stream:
+        body:
+          actions:
+            - add_backing_index:
+                data_stream: "data-stream-for-modification"
+                index: $write_index
+                failure_store: true
+
+  # We will not accept an index that is already part of a different data stream's backing indices
+  - do:
+      catch: /cannot add index \[.*\] to data stream \[data-stream-for-modification\] because it is already a backing index on data stream \[data-stream-for-modification2\]/
+      indices.modify_data_stream:
+        body:
+          actions:
+            - add_backing_index:
+                data_stream: "data-stream-for-modification"
+                index: $second_write_index
+                failure_store: true
+
+  # We will not accept an index that is already part of a different data stream's failure store
+  - do:
+      catch: /cannot add index \[.*\] to data stream \[data-stream-for-modification\] because it is already a failure store index on data stream \[data-stream-for-modification2\]/
+      indices.modify_data_stream:
+        body:
+          actions:
+            - add_backing_index:
+                data_stream: "data-stream-for-modification"
+                index: $second_write_failure_index
+                failure_store: true
+
+  # We will return a failed response if we try to remove an index from the failure store that is not present
+  - do:
+      catch: /index \[.*\] not found/
+      indices.modify_data_stream:
+        body:
+          actions:
+            - remove_backing_index:
+                data_stream: "data-stream-for-modification"
+                index: $write_index
+                failure_store: true
+
+  # Add index to empty failure store
+  - do:
+      indices.modify_data_stream:
+        body:
+          actions:
+            - add_backing_index:
+                data_stream: "data-stream-for-modification"
+                index: "test_index1"
+                failure_store: true
+  - is_true: acknowledged
+
   - do:
       indices.delete_data_stream:
         name: data-stream-for-modification

+ 2 - 2
modules/data-streams/src/yamlRestTest/resources/rest-api-spec/test/data_stream/190_failure_store_redirection.yml

@@ -181,7 +181,7 @@ teardown:
   - match: { data_streams.0.indices.0.index_name: '/\.ds-logs-foobar-(\d{4}\.\d{2}\.\d{2}-)?000001/' }
   - match: { data_streams.0.failure_store.enabled: true }
   - length: { data_streams.0.failure_store.indices: 1 }
-  - match: { data_streams.0.failure_store.indices.0.index_name: '/\.fs-logs-foobar-(\d{4}\.\d{2}\.\d{2}-)?000001/' }
+  - match: { data_streams.0.failure_store.indices.0.index_name: '/\.fs-logs-foobar-(\d{4}\.\d{2}\.\d{2}-)?000002/' }
 
   - do:
       search:
@@ -193,7 +193,7 @@ teardown:
       search:
         index: .fs-logs-foobar-*
   - length:   { hits.hits: 1  }
-  - match: { hits.hits.0._index: "/\\.fs-logs-foobar-(\\d{4}\\.\\d{2}\\.\\d{2}-)?000001/" }
+  - match: { hits.hits.0._index: "/\\.fs-logs-foobar-(\\d{4}\\.\\d{2}\\.\\d{2}-)?000002/" }
   - exists: hits.hits.0._source.@timestamp
   - not_exists: hits.hits.0._source.count
   - match: { hits.hits.0._source.document.index: 'logs-foobar' }

+ 181 - 34
modules/data-streams/src/yamlRestTest/resources/rest-api-spec/test/data_stream/200_rollover_failure_store.yml

@@ -39,14 +39,23 @@ teardown:
         ignore: 404
 ---
 "Roll over a data stream's failure store without conditions":
+  # Initialize failure store
+  - do:
+      index:
+        index: data-stream-for-rollover
+        refresh: true
+        body:
+          '@timestamp': '2020-12-12'
+          count: 'invalid value'
+
   - do:
       indices.rollover:
         alias: "data-stream-for-rollover"
         target_failure_store: true
 
   - match: { acknowledged: true }
-  - match: { old_index: "/\\.fs-data-stream-for-rollover-(\\d{4}\\.\\d{2}\\.\\d{2}-)?000001/" }
-  - match: { new_index: "/\\.fs-data-stream-for-rollover-(\\d{4}\\.\\d{2}\\.\\d{2}-)?000002/" }
+  - match: { old_index: "/\\.fs-data-stream-for-rollover-(\\d{4}\\.\\d{2}\\.\\d{2}-)?000002/" }
+  - match: { new_index: "/\\.fs-data-stream-for-rollover-(\\d{4}\\.\\d{2}\\.\\d{2}-)?000003/" }
   - match: { rolled_over: true }
   - match: { dry_run: false }
 
@@ -56,12 +65,12 @@ teardown:
   - match: { data_streams.0.name: data-stream-for-rollover }
   - match: { data_streams.0.timestamp_field.name: '@timestamp' }
   # Both backing and failure indices use the same generation field.
-  - match: { data_streams.0.generation: 2 }
+  - match: { data_streams.0.generation: 3 }
   - length: { data_streams.0.indices: 1 }
   - match: { data_streams.0.indices.0.index_name: '/\.ds-data-stream-for-rollover-(\d{4}\.\d{2}\.\d{2}-)?000001/' }
   - length: { data_streams.0.failure_store.indices: 2 }
-  - match: { data_streams.0.failure_store.indices.0.index_name: '/\.fs-data-stream-for-rollover-(\d{4}\.\d{2}\.\d{2}-)?000001/' }
-  - match: { data_streams.0.failure_store.indices.1.index_name: '/\.fs-data-stream-for-rollover-(\d{4}\.\d{2}\.\d{2}-)?000002/' }
+  - match: { data_streams.0.failure_store.indices.0.index_name: '/\.fs-data-stream-for-rollover-(\d{4}\.\d{2}\.\d{2}-)?000002/' }
+  - match: { data_streams.0.failure_store.indices.1.index_name: '/\.fs-data-stream-for-rollover-(\d{4}\.\d{2}\.\d{2}-)?000003/' }
 
 ---
 "Roll over a data stream's failure store with conditions":
@@ -82,8 +91,8 @@ teardown:
             max_docs: 1
 
   - match: { acknowledged: true }
-  - match: { old_index: "/\\.fs-data-stream-for-rollover-(\\d{4}\\.\\d{2}\\.\\d{2}-)?000001/" }
-  - match: { new_index: "/\\.fs-data-stream-for-rollover-(\\d{4}\\.\\d{2}\\.\\d{2}-)?000002/" }
+  - match: { old_index: "/\\.fs-data-stream-for-rollover-(\\d{4}\\.\\d{2}\\.\\d{2}-)?000002/" }
+  - match: { new_index: "/\\.fs-data-stream-for-rollover-(\\d{4}\\.\\d{2}\\.\\d{2}-)?000003/" }
   - match: { rolled_over: true }
   - match: { dry_run: false }
 
@@ -93,22 +102,31 @@ teardown:
   - match: { data_streams.0.name: data-stream-for-rollover }
   - match: { data_streams.0.timestamp_field.name: '@timestamp' }
   # Both backing and failure indices use the same generation field.
-  - match: { data_streams.0.generation: 2 }
+  - match: { data_streams.0.generation: 3 }
   - length: { data_streams.0.indices: 1 }
   - match: { data_streams.0.indices.0.index_name: '/\.ds-data-stream-for-rollover-(\d{4}\.\d{2}\.\d{2}-)?000001/' }
   - length: { data_streams.0.failure_store.indices: 2 }
-  - match: { data_streams.0.failure_store.indices.0.index_name: '/\.fs-data-stream-for-rollover-(\d{4}\.\d{2}\.\d{2}-)?000001/' }
-  - match: { data_streams.0.failure_store.indices.1.index_name: '/\.fs-data-stream-for-rollover-(\d{4}\.\d{2}\.\d{2}-)?000002/' }
+  - match: { data_streams.0.failure_store.indices.0.index_name: '/\.fs-data-stream-for-rollover-(\d{4}\.\d{2}\.\d{2}-)?000002/' }
+  - match: { data_streams.0.failure_store.indices.1.index_name: '/\.fs-data-stream-for-rollover-(\d{4}\.\d{2}\.\d{2}-)?000003/' }
 
 ---
 "Don't roll over a data stream's failure store when conditions aren't met":
+  # Initialize failure store
+  - do:
+      index:
+        index: data-stream-for-rollover
+        refresh: true
+        body:
+          '@timestamp': '2020-12-12'
+          count: 'invalid value'
+
   - do:
       indices.rollover:
         alias: "data-stream-for-rollover"
         target_failure_store: true
         body:
           conditions:
-            max_docs: 1
+            max_primary_shard_docs: 2
 
   - match: { acknowledged: false }
   - match: { rolled_over: false }
@@ -119,11 +137,11 @@ teardown:
         name: "*"
   - match: { data_streams.0.name: data-stream-for-rollover }
   - match: { data_streams.0.timestamp_field.name: '@timestamp' }
-  - match: { data_streams.0.generation: 1 }
+  - match: { data_streams.0.generation: 2 }
   - length: { data_streams.0.indices: 1 }
   - match: { data_streams.0.indices.0.index_name: '/\.ds-data-stream-for-rollover-(\d{4}\.\d{2}\.\d{2}-)?000001/' }
   - length: { data_streams.0.failure_store.indices: 1 }
-  - match: { data_streams.0.failure_store.indices.0.index_name: '/\.fs-data-stream-for-rollover-(\d{4}\.\d{2}\.\d{2}-)?000001/' }
+  - match: { data_streams.0.failure_store.indices.0.index_name: '/\.fs-data-stream-for-rollover-(\d{4}\.\d{2}\.\d{2}-)?000002/' }
 
 ---
 "Lazily roll over a data stream's failure store after a shard failure":
@@ -135,6 +153,15 @@ teardown:
           path: /{index}/_rollover
           capabilities: [lazy-rollover-failure-store]
 
+  # Initialize failure store
+  - do:
+      index:
+        index: data-stream-for-rollover
+        refresh: true
+        body:
+          '@timestamp': '2020-12-12'
+          count: 'invalid value'
+
   # Mark the failure store for lazy rollover
   - do:
       indices.rollover:
@@ -151,11 +178,11 @@ teardown:
         name: "*"
   - match: { data_streams.0.name: data-stream-for-rollover }
   - match: { data_streams.0.timestamp_field.name: '@timestamp' }
-  - match: { data_streams.0.generation: 1 }
+  - match: { data_streams.0.generation: 2 }
   - length: { data_streams.0.indices: 1 }
   - match: { data_streams.0.indices.0.index_name: '/\.ds-data-stream-for-rollover-(\d{4}\.\d{2}\.\d{2}-)?000001/' }
   - length: { data_streams.0.failure_store.indices: 1 }
-  - match: { data_streams.0.failure_store.indices.0.index_name: '/\.fs-data-stream-for-rollover-(\d{4}\.\d{2}\.\d{2}-)?000001/' }
+  - match: { data_streams.0.failure_store.indices.0.index_name: '/\.fs-data-stream-for-rollover-(\d{4}\.\d{2}\.\d{2}-)?000002/' }
 
   - do:
       index:
@@ -171,24 +198,20 @@ teardown:
   - match: { data_streams.0.name: data-stream-for-rollover }
   - match: { data_streams.0.timestamp_field.name: '@timestamp' }
   # Both backing and failure indices use the same generation field.
-  - match: { data_streams.0.generation: 2 }
+  - match: { data_streams.0.generation: 3 }
   - length: { data_streams.0.indices: 1 }
   - match: { data_streams.0.indices.0.index_name: '/\.ds-data-stream-for-rollover-(\d{4}\.\d{2}\.\d{2}-)?000001/' }
   - length: { data_streams.0.failure_store.indices: 2 }
-  - match: { data_streams.0.failure_store.indices.0.index_name: '/\.fs-data-stream-for-rollover-(\d{4}\.\d{2}\.\d{2}-)?000001/' }
-  - match: { data_streams.0.failure_store.indices.1.index_name: '/\.fs-data-stream-for-rollover-(\d{4}\.\d{2}\.\d{2}-)?000002/' }
+  - match: { data_streams.0.failure_store.indices.0.index_name: '/\.fs-data-stream-for-rollover-(\d{4}\.\d{2}\.\d{2}-)?000002/' }
+  - match: { data_streams.0.failure_store.indices.1.index_name: '/\.fs-data-stream-for-rollover-(\d{4}\.\d{2}\.\d{2}-)?000003/' }
 
+  # Ensure failure got redirected to new index (after rollover).
   - do:
       search:
         index: .fs-data-stream-for-rollover-*
-  - length: { hits.hits: 1  }
+  - length: { hits.hits: 2  }
   - match: { hits.hits.0._index: "/\\.fs-data-stream-for-rollover-(\\d{4}\\.\\d{2}\\.\\d{2}-)?000002/" }
-  - exists: hits.hits.0._source.@timestamp
-  - not_exists: hits.hits.0._source.count
-  - match: { hits.hits.0._source.document.index: 'data-stream-for-rollover' }
-  - match: { hits.hits.0._source.document.source.@timestamp: '2020-12-12' }
-  - match: { hits.hits.0._source.document.source.count: 'invalid value' }
-  - match: { hits.hits.0._source.error.type: 'document_parsing_exception' }
+  - match: { hits.hits.1._index: "/\\.fs-data-stream-for-rollover-(\\d{4}\\.\\d{2}\\.\\d{2}-)?000003/" }
 
 ---
 "Lazily roll over a data stream's failure store after an ingest failure":
@@ -234,6 +257,15 @@ teardown:
       indices.create_data_stream:
         name: data-stream-for-lazy-rollover
 
+  # Initialize failure store
+  - do:
+      index:
+        index: data-stream-for-lazy-rollover
+        refresh: true
+        body:
+          '@timestamp': '2020-12-12'
+          count: 'invalid value'
+
   # Mark the failure store for lazy rollover
   - do:
       indices.rollover:
@@ -250,11 +282,11 @@ teardown:
         name: "*"
   - match: { data_streams.0.name: data-stream-for-lazy-rollover }
   - match: { data_streams.0.timestamp_field.name: '@timestamp' }
-  - match: { data_streams.0.generation: 1 }
+  - match: { data_streams.0.generation: 2 }
   - length: { data_streams.0.indices: 1 }
   - match: { data_streams.0.indices.0.index_name: '/\.ds-data-stream-for-lazy-rollover-(\d{4}\.\d{2}\.\d{2}-)?000001/' }
   - length: { data_streams.0.failure_store.indices: 1 }
-  - match: { data_streams.0.failure_store.indices.0.index_name: '/\.fs-data-stream-for-lazy-rollover-(\d{4}\.\d{2}\.\d{2}-)?000001/' }
+  - match: { data_streams.0.failure_store.indices.0.index_name: '/\.fs-data-stream-for-lazy-rollover-(\d{4}\.\d{2}\.\d{2}-)?000002/' }
 
   - do:
       index:
@@ -270,13 +302,20 @@ teardown:
   - match: { data_streams.0.name: data-stream-for-lazy-rollover }
   - match: { data_streams.0.timestamp_field.name: '@timestamp' }
   # Both backing and failure indices use the same generation field.
-  - match: { data_streams.0.generation: 2 }
+  - match: { data_streams.0.generation: 3 }
   - length: { data_streams.0.indices: 1 }
   - match: { data_streams.0.indices.0.index_name: '/\.ds-data-stream-for-lazy-rollover-(\d{4}\.\d{2}\.\d{2}-)?000001/' }
   - length: { data_streams.0.failure_store.indices: 2 }
-  - match: { data_streams.0.failure_store.indices.0.index_name: '/\.fs-data-stream-for-lazy-rollover-(\d{4}\.\d{2}\.\d{2}-)?000001/' }
-  - match: { data_streams.0.failure_store.indices.1.index_name: '/\.fs-data-stream-for-lazy-rollover-(\d{4}\.\d{2}\.\d{2}-)?000002/' }
+  - match: { data_streams.0.failure_store.indices.0.index_name: '/\.fs-data-stream-for-lazy-rollover-(\d{4}\.\d{2}\.\d{2}-)?000002/' }
+  - match: { data_streams.0.failure_store.indices.1.index_name: '/\.fs-data-stream-for-lazy-rollover-(\d{4}\.\d{2}\.\d{2}-)?000003/' }
 
+  # Ensure failure got redirected to new index (after rollover).
+  - do:
+      search:
+        index: .fs-data-stream-for-lazy-rollover-*
+  - length: { hits.hits: 2  }
+  - match: { hits.hits.0._index: "/\\.fs-data-stream-for-lazy-rollover-(\\d{4}\\.\\d{2}\\.\\d{2}-)?000002/" }
+  - match: { hits.hits.1._index: "/\\.fs-data-stream-for-lazy-rollover-(\\d{4}\\.\\d{2}\\.\\d{2}-)?000003/" }
 ---
 "A failure store marked for lazy rollover should only be rolled over when there is a failure":
   - requires:
@@ -287,6 +326,15 @@ teardown:
           path: /{index}/_rollover
           capabilities: [lazy-rollover-failure-store]
 
+  # Initialize failure store
+  - do:
+      index:
+        index: data-stream-for-rollover
+        refresh: true
+        body:
+          '@timestamp': '2020-12-12'
+          count: 'invalid value'
+
   # Mark the failure store for lazy rollover
   - do:
       indices.rollover:
@@ -303,11 +351,11 @@ teardown:
         name: "*"
   - match: { data_streams.0.name: data-stream-for-rollover }
   - match: { data_streams.0.timestamp_field.name: '@timestamp' }
-  - match: { data_streams.0.generation: 1 }
+  - match: { data_streams.0.generation: 2 }
   - length: { data_streams.0.indices: 1 }
   - match: { data_streams.0.indices.0.index_name: '/\.ds-data-stream-for-rollover-(\d{4}\.\d{2}\.\d{2}-)?000001/' }
   - length: { data_streams.0.failure_store.indices: 1 }
-  - match: { data_streams.0.failure_store.indices.0.index_name: '/\.fs-data-stream-for-rollover-(\d{4}\.\d{2}\.\d{2}-)?000001/' }
+  - match: { data_streams.0.failure_store.indices.0.index_name: '/\.fs-data-stream-for-rollover-(\d{4}\.\d{2}\.\d{2}-)?000002/' }
 
   - do:
       index:
@@ -323,8 +371,107 @@ teardown:
   - match: { data_streams.0.name: data-stream-for-rollover }
   - match: { data_streams.0.timestamp_field.name: '@timestamp' }
   # Both backing and failure indices use the same generation field.
-  - match: { data_streams.0.generation: 1 }
+  - match: { data_streams.0.generation: 2 }
+  - length: { data_streams.0.indices: 1 }
+  - match: { data_streams.0.indices.0.index_name: '/\.ds-data-stream-for-rollover-(\d{4}\.\d{2}\.\d{2}-)?000001/' }
+  - length: { data_streams.0.failure_store.indices: 1 }
+  - match: { data_streams.0.failure_store.indices.0.index_name: '/\.fs-data-stream-for-rollover-(\d{4}\.\d{2}\.\d{2}-)?000002/' }
+
+---
+"Rolling over an uninitialized failure store should initialize it":
+  # Initializing with conditions is not allowed.
+  - do:
+      catch: /Rolling over\/initializing an empty failure store is only supported without conditions\./
+      indices.rollover:
+        alias: "data-stream-for-rollover"
+        target_failure_store: true
+        body:
+          conditions:
+            max_docs: 1
+
+  - do:
+      indices.rollover:
+        alias: "data-stream-for-rollover"
+        target_failure_store: true
+
+  - match: { acknowledged: true }
+  - match: { old_index: "_none_" }
+  - match: { new_index: "/\\.fs-data-stream-for-rollover-(\\d{4}\\.\\d{2}\\.\\d{2}-)?000002/" }
+  - match: { rolled_over: true }
+  - match: { dry_run: false }
+
+  - do:
+      indices.get_data_stream:
+        name: "*"
+  - match: { data_streams.0.name: data-stream-for-rollover }
+  - match: { data_streams.0.timestamp_field.name: '@timestamp' }
+  # Both backing and failure indices use the same generation field.
+  - match: { data_streams.0.generation: 2 }
   - length: { data_streams.0.indices: 1 }
   - match: { data_streams.0.indices.0.index_name: '/\.ds-data-stream-for-rollover-(\d{4}\.\d{2}\.\d{2}-)?000001/' }
   - length: { data_streams.0.failure_store.indices: 1 }
-  - match: { data_streams.0.failure_store.indices.0.index_name: '/\.fs-data-stream-for-rollover-(\d{4}\.\d{2}\.\d{2}-)?000001/' }
+  - match: { data_streams.0.failure_store.indices.0.index_name: '/\.fs-data-stream-for-rollover-(\d{4}\.\d{2}\.\d{2}-)?000002/' }
+
+---
+"Rolling over a failure store on a data stream without the failure store enabled should work":
+  - do:
+      allowed_warnings:
+        - "index template [my-other-template] has index patterns [other-data-*] matching patterns from existing older templates [global] with patterns (global => [*]); this template [my-other-template] will take precedence during new index creation"
+      indices.put_index_template:
+        name: my-other-template
+        body:
+          index_patterns: [other-data-*]
+          data_stream: {}
+
+  - do:
+      indices.create_data_stream:
+        name: other-data-stream-for-rollover
+
+  # Initializing should work
+  - do:
+      indices.rollover:
+        alias: "other-data-stream-for-rollover"
+        target_failure_store: true
+
+  - match: { acknowledged: true }
+  - match: { old_index: "_none_" }
+  - match: { new_index: "/\\.fs-other-data-stream-for-rollover-(\\d{4}\\.\\d{2}\\.\\d{2}-)?000002/" }
+  - match: { rolled_over: true }
+  - match: { dry_run: false }
+
+  - do:
+      indices.get_data_stream:
+        name: other-data-stream-for-rollover
+  - match: { data_streams.0.name: other-data-stream-for-rollover }
+  - match: { data_streams.0.timestamp_field.name: '@timestamp' }
+  # Both backing and failure indices use the same generation field.
+  - match: { data_streams.0.generation: 2 }
+  - length: { data_streams.0.indices: 1 }
+  - match: { data_streams.0.indices.0.index_name: '/\.ds-other-data-stream-for-rollover-(\d{4}\.\d{2}\.\d{2}-)?000001/' }
+  - length: { data_streams.0.failure_store.indices: 1 }
+  - match: { data_streams.0.failure_store.indices.0.index_name: '/\.fs-other-data-stream-for-rollover-(\d{4}\.\d{2}\.\d{2}-)?000002/' }
+
+  # And "regular" rollover should work
+  - do:
+      indices.rollover:
+        alias: "other-data-stream-for-rollover"
+        target_failure_store: true
+
+  - match: { acknowledged: true }
+  - match: { old_index: "/\\.fs-other-data-stream-for-rollover-(\\d{4}\\.\\d{2}\\.\\d{2}-)?000002/" }
+  - match: { new_index: "/\\.fs-other-data-stream-for-rollover-(\\d{4}\\.\\d{2}\\.\\d{2}-)?000003/" }
+  - match: { rolled_over: true }
+  - match: { dry_run: false }
+
+  - do:
+      indices.get_data_stream:
+        name: other-data-stream-for-rollover
+  - match: { data_streams.0.name: other-data-stream-for-rollover }
+  - match: { data_streams.0.timestamp_field.name: '@timestamp' }
+  # Both backing and failure indices use the same generation field.
+  - match: { data_streams.0.generation: 3 }
+  - length: { data_streams.0.indices: 1 }
+  - match: { data_streams.0.indices.0.index_name: '/\.ds-other-data-stream-for-rollover-(\d{4}\.\d{2}\.\d{2}-)?000001/' }
+  - length: { data_streams.0.failure_store.indices: 2 }
+  - match: { data_streams.0.failure_store.indices.0.index_name: '/\.fs-other-data-stream-for-rollover-(\d{4}\.\d{2}\.\d{2}-)?000002/' }
+  - match: { data_streams.0.failure_store.indices.1.index_name: '/\.fs-other-data-stream-for-rollover-(\d{4}\.\d{2}\.\d{2}-)?000003/' }

+ 3 - 4
modules/data-streams/src/yamlRestTest/resources/rest-api-spec/test/data_stream/30_auto_create_data_stream.yml

@@ -1,5 +1,5 @@
 ---
-"Put index template":
+"Auto-create data stream":
   - requires:
       cluster_features: ["gte_v7.9.0"]
       reason: "data streams only supported in 7.9+"
@@ -48,7 +48,7 @@
   - is_true: acknowledged
 
 ---
-"Put index template with failure store":
+"Don't initialize failure store during data stream auto-creation on successful index":
   - requires:
       cluster_features: ["gte_v8.15.0"]
       reason: "data stream failure stores REST structure changed in 8.15+"
@@ -92,8 +92,7 @@
   - length: { data_streams.0.indices: 1 }
   - match: { data_streams.0.indices.0.index_name: '/\.ds-logs-foobar-(\d{4}\.\d{2}\.\d{2}-)?000001/' }
   - match: { data_streams.0.failure_store.enabled: true }
-  - length: { data_streams.0.failure_store.indices: 1 }
-  - match: { data_streams.0.failure_store.indices.0.index_name: '/\.fs-logs-foobar-(\d{4}\.\d{2}\.\d{2}-)?000001/' }
+  - length: { data_streams.0.failure_store.indices: 0 }
 
   - do:
       indices.delete_data_stream:

+ 1 - 0
server/src/main/java/org/elasticsearch/TransportVersions.java

@@ -195,6 +195,7 @@ public class TransportVersions {
     public static final TransportVersion DELETE_SNAPSHOTS_ASYNC_ADDED = def(8_686_00_0);
     public static final TransportVersion VERSION_SUPPORTING_SPARSE_VECTOR_STATS = def(8_687_00_0);
     public static final TransportVersion ML_AD_OUTPUT_MEMORY_ALLOCATOR_FIELD = def(8_688_00_0);
+    public static final TransportVersion FAILURE_STORE_LAZY_CREATION = def(8_689_00_0);
 
     /*
      * STOP! READ THIS FIRST! No, really,

+ 2 - 1
server/src/main/java/org/elasticsearch/action/admin/indices/create/AutoCreateAction.java

@@ -261,7 +261,8 @@ public final class AutoCreateAction extends ActionType<CreateIndexResponse> {
                     ClusterState clusterState = metadataCreateDataStreamService.createDataStream(
                         createRequest,
                         currentState,
-                        rerouteCompletionIsNotRequired()
+                        rerouteCompletionIsNotRequired(),
+                        request.isInitializeFailureStore()
                     );
 
                     final var dataStream = clusterState.metadata().dataStreams().get(request.index());

+ 24 - 1
server/src/main/java/org/elasticsearch/action/admin/indices/create/CreateIndexRequest.java

@@ -64,6 +64,8 @@ public class CreateIndexRequest extends AcknowledgedRequest<CreateIndexRequest>
 
     private boolean requireDataStream;
 
+    private boolean initializeFailureStore;
+
     private Settings settings = Settings.EMPTY;
 
     private String mappings = "{}";
@@ -109,6 +111,11 @@ public class CreateIndexRequest extends AcknowledgedRequest<CreateIndexRequest>
         } else {
             requireDataStream = false;
         }
+        if (in.getTransportVersion().onOrAfter(TransportVersions.FAILURE_STORE_LAZY_CREATION)) {
+            initializeFailureStore = in.readBoolean();
+        } else {
+            initializeFailureStore = true;
+        }
     }
 
     public CreateIndexRequest() {
@@ -468,6 +475,19 @@ public class CreateIndexRequest extends AcknowledgedRequest<CreateIndexRequest>
         return this;
     }
 
+    public boolean isInitializeFailureStore() {
+        return initializeFailureStore;
+    }
+
+    /**
+     * Set whether this CreateIndexRequest should initialize the failure store on data stream creation. This can be necessary when, for
+     * example, a failure occurs while trying to ingest a document into a data stream that has to be auto-created.
+     */
+    public CreateIndexRequest initializeFailureStore(boolean initializeFailureStore) {
+        this.initializeFailureStore = initializeFailureStore;
+        return this;
+    }
+
     @Override
     public void writeTo(StreamOutput out) throws IOException {
         super.writeTo(out);
@@ -491,7 +511,10 @@ public class CreateIndexRequest extends AcknowledgedRequest<CreateIndexRequest>
             out.writeString(origin);
         }
         if (out.getTransportVersion().onOrAfter(TransportVersions.V_8_13_0)) {
-            out.writeOptionalBoolean(this.requireDataStream);
+            out.writeBoolean(this.requireDataStream);
+        }
+        if (out.getTransportVersion().onOrAfter(TransportVersions.FAILURE_STORE_LAZY_CREATION)) {
+            out.writeBoolean(this.initializeFailureStore);
         }
     }
 

+ 15 - 18
server/src/main/java/org/elasticsearch/action/admin/indices/rollover/MetadataRolloverService.java

@@ -84,6 +84,7 @@ public class MetadataRolloverService {
         AutoShardingType.COOLDOWN_PREVENTED_DECREASE,
         "es.auto_sharding.cooldown_prevented_decrease.total"
     );
+    private static final String NON_EXISTENT_SOURCE = "_none_";
 
     private final ThreadPool threadPool;
     private final MetadataCreateIndexService createIndexService;
@@ -221,14 +222,13 @@ public class MetadataRolloverService {
 
     private static NameResolution resolveDataStreamRolloverNames(Metadata metadata, DataStream dataStream, boolean isFailureStoreRollover) {
         final DataStream.DataStreamIndices dataStreamIndices = dataStream.getDataStreamIndices(isFailureStoreRollover);
-        assert dataStreamIndices.getWriteIndex() != null : "Unable to roll over dataStreamIndices with no indices";
+        assert dataStreamIndices.getIndices().isEmpty() == false || isFailureStoreRollover
+            : "Unable to roll over dataStreamIndices with no indices";
 
-        final IndexMetadata originalWriteIndex = metadata.index(dataStreamIndices.getWriteIndex());
-        return new NameResolution(
-            originalWriteIndex.getIndex().getName(),
-            null,
-            dataStream.nextWriteIndexAndGeneration(metadata, dataStreamIndices).v1()
-        );
+        final String originalWriteIndex = dataStreamIndices.getIndices().isEmpty() && dataStreamIndices.isRolloverOnWrite()
+            ? NON_EXISTENT_SOURCE
+            : metadata.index(dataStreamIndices.getWriteIndex()).getIndex().getName();
+        return new NameResolution(originalWriteIndex, null, dataStream.nextWriteIndexAndGeneration(metadata, dataStreamIndices).v1());
     }
 
     private RolloverResult rolloverAlias(
@@ -323,13 +323,14 @@ public class MetadataRolloverService {
         }
 
         final DataStream.DataStreamIndices dataStreamIndices = dataStream.getDataStreamIndices(isFailureStoreRollover);
-        final Index originalWriteIndex = dataStreamIndices.getWriteIndex();
+        final boolean isLazyCreation = dataStreamIndices.getIndices().isEmpty() && dataStreamIndices.isRolloverOnWrite();
+        final Index originalWriteIndex = isLazyCreation ? null : dataStreamIndices.getWriteIndex();
         final Tuple<String, Long> nextIndexAndGeneration = dataStream.nextWriteIndexAndGeneration(metadata, dataStreamIndices);
         final String newWriteIndexName = nextIndexAndGeneration.v1();
         final long newGeneration = nextIndexAndGeneration.v2();
         MetadataCreateIndexService.validateIndexName(newWriteIndexName, currentState); // fails if the index already exists
         if (onlyValidate) {
-            return new RolloverResult(newWriteIndexName, originalWriteIndex.getName(), currentState);
+            return new RolloverResult(newWriteIndexName, isLazyCreation ? NON_EXISTENT_SOURCE : originalWriteIndex.getName(), currentState);
         }
 
         ClusterState newState;
@@ -423,10 +424,12 @@ public class MetadataRolloverService {
 
         RolloverInfo rolloverInfo = new RolloverInfo(dataStreamName, metConditions, threadPool.absoluteTimeInMillis());
 
-        Metadata.Builder metadataBuilder = Metadata.builder(newState.metadata())
-            .put(
+        Metadata.Builder metadataBuilder = Metadata.builder(newState.metadata());
+        if (isLazyCreation == false) {
+            metadataBuilder.put(
                 IndexMetadata.builder(newState.metadata().index(originalWriteIndex)).stats(sourceIndexStats).putRolloverInfo(rolloverInfo)
             );
+        }
 
         metadataBuilder = writeLoadForecaster.withWriteLoadForecastForWriteIndex(dataStreamName, metadataBuilder);
         metadataBuilder = withShardSizeForecastForWriteIndex(dataStreamName, metadataBuilder);
@@ -434,7 +437,7 @@ public class MetadataRolloverService {
         newState = ClusterState.builder(newState).metadata(metadataBuilder).build();
         newState = MetadataDataStreamsService.setRolloverOnWrite(newState, dataStreamName, false, isFailureStoreRollover);
 
-        return new RolloverResult(newWriteIndexName, originalWriteIndex.getName(), newState);
+        return new RolloverResult(newWriteIndexName, isLazyCreation ? NON_EXISTENT_SOURCE : originalWriteIndex.getName(), newState);
     }
 
     /**
@@ -664,12 +667,6 @@ public class MetadataRolloverService {
                     "aliases, mappings, and index settings may not be specified when rolling over a data stream"
                 );
             }
-            var dataStream = (DataStream) indexAbstraction;
-            if (isFailureStoreRollover && dataStream.isFailureStoreEnabled() == false) {
-                throw new IllegalArgumentException(
-                    "unable to roll over failure store because [" + indexAbstraction.getName() + "] does not have the failure store enabled"
-                );
-            }
         }
     }
 }

+ 45 - 1
server/src/main/java/org/elasticsearch/action/admin/indices/rollover/TransportRolloverAction.java

@@ -230,6 +230,16 @@ public class TransportRolloverAction extends TransportMasterNodeAction<RolloverR
             );
             return;
         }
+        if (targetFailureStore && rolloverTargetAbstraction.isDataStreamRelated() == false) {
+            listener.onFailure(new IllegalStateException("Rolling over failure stores is only possible on data streams."));
+            return;
+        }
+
+        // When we're initializing a failure store, we skip the stats request because there is no source index to retrieve stats for.
+        if (targetFailureStore && ((DataStream) rolloverTargetAbstraction).getFailureIndices().getIndices().isEmpty()) {
+            initializeFailureStore(rolloverRequest, listener, trialSourceIndexName, trialRolloverIndexName);
+            return;
+        }
 
         final var statsIndicesOptions = new IndicesOptions(
             IndicesOptions.ConcreteTargetOptions.ALLOW_UNAVAILABLE_TARGETS,
@@ -317,7 +327,7 @@ public class TransportRolloverAction extends TransportMasterNodeAction<RolloverR
 
                 // Pre-check the conditions to see whether we should submit a new cluster state task
                 if (rolloverRequest.areConditionsMet(trialConditionResults)) {
-                    String source = "rollover_index source [" + trialRolloverIndexName + "] to target [" + trialRolloverIndexName + "]";
+                    String source = "rollover_index source [" + trialSourceIndexName + "] to target [" + trialRolloverIndexName + "]";
                     RolloverTask rolloverTask = new RolloverTask(
                         rolloverRequest,
                         statsResponse,
@@ -334,6 +344,40 @@ public class TransportRolloverAction extends TransportMasterNodeAction<RolloverR
         );
     }
 
+    private void initializeFailureStore(
+        RolloverRequest rolloverRequest,
+        ActionListener<RolloverResponse> listener,
+        String trialSourceIndexName,
+        String trialRolloverIndexName
+    ) {
+        if (rolloverRequest.getConditionValues().isEmpty() == false) {
+            listener.onFailure(
+                new IllegalStateException("Rolling over/initializing an empty failure store is only supported without conditions.")
+            );
+            return;
+        }
+        final RolloverResponse trialRolloverResponse = new RolloverResponse(
+            trialSourceIndexName,
+            trialRolloverIndexName,
+            Map.of(),
+            rolloverRequest.isDryRun(),
+            false,
+            false,
+            false,
+            rolloverRequest.isLazy()
+        );
+
+        // If this is a dry run, return with the results without invoking a cluster state update.
+        if (rolloverRequest.isDryRun()) {
+            listener.onResponse(trialRolloverResponse);
+            return;
+        }
+
+        String source = "initialize_failure_store with index [" + trialRolloverIndexName + "]";
+        RolloverTask rolloverTask = new RolloverTask(rolloverRequest, null, trialRolloverResponse, null, listener);
+        submitRolloverTask(rolloverRequest, source, rolloverTask);
+    }
+
     void submitRolloverTask(RolloverRequest rolloverRequest, String source, RolloverTask rolloverTask) {
         rolloverTaskQueue.submitTask(source, rolloverTask, rolloverRequest.masterNodeTimeout());
     }

+ 31 - 20
server/src/main/java/org/elasticsearch/action/bulk/TransportBulkAction.java

@@ -350,7 +350,7 @@ public class TransportBulkAction extends HandledTransportAction<BulkRequest, Bul
             return;
         }
 
-        Map<String, Boolean> indicesToAutoCreate = new HashMap<>();
+        Map<String, CreateIndexRequest> indicesToAutoCreate = new HashMap<>();
         Set<String> dataStreamsToBeRolledOver = new HashSet<>();
         Set<String> failureStoresToBeRolledOver = new HashSet<>();
         populateMissingTargets(bulkRequest, indicesToAutoCreate, dataStreamsToBeRolledOver, failureStoresToBeRolledOver);
@@ -373,19 +373,19 @@ public class TransportBulkAction extends HandledTransportAction<BulkRequest, Bul
      * for lazy rollover.
      *
      * @param bulkRequest the bulk request
-     * @param indicesToAutoCreate a map of index names to whether they require a data stream
+     * @param indicesToAutoCreate a map of index names to their creation request that need to be auto-created
      * @param dataStreamsToBeRolledOver a set of data stream names that were marked for lazy rollover and thus need to be rolled over now
      * @param failureStoresToBeRolledOver a set of data stream names whose failure store was marked for lazy rollover and thus need to be
      * rolled over now
      */
     private void populateMissingTargets(
         BulkRequest bulkRequest,
-        Map<String, Boolean> indicesToAutoCreate,
+        Map<String, CreateIndexRequest> indicesToAutoCreate,
         Set<String> dataStreamsToBeRolledOver,
         Set<String> failureStoresToBeRolledOver
     ) {
         ClusterState state = clusterService.state();
-        // A map for memorizing which indices we already exist (or don't).
+        // A map for memorizing which indices exist.
         Map<String, Boolean> indexExistence = new HashMap<>();
         Function<String, Boolean> indexExistenceComputation = (index) -> indexNameExpressionResolver.hasIndexAbstraction(index, state);
         boolean lazyRolloverFeature = featureService.clusterHasFeature(state, LazyRolloverAction.DATA_STREAM_LAZY_ROLLOVER);
@@ -399,19 +399,36 @@ public class TransportBulkAction extends HandledTransportAction<BulkRequest, Bul
                 && request.versionType() != VersionType.EXTERNAL_GTE) {
                 continue;
             }
+            boolean writeToFailureStore = request instanceof IndexRequest indexRequest && indexRequest.isWriteToFailureStore();
             boolean indexExists = indexExistence.computeIfAbsent(request.index(), indexExistenceComputation);
             if (indexExists == false) {
-                // We should only auto create an index if _none_ of the requests are requiring it to be an alias.
+                // We should only auto-create an index if _none_ of the requests are requiring it to be an alias.
                 if (request.isRequireAlias()) {
-                    // Remember that this a request required this index to be an alias.
+                    // Remember that this request required this index to be an alias.
                     if (indicesThatRequireAlias.add(request.index())) {
                         // If we didn't already know that, we remove the index from the list of indices to create (if present).
                         indicesToAutoCreate.remove(request.index());
                     }
                 } else if (indicesThatRequireAlias.contains(request.index()) == false) {
-                    Boolean requiresDataStream = indicesToAutoCreate.get(request.index());
-                    if (requiresDataStream == null || (requiresDataStream == false && request.isRequireDataStream())) {
-                        indicesToAutoCreate.put(request.index(), request.isRequireDataStream());
+                    CreateIndexRequest createIndexRequest = indicesToAutoCreate.get(request.index());
+                    // Create a new CreateIndexRequest if we didn't already have one.
+                    if (createIndexRequest == null) {
+                        createIndexRequest = new CreateIndexRequest(request.index()).cause("auto(bulk api)")
+                            .masterNodeTimeout(bulkRequest.timeout())
+                            .requireDataStream(request.isRequireDataStream())
+                            // If this IndexRequest is directed towards a failure store, but the data stream doesn't exist, we initialize
+                            // the failure store on data stream creation instead of lazily.
+                            .initializeFailureStore(writeToFailureStore);
+                        indicesToAutoCreate.put(request.index(), createIndexRequest);
+                    } else {
+                        // Track whether one of the index requests in this bulk request requires the target to be a data stream.
+                        if (createIndexRequest.isRequireDataStream() == false && request.isRequireDataStream()) {
+                            createIndexRequest.requireDataStream(true);
+                        }
+                        // Track whether one of the index requests in this bulk request is directed towards a failure store.
+                        if (createIndexRequest.isInitializeFailureStore() == false && writeToFailureStore) {
+                            createIndexRequest.initializeFailureStore(true);
+                        }
                     }
                 }
             }
@@ -419,7 +436,6 @@ public class TransportBulkAction extends HandledTransportAction<BulkRequest, Bul
             if (lazyRolloverFeature) {
                 DataStream dataStream = state.metadata().dataStreams().get(request.index());
                 if (dataStream != null) {
-                    var writeToFailureStore = request instanceof IndexRequest indexRequest && indexRequest.isWriteToFailureStore();
                     if (writeToFailureStore == false && dataStream.getBackingIndices().isRolloverOnWrite()) {
                         dataStreamsToBeRolledOver.add(request.index());
                     } else if (lazyRolloverFailureStoreFeature
@@ -441,7 +457,7 @@ public class TransportBulkAction extends HandledTransportAction<BulkRequest, Bul
         BulkRequest bulkRequest,
         Executor executor,
         ActionListener<BulkResponse> listener,
-        Map<String, Boolean> indicesToAutoCreate,
+        Map<String, CreateIndexRequest> indicesToAutoCreate,
         Set<String> dataStreamsToBeRolledOver,
         Set<String> failureStoresToBeRolledOver,
         long startTime
@@ -468,14 +484,14 @@ public class TransportBulkAction extends HandledTransportAction<BulkRequest, Bul
 
     private void createIndices(
         BulkRequest bulkRequest,
-        Map<String, Boolean> indicesToAutoCreate,
+        Map<String, CreateIndexRequest> indicesToAutoCreate,
         Map<String, IndexNotFoundException> indicesThatCannotBeCreated,
         AtomicArray<BulkItemResponse> responses,
         RefCountingRunnable refs
     ) {
-        for (Map.Entry<String, Boolean> indexEntry : indicesToAutoCreate.entrySet()) {
+        for (Map.Entry<String, CreateIndexRequest> indexEntry : indicesToAutoCreate.entrySet()) {
             final String index = indexEntry.getKey();
-            createIndex(index, indexEntry.getValue(), bulkRequest.timeout(), ActionListener.releaseAfter(new ActionListener<>() {
+            createIndex(indexEntry.getValue(), ActionListener.releaseAfter(new ActionListener<>() {
                 @Override
                 public void onResponse(CreateIndexResponse createIndexResponse) {}
 
@@ -641,12 +657,7 @@ public class TransportBulkAction extends HandledTransportAction<BulkRequest, Bul
         }
     }
 
-    void createIndex(String index, boolean requireDataStream, TimeValue timeout, ActionListener<CreateIndexResponse> listener) {
-        CreateIndexRequest createIndexRequest = new CreateIndexRequest();
-        createIndexRequest.index(index);
-        createIndexRequest.requireDataStream(requireDataStream);
-        createIndexRequest.cause("auto(bulk api)");
-        createIndexRequest.masterNodeTimeout(timeout);
+    void createIndex(CreateIndexRequest createIndexRequest, ActionListener<CreateIndexResponse> listener) {
         client.execute(AutoCreateAction.INSTANCE, createIndexRequest, listener);
     }
 

+ 2 - 1
server/src/main/java/org/elasticsearch/action/bulk/TransportSimulateBulkAction.java

@@ -10,6 +10,7 @@ package org.elasticsearch.action.bulk;
 
 import org.elasticsearch.action.ActionListener;
 import org.elasticsearch.action.DocWriteRequest;
+import org.elasticsearch.action.admin.indices.create.CreateIndexRequest;
 import org.elasticsearch.action.index.IndexRequest;
 import org.elasticsearch.action.ingest.SimulateIndexResponse;
 import org.elasticsearch.action.support.ActionFilters;
@@ -72,7 +73,7 @@ public class TransportSimulateBulkAction extends TransportBulkAction {
         BulkRequest bulkRequest,
         Executor executor,
         ActionListener<BulkResponse> listener,
-        Map<String, Boolean> indicesToAutoCreate,
+        Map<String, CreateIndexRequest> indicesToAutoCreate,
         Set<String> dataStreamsToRollover,
         Set<String> failureStoresToBeRolledOver,
         long startTime

+ 4 - 14
server/src/main/java/org/elasticsearch/cluster/metadata/DataStream.java

@@ -581,23 +581,13 @@ public final class DataStream implements SimpleDiffable<DataStream>, ToXContentO
             );
         }
 
-        // TODO: When failure stores are lazily created, this wont necessarily be required anymore. We can remove the failure store write
-        // index as long as we mark the data stream to lazily rollover the failure store with no conditions on its next write
-        if (failureIndices.indices.size() == (failureIndexPosition + 1)) {
-            throw new IllegalArgumentException(
-                String.format(
-                    Locale.ROOT,
-                    "cannot remove backing index [%s] of data stream [%s] because it is the write index of the failure store",
-                    index.getName(),
-                    name
-                )
-            );
-        }
-
+        // If this is the write index, we're marking the failure store for lazy rollover, to make sure a new write index gets created on the
+        // next write. We do this regardless of whether it's the last index in the failure store or not.
+        boolean rolloverOnWrite = failureIndices.indices.size() == (failureIndexPosition + 1);
         List<Index> updatedFailureIndices = new ArrayList<>(failureIndices.indices);
         updatedFailureIndices.remove(index);
         assert updatedFailureIndices.size() == failureIndices.indices.size() - 1;
-        return copy().setFailureIndices(failureIndices.copy().setIndices(updatedFailureIndices).build())
+        return copy().setFailureIndices(failureIndices.copy().setIndices(updatedFailureIndices).setRolloverOnWrite(rolloverOnWrite).build())
             .setGeneration(generation + 1)
             .build();
     }

+ 7 - 9
server/src/main/java/org/elasticsearch/cluster/metadata/IndexNameExpressionResolver.java

@@ -422,7 +422,7 @@ public class IndexNameExpressionResolver {
                 }
             }
         }
-        if (shouldIncludeFailureIndices(context.getOptions(), dataStream)) {
+        if (shouldIncludeFailureIndices(context.getOptions())) {
             // We short-circuit here, if failure indices are not allowed and they can be skipped
             if (context.getOptions().allowFailureIndices() || context.getOptions().ignoreUnavailable() == false) {
                 for (Index index : dataStream.getFailureIndices().getIndices()) {
@@ -441,7 +441,7 @@ public class IndexNameExpressionResolver {
                 concreteIndicesResult.add(writeIndex);
             }
         }
-        if (shouldIncludeFailureIndices(context.getOptions(), dataStream)) {
+        if (shouldIncludeFailureIndices(context.getOptions())) {
             Index failureStoreWriteIndex = dataStream.getFailureStoreWriteIndex();
             if (failureStoreWriteIndex != null && addIndex(failureStoreWriteIndex, null, context)) {
                 if (context.options.allowFailureIndices() == false) {
@@ -456,10 +456,9 @@ public class IndexNameExpressionResolver {
         return DataStream.isFailureStoreFeatureFlagEnabled() == false || indicesOptions.includeRegularIndices();
     }
 
-    private static boolean shouldIncludeFailureIndices(IndicesOptions indicesOptions, DataStream dataStream) {
-        return DataStream.isFailureStoreFeatureFlagEnabled()
-            && indicesOptions.includeFailureIndices()
-            && dataStream.isFailureStoreEnabled();
+    private static boolean shouldIncludeFailureIndices(IndicesOptions indicesOptions) {
+        // We return failure indices regardless of whether the data stream actually has the `failureStoreEnabled` flag set to true.
+        return DataStream.isFailureStoreFeatureFlagEnabled() && indicesOptions.includeFailureIndices();
     }
 
     private static boolean resolvesToMoreThanOneIndex(IndexAbstraction indexAbstraction, Context context) {
@@ -469,7 +468,7 @@ public class IndexNameExpressionResolver {
             if (shouldIncludeRegularIndices(context.getOptions())) {
                 count += dataStream.getIndices().size();
             }
-            if (shouldIncludeFailureIndices(context.getOptions(), dataStream)) {
+            if (shouldIncludeFailureIndices(context.getOptions())) {
                 count += dataStream.getFailureIndices().getIndices().size();
             }
             return count > 1;
@@ -1426,8 +1425,7 @@ public class IndexNameExpressionResolver {
                     if (shouldIncludeRegularIndices(context.getOptions())) {
                         indicesStateStream = indexAbstraction.getIndices().stream().map(context.state.metadata()::index);
                     }
-                    if (indexAbstraction.getType() == Type.DATA_STREAM
-                        && shouldIncludeFailureIndices(context.getOptions(), (DataStream) indexAbstraction)) {
+                    if (indexAbstraction.getType() == Type.DATA_STREAM && shouldIncludeFailureIndices(context.getOptions())) {
                         DataStream dataStream = (DataStream) indexAbstraction;
                         indicesStateStream = Stream.concat(
                             indicesStateStream,

+ 34 - 24
server/src/main/java/org/elasticsearch/cluster/metadata/MetadataCreateDataStreamService.java

@@ -101,14 +101,9 @@ public class MetadataCreateDataStreamService {
             new AckedClusterStateUpdateTask(Priority.HIGH, request, delegate.clusterStateUpdate()) {
                 @Override
                 public ClusterState execute(ClusterState currentState) throws Exception {
-                    ClusterState clusterState = createDataStream(
-                        metadataCreateIndexService,
-                        clusterService.getSettings(),
-                        currentState,
-                        isDslOnlyMode,
-                        request,
-                        delegate.reroute()
-                    );
+                    // When we're manually creating a data stream (i.e. not an auto creation), we don't need to initialize the failure store
+                    // because we don't need to redirect any failures in the same request.
+                    ClusterState clusterState = createDataStream(request, currentState, delegate.reroute(), false);
                     DataStream createdDataStream = clusterState.metadata().dataStreams().get(request.name);
                     firstBackingIndexRef.set(createdDataStream.getIndices().get(0).getName());
                     if (createdDataStream.getFailureIndices().getIndices().isEmpty() == false) {
@@ -128,9 +123,18 @@ public class MetadataCreateDataStreamService {
     public ClusterState createDataStream(
         CreateDataStreamClusterStateUpdateRequest request,
         ClusterState current,
-        ActionListener<Void> rerouteListener
+        ActionListener<Void> rerouteListener,
+        boolean initializeFailureStore
     ) throws Exception {
-        return createDataStream(metadataCreateIndexService, clusterService.getSettings(), current, isDslOnlyMode, request, rerouteListener);
+        return createDataStream(
+            metadataCreateIndexService,
+            clusterService.getSettings(),
+            current,
+            isDslOnlyMode,
+            request,
+            rerouteListener,
+            initializeFailureStore
+        );
     }
 
     public static final class CreateDataStreamClusterStateUpdateRequest extends ClusterStateUpdateRequest<
@@ -194,7 +198,8 @@ public class MetadataCreateDataStreamService {
         ClusterState currentState,
         boolean isDslOnlyMode,
         CreateDataStreamClusterStateUpdateRequest request,
-        ActionListener<Void> rerouteListener
+        ActionListener<Void> rerouteListener,
+        boolean initializeFailureStore
     ) throws Exception {
         return createDataStream(
             metadataCreateIndexService,
@@ -204,7 +209,8 @@ public class MetadataCreateDataStreamService {
             request,
             List.of(),
             null,
-            rerouteListener
+            rerouteListener,
+            initializeFailureStore
         );
     }
 
@@ -212,11 +218,12 @@ public class MetadataCreateDataStreamService {
      * Creates a data stream with the specified request, backing indices and write index.
      *
      * @param metadataCreateIndexService Used if a new write index must be created
-     * @param currentState               Cluster state
-     * @param request                    The create data stream request
-     * @param backingIndices             List of backing indices. May be empty
-     * @param writeIndex                 Write index for the data stream. If null, a new write index will be created.
-     * @return                           Cluster state containing the new data stream
+     * @param currentState Cluster state
+     * @param request The create data stream request
+     * @param backingIndices List of backing indices. May be empty
+     * @param writeIndex Write index for the data stream. If null, a new write index will be created.
+     * @param initializeFailureStore Whether the failure store should be initialized
+     * @return Cluster state containing the new data stream
      */
     static ClusterState createDataStream(
         MetadataCreateIndexService metadataCreateIndexService,
@@ -226,7 +233,8 @@ public class MetadataCreateDataStreamService {
         CreateDataStreamClusterStateUpdateRequest request,
         List<IndexMetadata> backingIndices,
         IndexMetadata writeIndex,
-        ActionListener<Void> rerouteListener
+        ActionListener<Void> rerouteListener,
+        boolean initializeFailureStore
     ) throws Exception {
         String dataStreamName = request.name;
         SystemDataStreamDescriptor systemDataStreamDescriptor = request.getSystemDataStreamDescriptor();
@@ -274,7 +282,7 @@ public class MetadataCreateDataStreamService {
         // If we need to create a failure store, do so first. Do not reroute during the creation since we will do
         // that as part of creating the backing index if required.
         IndexMetadata failureStoreIndex = null;
-        if (template.getDataStreamTemplate().hasFailureStore()) {
+        if (template.getDataStreamTemplate().hasFailureStore() && initializeFailureStore) {
             if (isSystem) {
                 throw new IllegalArgumentException("Failure stores are not supported on system data streams");
             }
@@ -312,7 +320,8 @@ public class MetadataCreateDataStreamService {
         }
         assert writeIndex != null;
         assert writeIndex.mapping() != null : "no mapping found for backing index [" + writeIndex.getIndex().getName() + "]";
-        assert template.getDataStreamTemplate().hasFailureStore() == false || failureStoreIndex != null;
+        assert template.getDataStreamTemplate().hasFailureStore() == false || initializeFailureStore == false || failureStoreIndex != null
+            : "failure store should have an initial index";
         assert failureStoreIndex == null || failureStoreIndex.mapping() != null
             : "no mapping found for failure store [" + failureStoreIndex.getIndex().getName() + "]";
 
@@ -328,19 +337,20 @@ public class MetadataCreateDataStreamService {
         List<Index> failureIndices = failureStoreIndex == null ? List.of() : List.of(failureStoreIndex.getIndex());
         DataStream newDataStream = new DataStream(
             dataStreamName,
-            dsBackingIndices,
             initialGeneration,
             template.metadata() != null ? Map.copyOf(template.metadata()) : null,
             hidden,
             false,
             isSystem,
+            System::currentTimeMillis,
             template.getDataStreamTemplate().isAllowCustomRouting(),
             indexMode,
             lifecycle == null && isDslOnlyMode ? DataStreamLifecycle.DEFAULT : lifecycle,
             template.getDataStreamTemplate().hasFailureStore(),
-            failureIndices,
-            false,
-            null
+            new DataStream.DataStreamIndices(DataStream.BACKING_INDEX_PREFIX, dsBackingIndices, false, null),
+            // If the failure store shouldn't be initialized on data stream creation, we're marking it for "lazy rollover", which will
+            // initialize the failure store on first write.
+            new DataStream.DataStreamIndices(DataStream.FAILURE_STORE_PREFIX, failureIndices, initializeFailureStore == false, null)
         );
         Metadata.Builder builder = Metadata.builder(currentState.metadata()).put(newDataStream);
 

+ 3 - 1
server/src/main/java/org/elasticsearch/cluster/metadata/MetadataMigrateToDataStreamService.java

@@ -165,7 +165,9 @@ public class MetadataMigrateToDataStreamService {
             req,
             backingIndices,
             currentState.metadata().index(writeIndex),
-            listener
+            listener,
+            // No need to initialize the failure store when migrating to a data stream.
+            false
         );
     }
 

+ 0 - 11
server/src/test/java/org/elasticsearch/action/admin/indices/rollover/MetadataRolloverServiceTests.java

@@ -268,17 +268,6 @@ public class MetadataRolloverServiceTests extends ESTestCase {
             exception.getMessage(),
             equalTo("aliases, mappings, and index settings may not be specified when rolling over a data stream")
         );
-
-        exception = expectThrows(
-            IllegalArgumentException.class,
-            () -> MetadataRolloverService.validate(metadata, randomDataStream.getName(), null, req, true)
-        );
-        assertThat(
-            exception.getMessage(),
-            equalTo(
-                "unable to roll over failure store because [" + randomDataStream.getName() + "] does not have the failure store enabled"
-            )
-        );
     }
 
     public void testGenerateRolloverIndexName() {

+ 3 - 2
server/src/test/java/org/elasticsearch/action/bulk/TransportBulkActionIndicesThatCannotBeCreatedTests.java

@@ -9,6 +9,7 @@
 package org.elasticsearch.action.bulk;
 
 import org.elasticsearch.action.ActionListener;
+import org.elasticsearch.action.admin.indices.create.CreateIndexRequest;
 import org.elasticsearch.action.admin.indices.create.CreateIndexResponse;
 import org.elasticsearch.action.delete.DeleteRequest;
 import org.elasticsearch.action.index.IndexRequest;
@@ -25,7 +26,6 @@ import org.elasticsearch.cluster.service.ClusterService;
 import org.elasticsearch.common.settings.Settings;
 import org.elasticsearch.common.util.concurrent.AtomicArray;
 import org.elasticsearch.common.util.concurrent.ThreadContext;
-import org.elasticsearch.core.TimeValue;
 import org.elasticsearch.features.FeatureService;
 import org.elasticsearch.index.IndexNotFoundException;
 import org.elasticsearch.index.IndexingPressure;
@@ -146,7 +146,8 @@ public class TransportBulkActionIndicesThatCannotBeCreatedTests extends ESTestCa
             }
 
             @Override
-            void createIndex(String index, boolean requireDataStream, TimeValue timeout, ActionListener<CreateIndexResponse> listener) {
+            void createIndex(CreateIndexRequest createIndexRequest, ActionListener<CreateIndexResponse> listener) {
+                String index = createIndexRequest.index();
                 try {
                     simulateAutoCreate.accept(index);
                     // If we try to create an index just immediately assume it worked

+ 2 - 2
server/src/test/java/org/elasticsearch/action/bulk/TransportBulkActionIngestTests.java

@@ -10,6 +10,7 @@ package org.elasticsearch.action.bulk;
 
 import org.elasticsearch.action.ActionListener;
 import org.elasticsearch.action.DocWriteRequest;
+import org.elasticsearch.action.admin.indices.create.CreateIndexRequest;
 import org.elasticsearch.action.admin.indices.create.CreateIndexResponse;
 import org.elasticsearch.action.index.IndexRequest;
 import org.elasticsearch.action.index.IndexResponse;
@@ -36,7 +37,6 @@ import org.elasticsearch.common.TriConsumer;
 import org.elasticsearch.common.settings.Settings;
 import org.elasticsearch.common.util.concurrent.AtomicArray;
 import org.elasticsearch.core.Nullable;
-import org.elasticsearch.core.TimeValue;
 import org.elasticsearch.features.FeatureService;
 import org.elasticsearch.index.IndexNotFoundException;
 import org.elasticsearch.index.IndexSettings;
@@ -174,7 +174,7 @@ public class TransportBulkActionIngestTests extends ESTestCase {
         }
 
         @Override
-        void createIndex(String index, boolean requireDataStream, TimeValue timeout, ActionListener<CreateIndexResponse> listener) {
+        void createIndex(CreateIndexRequest createIndexRequest, ActionListener<CreateIndexResponse> listener) {
             indexCreated = true;
             listener.onResponse(null);
         }

+ 2 - 2
server/src/test/java/org/elasticsearch/action/bulk/TransportBulkActionTests.java

@@ -12,6 +12,7 @@ import org.elasticsearch.ResourceAlreadyExistsException;
 import org.elasticsearch.Version;
 import org.elasticsearch.action.ActionListener;
 import org.elasticsearch.action.DocWriteRequest;
+import org.elasticsearch.action.admin.indices.create.CreateIndexRequest;
 import org.elasticsearch.action.admin.indices.create.CreateIndexResponse;
 import org.elasticsearch.action.bulk.TransportBulkActionTookTests.Resolver;
 import org.elasticsearch.action.delete.DeleteRequest;
@@ -34,7 +35,6 @@ import org.elasticsearch.cluster.node.DiscoveryNodeUtils;
 import org.elasticsearch.cluster.service.ClusterService;
 import org.elasticsearch.common.settings.Settings;
 import org.elasticsearch.common.util.concurrent.EsRejectedExecutionException;
-import org.elasticsearch.core.TimeValue;
 import org.elasticsearch.features.FeatureService;
 import org.elasticsearch.index.IndexNotFoundException;
 import org.elasticsearch.index.IndexVersion;
@@ -105,7 +105,7 @@ public class TransportBulkActionTests extends ESTestCase {
         }
 
         @Override
-        void createIndex(String index, boolean requireDataStream, TimeValue timeout, ActionListener<CreateIndexResponse> listener) {
+        void createIndex(CreateIndexRequest createIndexRequest, ActionListener<CreateIndexResponse> listener) {
             indexCreated = true;
             if (beforeIndexCreation != null) {
                 beforeIndexCreation.run();

+ 3 - 3
server/src/test/java/org/elasticsearch/action/bulk/TransportSimulateBulkActionTests.java

@@ -11,6 +11,7 @@ package org.elasticsearch.action.bulk;
 import org.elasticsearch.ResourceAlreadyExistsException;
 import org.elasticsearch.Version;
 import org.elasticsearch.action.ActionListener;
+import org.elasticsearch.action.admin.indices.create.CreateIndexRequest;
 import org.elasticsearch.action.admin.indices.create.CreateIndexResponse;
 import org.elasticsearch.action.index.IndexRequest;
 import org.elasticsearch.action.ingest.SimulateIndexResponse;
@@ -22,7 +23,6 @@ import org.elasticsearch.cluster.service.ClusterService;
 import org.elasticsearch.common.Strings;
 import org.elasticsearch.common.settings.Settings;
 import org.elasticsearch.common.xcontent.XContentHelper;
-import org.elasticsearch.core.TimeValue;
 import org.elasticsearch.index.IndexVersions;
 import org.elasticsearch.index.IndexingPressure;
 import org.elasticsearch.indices.EmptySystemIndices;
@@ -83,7 +83,7 @@ public class TransportSimulateBulkActionTests extends ESTestCase {
         }
 
         @Override
-        void createIndex(String index, boolean requireDataStream, TimeValue timeout, ActionListener<CreateIndexResponse> listener) {
+        void createIndex(CreateIndexRequest createIndexRequest, ActionListener<CreateIndexResponse> listener) {
             indexCreated = true;
             if (beforeIndexCreation != null) {
                 beforeIndexCreation.run();
@@ -192,7 +192,7 @@ public class TransportSimulateBulkActionTests extends ESTestCase {
                 fail(e, "Unexpected error");
             }
         };
-        Map<String, Boolean> indicesToAutoCreate = Map.of(); // unused
+        Map<String, CreateIndexRequest> indicesToAutoCreate = Map.of(); // unused
         Set<String> dataStreamsToRollover = Set.of(); // unused
         Set<String> failureStoresToRollover = Set.of(); // unused
         long startTime = 0;

+ 10 - 17
server/src/test/java/org/elasticsearch/cluster/metadata/DataStreamTests.java

@@ -353,24 +353,17 @@ public class DataStreamTests extends AbstractXContentSerializingTestCase<DataStr
 
     public void testRemoveFailureStoreWriteIndex() {
         DataStream original = createRandomDataStream();
+        int indexToRemove = original.getFailureIndices().getIndices().size() - 1;
 
-        IllegalArgumentException e = expectThrows(
-            IllegalArgumentException.class,
-            () -> original.removeFailureStoreIndex(
-                original.getFailureIndices().getIndices().get(original.getFailureIndices().getIndices().size() - 1)
-            )
-        );
-        assertThat(
-            e.getMessage(),
-            equalTo(
-                String.format(
-                    Locale.ROOT,
-                    "cannot remove backing index [%s] of data stream [%s] because it is the write index of the failure store",
-                    original.getFailureIndices().getIndices().get(original.getFailureIndices().getIndices().size() - 1).getName(),
-                    original.getName()
-                )
-            )
-        );
+        DataStream updated = original.removeFailureStoreIndex(original.getFailureIndices().getIndices().get(indexToRemove));
+        assertThat(updated.getName(), equalTo(original.getName()));
+        assertThat(updated.getGeneration(), equalTo(original.getGeneration() + 1));
+        assertThat(updated.getIndices().size(), equalTo(original.getIndices().size()));
+        assertThat(updated.getFailureIndices().getIndices().size(), equalTo(original.getFailureIndices().getIndices().size() - 1));
+        assertThat(updated.getFailureIndices().isRolloverOnWrite(), equalTo(true));
+        for (int k = 0; k < (original.getFailureIndices().getIndices().size() - 1); k++) {
+            assertThat(updated.getFailureIndices().getIndices().get(k), equalTo(original.getFailureIndices().getIndices().get(k)));
+        }
     }
 
     public void testAddBackingIndex() {

+ 62 - 14
server/src/test/java/org/elasticsearch/cluster/metadata/MetadataCreateDataStreamServiceTests.java

@@ -38,9 +38,11 @@ import static org.elasticsearch.cluster.metadata.DataStreamTestHelper.generateMa
 import static org.elasticsearch.cluster.metadata.DataStreamTestHelper.newInstance;
 import static org.hamcrest.Matchers.arrayContainingInAnyOrder;
 import static org.hamcrest.Matchers.containsString;
+import static org.hamcrest.Matchers.empty;
 import static org.hamcrest.Matchers.equalTo;
 import static org.hamcrest.Matchers.is;
 import static org.hamcrest.Matchers.notNullValue;
+import static org.hamcrest.Matchers.nullValue;
 import static org.mockito.ArgumentMatchers.any;
 import static org.mockito.ArgumentMatchers.anyBoolean;
 import static org.mockito.Mockito.mock;
@@ -65,7 +67,8 @@ public class MetadataCreateDataStreamServiceTests extends ESTestCase {
             cs,
             true,
             req,
-            ActionListener.noop()
+            ActionListener.noop(),
+            false
         );
         assertThat(newState.metadata().dataStreams().size(), equalTo(1));
         assertThat(newState.metadata().dataStreams().get(dataStreamName).getName(), equalTo(dataStreamName));
@@ -105,7 +108,8 @@ public class MetadataCreateDataStreamServiceTests extends ESTestCase {
             cs,
             randomBoolean(),
             req,
-            ActionListener.noop()
+            ActionListener.noop(),
+            false
         );
         assertThat(newState.metadata().dataStreams().size(), equalTo(1));
         assertThat(newState.metadata().dataStreams().get(dataStreamName).getName(), equalTo(dataStreamName));
@@ -182,7 +186,8 @@ public class MetadataCreateDataStreamServiceTests extends ESTestCase {
             cs,
             randomBoolean(),
             req,
-            ActionListener.noop()
+            ActionListener.noop(),
+            false
         );
         assertThat(newState.metadata().dataStreams().size(), equalTo(1));
         assertThat(newState.metadata().dataStreams().get(dataStreamName).getName(), equalTo(dataStreamName));
@@ -219,7 +224,7 @@ public class MetadataCreateDataStreamServiceTests extends ESTestCase {
         return builder.build();
     }
 
-    public void testCreateDataStreamWithFailureStore() throws Exception {
+    public void testCreateDataStreamWithFailureStoreInitialized() throws Exception {
         final MetadataCreateIndexService metadataCreateIndexService = getMetadataCreateIndexService();
         final String dataStreamName = "my-data-stream";
         ComposableIndexTemplate template = new ComposableIndexTemplate.Builder().indexPatterns(List.of(dataStreamName + "*"))
@@ -235,7 +240,8 @@ public class MetadataCreateDataStreamServiceTests extends ESTestCase {
             cs,
             randomBoolean(),
             req,
-            ActionListener.noop()
+            ActionListener.noop(),
+            true
         );
         var backingIndexName = DataStream.getDefaultBackingIndexName(dataStreamName, 1, req.getStartTime());
         var failureStoreIndexName = DataStream.getDefaultFailureStoreName(dataStreamName, 1, req.getStartTime());
@@ -252,6 +258,39 @@ public class MetadataCreateDataStreamServiceTests extends ESTestCase {
         assertThat(newState.metadata().index(failureStoreIndexName).isSystem(), is(false));
     }
 
+    public void testCreateDataStreamWithFailureStoreUninitialized() throws Exception {
+        final MetadataCreateIndexService metadataCreateIndexService = getMetadataCreateIndexService();
+        final String dataStreamName = "my-data-stream";
+        ComposableIndexTemplate template = new ComposableIndexTemplate.Builder().indexPatterns(List.of(dataStreamName + "*"))
+            .dataStreamTemplate(new ComposableIndexTemplate.DataStreamTemplate(false, false, true))
+            .build();
+        ClusterState cs = ClusterState.builder(new ClusterName("_name"))
+            .metadata(Metadata.builder().put("template", template).build())
+            .build();
+        CreateDataStreamClusterStateUpdateRequest req = new CreateDataStreamClusterStateUpdateRequest(dataStreamName);
+        ClusterState newState = MetadataCreateDataStreamService.createDataStream(
+            metadataCreateIndexService,
+            Settings.EMPTY,
+            cs,
+            randomBoolean(),
+            req,
+            ActionListener.noop(),
+            false
+        );
+        var backingIndexName = DataStream.getDefaultBackingIndexName(dataStreamName, 1, req.getStartTime());
+        var failureStoreIndexName = DataStream.getDefaultFailureStoreName(dataStreamName, 1, req.getStartTime());
+        assertThat(newState.metadata().dataStreams().size(), equalTo(1));
+        assertThat(newState.metadata().dataStreams().get(dataStreamName).getName(), equalTo(dataStreamName));
+        assertThat(newState.metadata().dataStreams().get(dataStreamName).isSystem(), is(false));
+        assertThat(newState.metadata().dataStreams().get(dataStreamName).isHidden(), is(false));
+        assertThat(newState.metadata().dataStreams().get(dataStreamName).isReplicated(), is(false));
+        assertThat(newState.metadata().dataStreams().get(dataStreamName).getFailureIndices().getIndices(), empty());
+        assertThat(newState.metadata().index(backingIndexName), notNullValue());
+        assertThat(newState.metadata().index(backingIndexName).getSettings().get("index.hidden"), equalTo("true"));
+        assertThat(newState.metadata().index(backingIndexName).isSystem(), is(false));
+        assertThat(newState.metadata().index(failureStoreIndexName), nullValue());
+    }
+
     public void testCreateDataStreamWithFailureStoreWithRefreshRate() throws Exception {
         final MetadataCreateIndexService metadataCreateIndexService = getMetadataCreateIndexService();
         var timeValue = randomTimeValue();
@@ -272,7 +311,8 @@ public class MetadataCreateDataStreamServiceTests extends ESTestCase {
             cs,
             randomBoolean(),
             req,
-            ActionListener.noop()
+            ActionListener.noop(),
+            true
         );
         var backingIndexName = DataStream.getDefaultBackingIndexName(dataStreamName, 1, req.getStartTime());
         var failureStoreIndexName = DataStream.getDefaultFailureStoreName(dataStreamName, 1, req.getStartTime());
@@ -303,7 +343,8 @@ public class MetadataCreateDataStreamServiceTests extends ESTestCase {
             cs,
             randomBoolean(),
             req,
-            ActionListener.noop()
+            ActionListener.noop(),
+            false
         );
         assertThat(newState.metadata().dataStreams().size(), equalTo(1));
         assertThat(newState.metadata().dataStreams().get(dataStreamName).getName(), equalTo(dataStreamName));
@@ -336,7 +377,8 @@ public class MetadataCreateDataStreamServiceTests extends ESTestCase {
                 cs,
                 randomBoolean(),
                 req,
-                ActionListener.noop()
+                ActionListener.noop(),
+                false
             )
         );
         assertThat(e.getMessage(), containsString("data_stream [" + dataStreamName + "] already exists"));
@@ -355,7 +397,8 @@ public class MetadataCreateDataStreamServiceTests extends ESTestCase {
                 cs,
                 randomBoolean(),
                 req,
-                ActionListener.noop()
+                ActionListener.noop(),
+                false
             )
         );
         assertThat(e.getMessage(), containsString("must not contain the following characters"));
@@ -374,7 +417,8 @@ public class MetadataCreateDataStreamServiceTests extends ESTestCase {
                 cs,
                 randomBoolean(),
                 req,
-                ActionListener.noop()
+                ActionListener.noop(),
+                false
             )
         );
         assertThat(e.getMessage(), containsString("data_stream [" + dataStreamName + "] must be lowercase"));
@@ -393,7 +437,8 @@ public class MetadataCreateDataStreamServiceTests extends ESTestCase {
                 cs,
                 randomBoolean(),
                 req,
-                ActionListener.noop()
+                ActionListener.noop(),
+                false
             )
         );
         assertThat(e.getMessage(), containsString("data_stream [" + dataStreamName + "] must not start with '.ds-'"));
@@ -412,7 +457,8 @@ public class MetadataCreateDataStreamServiceTests extends ESTestCase {
                 cs,
                 randomBoolean(),
                 req,
-                ActionListener.noop()
+                ActionListener.noop(),
+                false
             )
         );
         assertThat(e.getMessage(), equalTo("no matching index template found for data stream [my-data-stream]"));
@@ -434,7 +480,8 @@ public class MetadataCreateDataStreamServiceTests extends ESTestCase {
                 cs,
                 randomBoolean(),
                 req,
-                ActionListener.noop()
+                ActionListener.noop(),
+                false
             )
         );
         assertThat(
@@ -459,7 +506,8 @@ public class MetadataCreateDataStreamServiceTests extends ESTestCase {
             cs,
             randomBoolean(),
             req,
-            ActionListener.noop()
+            ActionListener.noop(),
+            false
         );
     }
 

+ 3 - 0
x-pack/plugin/ccr/src/main/java/org/elasticsearch/xpack/ccr/action/TransportPutFollowAction.java

@@ -334,6 +334,9 @@ public final class TransportPutFollowAction extends TransportMasterNodeAction<Pu
                     // (and potentially even break things).
                     remoteDataStream.getBackingIndices().copy().setIndices(List.of(backingIndexToFollow)).setRolloverOnWrite(false).build()
                 )
+                // Replicated data streams should not have their failure store marked for lazy rollover (it is marked
+                // by default to support lazy failure store creation).
+                .setFailureIndices(remoteDataStream.getFailureIndices().copy().setRolloverOnWrite(false).build())
                 .setReplicated(true)
                 .build();
         } else {

+ 3 - 0
x-pack/plugin/core/src/javaRestTest/java/org/elasticsearch/xpack/core/DataStreamRestIT.java

@@ -74,6 +74,9 @@ public class DataStreamRestIT extends ESRestTestCase {
         indexRequest = new Request("POST", "/fs/_doc");
         indexRequest.setJsonEntity("{\"@timestamp\": \"2020-01-01\"}");
         client().performRequest(indexRequest);
+        // Initialize the failure store
+        rollover = new Request("POST", "/fs/_rollover?target_failure_store=true");
+        client().performRequest(rollover);
 
         dataStreams = (Map<?, ?>) getLocation("/_xpack/usage").get("data_streams");
         assertNotNull(dataStreams);