Browse Source

Extract `DataStreamIndices` from `DataStream` class (#107562)

We were seeing more and more common fields between "regular" backing indices and failure store indices (i.e. `indices`, `rolloverOnWrite`, `autoShardingEvent`). To avoid having to duplicate these fields (and possibly any future fields), we extract a class that contains these fields.
Niels Bauman 1 year ago
parent
commit
920290a37b
39 changed files with 699 additions and 474 deletions
  1. 1 1
      docs/reference/data-streams/change-mappings-and-settings.asciidoc
  2. 1 1
      docs/reference/data-streams/downsampling-manual.asciidoc
  3. 4 4
      docs/reference/data-streams/lifecycle/tutorial-migrate-data-stream-from-ilm-to-dsl.asciidoc
  4. 1 1
      docs/reference/indices/get-data-stream.asciidoc
  5. 8 1
      modules/data-streams/src/internalClusterTest/java/org/elasticsearch/datastreams/DataStreamIT.java
  6. 6 8
      modules/data-streams/src/javaRestTest/java/org/elasticsearch/datastreams/FailureStoreQueryParamIT.java
  7. 1 1
      modules/data-streams/src/main/java/org/elasticsearch/datastreams/DataStreamFeatures.java
  8. 1 1
      modules/data-streams/src/main/java/org/elasticsearch/datastreams/action/DeleteDataStreamTransportAction.java
  9. 2 2
      modules/data-streams/src/main/java/org/elasticsearch/datastreams/action/GetDataStreamsTransportAction.java
  10. 3 1
      modules/data-streams/src/test/java/org/elasticsearch/datastreams/UpdateTimeSeriesRangeServiceTests.java
  11. 6 8
      modules/data-streams/src/test/java/org/elasticsearch/datastreams/action/GetDataStreamsResponseTests.java
  12. 1 1
      modules/data-streams/src/test/java/org/elasticsearch/datastreams/lifecycle/UpdateDataStreamGlobalRetentionServiceTests.java
  13. 15 15
      modules/data-streams/src/yamlRestTest/resources/rest-api-spec/test/data_stream/10_basic.yml
  14. 12 12
      modules/data-streams/src/yamlRestTest/resources/rest-api-spec/test/data_stream/170_modify_data_stream.yml
  15. 8 8
      modules/data-streams/src/yamlRestTest/resources/rest-api-spec/test/data_stream/190_failure_store_redirection.yml
  16. 10 10
      modules/data-streams/src/yamlRestTest/resources/rest-api-spec/test/data_stream/200_rollover_failure_store.yml
  17. 5 5
      modules/data-streams/src/yamlRestTest/resources/rest-api-spec/test/data_stream/30_auto_create_data_stream.yml
  18. 1 0
      server/src/main/java/org/elasticsearch/TransportVersions.java
  19. 2 2
      server/src/main/java/org/elasticsearch/action/admin/indices/create/AutoCreateAction.java
  20. 14 20
      server/src/main/java/org/elasticsearch/action/admin/indices/rollover/MetadataRolloverService.java
  21. 3 1
      server/src/main/java/org/elasticsearch/action/admin/indices/rollover/TransportRolloverAction.java
  22. 50 53
      server/src/main/java/org/elasticsearch/action/datastreams/GetDataStreamAction.java
  23. 2 2
      server/src/main/java/org/elasticsearch/action/index/IndexRequest.java
  24. 343 208
      server/src/main/java/org/elasticsearch/cluster/metadata/DataStream.java
  25. 2 2
      server/src/main/java/org/elasticsearch/cluster/metadata/DataStreamMetadata.java
  26. 3 3
      server/src/main/java/org/elasticsearch/cluster/metadata/IndexNameExpressionResolver.java
  27. 5 2
      server/src/main/java/org/elasticsearch/cluster/metadata/Metadata.java
  28. 2 2
      server/src/main/java/org/elasticsearch/cluster/metadata/MetadataCreateDataStreamService.java
  29. 28 6
      server/src/main/java/org/elasticsearch/cluster/metadata/MetadataDataStreamsService.java
  30. 4 1
      server/src/main/java/org/elasticsearch/snapshots/RestoreService.java
  31. 10 5
      server/src/test/java/org/elasticsearch/action/admin/indices/rollover/MetadataRolloverServiceTests.java
  32. 4 3
      server/src/test/java/org/elasticsearch/action/admin/indices/rollover/TransportRolloverActionTests.java
  33. 4 4
      server/src/test/java/org/elasticsearch/action/datastreams/autosharding/DataStreamAutoShardingServiceTests.java
  34. 87 55
      server/src/test/java/org/elasticsearch/cluster/metadata/DataStreamTests.java
  35. 6 1
      server/src/test/java/org/elasticsearch/cluster/metadata/MetadataDataStreamsServiceTests.java
  36. 28 17
      test/framework/src/main/java/org/elasticsearch/cluster/metadata/DataStreamTestHelper.java
  37. 4 1
      x-pack/plugin/autoscaling/src/main/java/org/elasticsearch/xpack/autoscaling/storage/ReactiveStorageDeciderService.java
  38. 6 5
      x-pack/plugin/ccr/src/main/java/org/elasticsearch/xpack/ccr/action/TransportPutFollowAction.java
  39. 6 1
      x-pack/plugin/security/src/internalClusterTest/java/org/elasticsearch/integration/DataStreamSecurityIT.java

+ 1 - 1
docs/reference/data-streams/change-mappings-and-settings.asciidoc

@@ -602,7 +602,7 @@ stream's oldest backing index.
 // TESTRESPONSE[s/"index_uuid": "_eEfRrFHS9OyhqWntkgHAQ"/"index_uuid": $body.data_streams.0.indices.1.index_uuid/]
 // TESTRESPONSE[s/"index_name": ".ds-my-data-stream-2099.03.07-000001"/"index_name": $body.data_streams.0.indices.0.index_name/]
 // TESTRESPONSE[s/"index_name": ".ds-my-data-stream-2099.03.08-000002"/"index_name": $body.data_streams.0.indices.1.index_name/]
-// TESTRESPONSE[s/"status": "GREEN"/"status": "YELLOW","failure_indices":[],"failure_store":false/]
+// TESTRESPONSE[s/"status": "GREEN"/"status": "YELLOW","failure_store":{"enabled": false, "indices": [], "rollover_on_write": false}/]
 
 <1> First item in the `indices` array for `my-data-stream`. This item contains
 information about the stream's oldest backing index,

+ 1 - 1
docs/reference/data-streams/downsampling-manual.asciidoc

@@ -389,7 +389,7 @@ This returns:
 // TESTRESPONSE[s/"ltOJGmqgTVm4T-Buoe7Acg"/$body.data_streams.0.indices.0.index_uuid/]
 // TESTRESPONSE[s/"2023-07-26T09:26:42.000Z"/$body.data_streams.0.time_series.temporal_ranges.0.start/]
 // TESTRESPONSE[s/"2023-07-26T13:26:42.000Z"/$body.data_streams.0.time_series.temporal_ranges.0.end/]
-// TESTRESPONSE[s/"replicated": false/"replicated": false,"failure_indices":[],"failure_store":false/]
+// TESTRESPONSE[s/"replicated": false/"replicated": false,"failure_store":{"enabled": false, "indices": [], "rollover_on_write": false}/]
 <1> The backing index for this data stream.
 
 Before a backing index can be downsampled, the TSDS needs to be rolled over and

+ 4 - 4
docs/reference/data-streams/lifecycle/tutorial-migrate-data-stream-from-ilm-to-dsl.asciidoc

@@ -147,7 +147,7 @@ and that the next generation index will also be managed by {ilm-init}:
 // TESTRESPONSE[s/"index_uuid": "xCEhwsp8Tey0-FLNFYVwSg"/"index_uuid": $body.data_streams.0.indices.0.index_uuid/]
 // TESTRESPONSE[s/"index_name": ".ds-dsl-data-stream-2023.10.19-000002"/"index_name": $body.data_streams.0.indices.1.index_name/]
 // TESTRESPONSE[s/"index_uuid": "PA_JquKGSiKcAKBA8DJ5gw"/"index_uuid": $body.data_streams.0.indices.1.index_uuid/]
-// TESTRESPONSE[s/"status": "GREEN"/"status": "YELLOW","failure_indices":[],"failure_store":false/]
+// TESTRESPONSE[s/"status": "GREEN"/"status": "YELLOW","failure_store":{"enabled": false, "indices": [], "rollover_on_write": false}/]
 
 <1> The name of the backing index.
 <2> For each backing index we display the value of the <<index-lifecycle-prefer-ilm, prefer_ilm>>
@@ -284,7 +284,7 @@ GET _data_stream/dsl-data-stream
 // TESTRESPONSE[s/"index_uuid": "xCEhwsp8Tey0-FLNFYVwSg"/"index_uuid": $body.data_streams.0.indices.0.index_uuid/]
 // TESTRESPONSE[s/"index_name": ".ds-dsl-data-stream-2023.10.19-000002"/"index_name": $body.data_streams.0.indices.1.index_name/]
 // TESTRESPONSE[s/"index_uuid": "PA_JquKGSiKcAKBA8DJ5gw"/"index_uuid": $body.data_streams.0.indices.1.index_uuid/]
-// TESTRESPONSE[s/"status": "GREEN"/"status": "YELLOW","failure_indices":[],"failure_store":false/]
+// TESTRESPONSE[s/"status": "GREEN"/"status": "YELLOW","failure_store":{"enabled": false, "indices": [], "rollover_on_write": false}/]
 
 <1> The existing backing index will continue to be managed by {ilm-init}
 <2> The existing backing index will continue to be managed by {ilm-init}
@@ -364,7 +364,7 @@ GET _data_stream/dsl-data-stream
 // TESTRESPONSE[s/"index_uuid": "PA_JquKGSiKcAKBA8DJ5gw"/"index_uuid": $body.data_streams.0.indices.1.index_uuid/]
 // TESTRESPONSE[s/"index_name": ".ds-dsl-data-stream-2023.10.19-000003"/"index_name": $body.data_streams.0.indices.2.index_name/]
 // TESTRESPONSE[s/"index_uuid": "PA_JquKGSiKcAKBA8abcd1"/"index_uuid": $body.data_streams.0.indices.2.index_uuid/]
-// TESTRESPONSE[s/"status": "GREEN"/"status": "YELLOW","failure_indices":[],"failure_store":false/]
+// TESTRESPONSE[s/"status": "GREEN"/"status": "YELLOW","failure_store":{"enabled": false, "indices": [], "rollover_on_write": false}/]
 
 <1> The backing indices that existed before rollover will continue to be managed by {ilm-init}
 <2> The backing indices that existed before rollover will continue to be managed by {ilm-init}
@@ -462,7 +462,7 @@ GET _data_stream/dsl-data-stream
 // TESTRESPONSE[s/"index_uuid": "PA_JquKGSiKcAKBA8DJ5gw"/"index_uuid": $body.data_streams.0.indices.1.index_uuid/]
 // TESTRESPONSE[s/"index_name": ".ds-dsl-data-stream-2023.10.19-000003"/"index_name": $body.data_streams.0.indices.2.index_name/]
 // TESTRESPONSE[s/"index_uuid": "PA_JquKGSiKcAKBA8abcd1"/"index_uuid": $body.data_streams.0.indices.2.index_uuid/]
-// TESTRESPONSE[s/"status": "GREEN"/"status": "YELLOW","failure_indices":[],"failure_store":false/]
+// TESTRESPONSE[s/"status": "GREEN"/"status": "YELLOW","failure_store":{"enabled": false, "indices": [], "rollover_on_write": false}/]
 <1> The write index is now managed by {ilm-init}
 <2> The `lifecycle` configured on the data stream is now disabled.
 <3> The next write index will be managed by {ilm-init}

+ 1 - 1
docs/reference/indices/get-data-stream.asciidoc

@@ -358,4 +358,4 @@ The API returns the following response:
 // TESTRESPONSE[s/"index_name": ".ds-my-data-stream-two-2099.03.08-000001"/"index_name": $body.data_streams.1.indices.0.index_name/]
 // TESTRESPONSE[s/"index_uuid": "3liBu2SYS5axasRt6fUIpA"/"index_uuid": $body.data_streams.1.indices.0.index_uuid/]
 // TESTRESPONSE[s/"status": "GREEN"/"status": "YELLOW"/]
-// TESTRESPONSE[s/"replicated": false/"replicated": false,"failure_indices":[],"failure_store":false/]
+// TESTRESPONSE[s/"replicated": false/"replicated": false,"failure_store":{"enabled": false, "indices": [], "rollover_on_write": false}/]

+ 8 - 1
modules/data-streams/src/internalClusterTest/java/org/elasticsearch/datastreams/DataStreamIT.java

@@ -1779,7 +1779,14 @@ public class DataStreamIT extends ESIntegTestCase {
                 public ClusterState execute(ClusterState currentState) throws Exception {
                     DataStream original = currentState.getMetadata().dataStreams().get(dataStreamName);
                     DataStream broken = original.copy()
-                        .setIndices(List.of(new Index(original.getIndices().get(0).getName(), "broken"), original.getIndices().get(1)))
+                        .setBackingIndices(
+                            original.getBackingIndices()
+                                .copy()
+                                .setIndices(
+                                    List.of(new Index(original.getIndices().get(0).getName(), "broken"), original.getIndices().get(1))
+                                )
+                                .build()
+                        )
                         .build();
                     brokenDataStreamHolder.set(broken);
                     return ClusterState.builder(currentState)

+ 6 - 8
modules/data-streams/src/javaRestTest/java/org/elasticsearch/datastreams/FailureStoreQueryParamIT.java

@@ -58,7 +58,7 @@ public class FailureStoreQueryParamIT extends DisabledSecurityDataStreamTestCase
         assertThat(dataStreams.size(), is(1));
         Map<String, Object> dataStream = (Map<String, Object>) dataStreams.get(0);
         assertThat(dataStream.get("name"), equalTo(DATA_STREAM_NAME));
-        List<String> backingIndices = getBackingIndices(dataStream);
+        List<String> backingIndices = getIndices(dataStream);
         assertThat(backingIndices.size(), is(1));
         List<String> failureStore = getFailureStore(dataStream);
         assertThat(failureStore.size(), is(1));
@@ -199,18 +199,16 @@ public class FailureStoreQueryParamIT extends DisabledSecurityDataStreamTestCase
         }
     }
 
-    private List<String> getBackingIndices(Map<String, Object> response) {
-        return getIndices(response, "indices");
-    }
-
+    @SuppressWarnings("unchecked")
     private List<String> getFailureStore(Map<String, Object> response) {
-        return getIndices(response, "failure_indices");
+        var failureStore = (Map<String, Object>) response.get("failure_store");
+        return getIndices(failureStore);
 
     }
 
     @SuppressWarnings("unchecked")
-    private List<String> getIndices(Map<String, Object> response, String fieldName) {
-        List<Map<String, String>> indices = (List<Map<String, String>>) response.get(fieldName);
+    private List<String> getIndices(Map<String, Object> response) {
+        List<Map<String, String>> indices = (List<Map<String, String>>) response.get("indices");
         return indices.stream().map(index -> index.get("index_name")).toList();
     }
 }

+ 1 - 1
modules/data-streams/src/main/java/org/elasticsearch/datastreams/DataStreamFeatures.java

@@ -37,7 +37,7 @@ public class DataStreamFeatures implements FeatureSpecification {
             DataStreamLifecycleHealthInfoPublisher.DSL_HEALTH_INFO_FEATURE,  // Added in 8.12
             LazyRolloverAction.DATA_STREAM_LAZY_ROLLOVER,                    // Added in 8.13
             DataStreamAutoShardingService.DATA_STREAM_AUTO_SHARDING_FEATURE,
-            DataStreamGlobalRetention.GLOBAL_RETENTION                      // Added in 8.14
+            DataStreamGlobalRetention.GLOBAL_RETENTION                       // Added in 8.14
         );
     }
 }

+ 1 - 1
modules/data-streams/src/main/java/org/elasticsearch/datastreams/action/DeleteDataStreamTransportAction.java

@@ -155,7 +155,7 @@ public class DeleteDataStreamTransportAction extends AcknowledgedTransportMaster
             DataStream dataStream = currentState.metadata().dataStreams().get(dataStreamName);
             assert dataStream != null;
             backingIndicesToRemove.addAll(dataStream.getIndices());
-            backingIndicesToRemove.addAll(dataStream.getFailureIndices());
+            backingIndicesToRemove.addAll(dataStream.getFailureIndices().getIndices());
         }
 
         // first delete the data streams and then the indices:

+ 2 - 2
modules/data-streams/src/main/java/org/elasticsearch/datastreams/action/GetDataStreamsTransportAction.java

@@ -145,8 +145,8 @@ public class GetDataStreamsTransportAction extends TransportMasterNodeReadAction
             Map<Index, IndexProperties> backingIndicesSettingsValues = new HashMap<>();
             Metadata metadata = state.getMetadata();
             collectIndexSettingsValues(dataStream, backingIndicesSettingsValues, metadata, dataStream.getIndices());
-            if (DataStream.isFailureStoreFeatureFlagEnabled() && dataStream.getFailureIndices().isEmpty() == false) {
-                collectIndexSettingsValues(dataStream, backingIndicesSettingsValues, metadata, dataStream.getFailureIndices());
+            if (DataStream.isFailureStoreFeatureFlagEnabled() && dataStream.getFailureIndices().getIndices().isEmpty() == false) {
+                collectIndexSettingsValues(dataStream, backingIndicesSettingsValues, metadata, dataStream.getFailureIndices().getIndices());
             }
 
             GetDataStreamAction.Response.TimeSeries timeSeries = null;

+ 3 - 1
modules/data-streams/src/test/java/org/elasticsearch/datastreams/UpdateTimeSeriesRangeServiceTests.java

@@ -139,7 +139,9 @@ public class UpdateTimeSeriesRangeServiceTests extends ESTestCase {
             List.of(new Tuple<>(start.minus(4, ChronoUnit.HOURS), start), new Tuple<>(start, end))
         ).getMetadata();
         DataStream d = metadata.dataStreams().get(dataStreamName);
-        metadata = Metadata.builder(metadata).put(d.copy().setReplicated(true).setRolloverOnWrite(false).build()).build();
+        metadata = Metadata.builder(metadata)
+            .put(d.copy().setReplicated(true).setBackingIndices(d.getBackingIndices().copy().setRolloverOnWrite(false).build()).build())
+            .build();
 
         now = now.plus(1, ChronoUnit.HOURS);
         ClusterState in = ClusterState.builder(ClusterState.EMPTY_STATE).metadata(metadata).build();

+ 6 - 8
modules/data-streams/src/test/java/org/elasticsearch/datastreams/action/GetDataStreamsResponseTests.java

@@ -82,7 +82,7 @@ public class GetDataStreamsResponseTests extends AbstractWireSerializingTestCase
                 .setIndexMode(IndexMode.STANDARD)
                 .setLifecycle(new DataStreamLifecycle())
                 .setFailureStoreEnabled(true)
-                .setFailureIndices(failureStores)
+                .setFailureIndices(DataStream.DataStreamIndices.failureIndicesBuilder(failureStores).build())
                 .build();
 
             String ilmPolicyName = "rollover-30days";
@@ -159,9 +159,8 @@ public class GetDataStreamsResponseTests extends AbstractWireSerializingTestCase
                 );
 
                 if (DataStream.isFailureStoreFeatureFlagEnabled()) {
-                    List<Object> failureStoresRepresentation = (List<Object>) dataStreamMap.get(
-                        DataStream.FAILURE_INDICES_FIELD.getPreferredName()
-                    );
+                    var failureStore = (Map<String, Object>) dataStreamMap.get(DataStream.FAILURE_STORE_FIELD.getPreferredName());
+                    List<Object> failureStoresRepresentation = (List<Object>) failureStore.get(DataStream.INDICES_FIELD.getPreferredName());
                     Map<String, Object> failureStoreRepresentation = (Map<String, Object>) failureStoresRepresentation.get(0);
                     assertThat(failureStoreRepresentation.get("index_name"), is(failureStoreIndex.getName()));
                     assertThat(failureStoreRepresentation.get(Response.DataStreamInfo.PREFER_ILM.getPreferredName()), is(false));
@@ -185,7 +184,7 @@ public class GetDataStreamsResponseTests extends AbstractWireSerializingTestCase
                 .setIndexMode(IndexMode.STANDARD)
                 .setLifecycle(new DataStreamLifecycle(null, null, false))
                 .setFailureStoreEnabled(true)
-                .setFailureIndices(failureStores)
+                .setFailureIndices(DataStream.DataStreamIndices.failureIndicesBuilder(failureStores).build())
                 .build();
 
             String ilmPolicyName = "rollover-30days";
@@ -251,9 +250,8 @@ public class GetDataStreamsResponseTests extends AbstractWireSerializingTestCase
                 );
 
                 if (DataStream.isFailureStoreFeatureFlagEnabled()) {
-                    List<Object> failureStoresRepresentation = (List<Object>) dataStreamMap.get(
-                        DataStream.FAILURE_INDICES_FIELD.getPreferredName()
-                    );
+                    var failureStore = (Map<String, Object>) dataStreamMap.get(DataStream.FAILURE_STORE_FIELD.getPreferredName());
+                    List<Object> failureStoresRepresentation = (List<Object>) failureStore.get(DataStream.INDICES_FIELD.getPreferredName());
                     Map<String, Object> failureStoreRepresentation = (Map<String, Object>) failureStoresRepresentation.get(0);
                     assertThat(failureStoreRepresentation.get("index_name"), is(failureStoreIndex.getName()));
                     assertThat(failureStoreRepresentation.get(Response.DataStreamInfo.PREFER_ILM.getPreferredName()), is(false));

+ 1 - 1
modules/data-streams/src/test/java/org/elasticsearch/datastreams/lifecycle/UpdateDataStreamGlobalRetentionServiceTests.java

@@ -261,7 +261,7 @@ public class UpdateDataStreamGlobalRetentionServiceTests extends ESTestCase {
             .setReplicated(replicated)
             .setLifecycle(lifecycle)
             .setFailureStoreEnabled(failureStores.isEmpty() == false)
-            .setFailureIndices(failureStores);
+            .setFailureIndices(DataStream.DataStreamIndices.failureIndicesBuilder(failureStores).build());
         if (randomBoolean()) {
             builder.setSystem(true);
             builder.setHidden(true);

+ 15 - 15
modules/data-streams/src/yamlRestTest/resources/rest-api-spec/test/data_stream/10_basic.yml

@@ -210,8 +210,8 @@ setup:
 ---
 "Create data stream with failure store":
   - requires:
-      cluster_features: ["gte_v8.11.0"]
-      reason: "data stream failure stores only creatable in 8.11+"
+      cluster_features: ["gte_v8.15.0"]
+      reason: "data stream failure stores REST structure changed in 8.15+"
 
   - do:
       allowed_warnings:
@@ -248,9 +248,9 @@ setup:
   - match: { data_streams.0.status: 'GREEN' }
   - match: { data_streams.0.template: 'my-template4' }
   - match: { data_streams.0.hidden: false }
-  - match: { data_streams.0.failure_store: true }
-  - length: { data_streams.0.failure_indices: 1 }
-  - match: { data_streams.0.failure_indices.0.index_name: '/\.fs-failure-data-stream1-(\d{4}\.\d{2}\.\d{2}-)?000001/'}
+  - match: { data_streams.0.failure_store.enabled: true }
+  - length: { data_streams.0.failure_store.indices: 1 }
+  - match: { data_streams.0.failure_store.indices.0.index_name: '/\.fs-failure-data-stream1-(\d{4}\.\d{2}\.\d{2}-)?000001/'}
 
   - match: { data_streams.1.name: failure-data-stream2 }
   - match: { data_streams.1.timestamp_field.name: '@timestamp' }
@@ -259,15 +259,15 @@ setup:
   - match: { data_streams.1.indices.0.index_name: '/\.ds-failure-data-stream2-(\d{4}\.\d{2}\.\d{2}-)?000001/' }
   - match: { data_streams.1.template: 'my-template4' }
   - match: { data_streams.1.hidden: false }
-  - match: { data_streams.1.failure_store: true }
-  - length: { data_streams.1.failure_indices: 1 }
-  - match: { data_streams.1.failure_indices.0.index_name: '/\.fs-failure-data-stream2-(\d{4}\.\d{2}\.\d{2}-)?000001/' }
+  - match: { data_streams.1.failure_store.enabled: true }
+  - length: { data_streams.1.failure_store.indices: 1 }
+  - match: { data_streams.1.failure_store.indices.0.index_name: '/\.fs-failure-data-stream2-(\d{4}\.\d{2}\.\d{2}-)?000001/' }
 
   # save the backing index names for later use
   - set: { data_streams.0.indices.0.index_name: idx0name }
-  - set: { data_streams.0.failure_indices.0.index_name: fsidx0name }
+  - set: { data_streams.0.failure_store.indices.0.index_name: fsidx0name }
   - set: { data_streams.1.indices.0.index_name: idx1name }
-  - set: { data_streams.1.failure_indices.0.index_name: fsidx1name }
+  - set: { data_streams.1.failure_store.indices.0.index_name: fsidx1name }
 
   - do:
       indices.get_mapping:
@@ -538,8 +538,8 @@ setup:
 ---
 "Delete data stream with failure stores":
   - requires:
-      cluster_features: ["gte_v8.12.0"]
-      reason: "data stream failure stores only supported in 8.12+"
+      cluster_features: ["gte_v8.15.0"]
+      reason: "data stream failure stores REST structure changed in 8.15+"
 
   - do:
       allowed_warnings:
@@ -570,7 +570,7 @@ setup:
         name: failure-data-stream1
 
   - set: { data_streams.0.indices.0.index_name: idx0name }
-  - set: { data_streams.0.failure_indices.0.index_name: fs0name }
+  - set: { data_streams.0.failure_store.indices.0.index_name: fs0name }
 
   - do:
       indices.get:
@@ -586,8 +586,8 @@ setup:
   - match: { data_streams.0.generation: 1 }
   - length: { data_streams.0.indices: 1 }
   - match: { data_streams.0.indices.0.index_name: '/\.ds-failure-data-stream1-(\d{4}\.\d{2}\.\d{2}-)?000001/' }
-  - length: { data_streams.0.failure_indices: 1 }
-  - match: { data_streams.0.failure_indices.0.index_name: '/\.fs-failure-data-stream1-(\d{4}\.\d{2}\.\d{2}-)?000001/' }
+  - length: { data_streams.0.failure_store.indices: 1 }
+  - match: { data_streams.0.failure_store.indices.0.index_name: '/\.fs-failure-data-stream1-(\d{4}\.\d{2}\.\d{2}-)?000001/' }
 
   - do:
       indices.delete_data_stream:

+ 12 - 12
modules/data-streams/src/yamlRestTest/resources/rest-api-spec/test/data_stream/170_modify_data_stream.yml

@@ -92,8 +92,8 @@
 ---
 "Modify a data stream's failure store":
   - requires:
-      cluster_features: [ "gte_v8.14.0" ]
-      reason: "this API was released in 8.14.0"
+      cluster_features: ["gte_v8.15.0"]
+      reason: "data stream failure stores REST structure changed in 8.15+"
       test_runner_features: [ "allowed_warnings" ]
 
   - do:
@@ -128,14 +128,14 @@
       indices.get_data_stream:
         name: data-stream-for-modification
   - set: { data_streams.0.indices.0.index_name: write_index }
-  - set: { data_streams.0.failure_indices.0.index_name: first_failure_index }
-  - set: { data_streams.0.failure_indices.1.index_name: write_failure_index }
+  - set: { data_streams.0.failure_store.indices.0.index_name: first_failure_index }
+  - set: { data_streams.0.failure_store.indices.1.index_name: write_failure_index }
 
   - do:
       indices.get_data_stream:
         name: data-stream-for-modification2
   - set: { data_streams.0.indices.0.index_name: second_write_index }
-  - set: { data_streams.0.failure_indices.0.index_name: second_write_failure_index }
+  - set: { data_streams.0.failure_store.indices.0.index_name: second_write_failure_index }
 
   - do:
       index:
@@ -170,11 +170,11 @@
   - match: { data_streams.0.timestamp_field.name: '@timestamp' }
   - match: { data_streams.0.generation: 3 }
   - length: { data_streams.0.indices: 1 }
-  - length: { data_streams.0.failure_indices: 3 }
+  - length: { data_streams.0.failure_store.indices: 3 }
   - match: { data_streams.0.indices.0.index_name: $write_index }
-  - match: { data_streams.0.failure_indices.0.index_name: 'test_index1' }
-  - match: { data_streams.0.failure_indices.1.index_name: $first_failure_index }
-  - match: { data_streams.0.failure_indices.2.index_name: $write_failure_index }
+  - match: { data_streams.0.failure_store.indices.0.index_name: 'test_index1' }
+  - match: { data_streams.0.failure_store.indices.1.index_name: $first_failure_index }
+  - match: { data_streams.0.failure_store.indices.2.index_name: $write_failure_index }
 
   # An index that has an alias is not allowed to be added to failure store
   - do:
@@ -269,10 +269,10 @@
   - match: { data_streams.0.timestamp_field.name: '@timestamp' }
   - match: { data_streams.0.generation: 4 }
   - length: { data_streams.0.indices: 1 }
-  - length: { data_streams.0.failure_indices: 2 }
+  - length: { data_streams.0.failure_store.indices: 2 }
   - match: { data_streams.0.indices.0.index_name: $write_index }
-  - match: { data_streams.0.failure_indices.0.index_name: $first_failure_index }
-  - match: { data_streams.0.failure_indices.1.index_name: $write_failure_index }
+  - match: { data_streams.0.failure_store.indices.0.index_name: $first_failure_index }
+  - match: { data_streams.0.failure_store.indices.1.index_name: $write_failure_index }
 
   - do:
       indices.delete_data_stream:

+ 8 - 8
modules/data-streams/src/yamlRestTest/resources/rest-api-spec/test/data_stream/190_failure_store_redirection.yml

@@ -23,8 +23,8 @@ teardown:
 ---
 "Redirect ingest failure in data stream to failure store":
   - requires:
-      cluster_features: ["gte_v8.13.0"]
-      reason: "data stream failure stores only redirect ingest failures in 8.13+"
+      cluster_features: ["gte_v8.15.0"]
+      reason: "data stream failure stores REST structure changed in 8.15+"
       test_runner_features: [allowed_warnings, contains]
 
   - do:
@@ -74,9 +74,9 @@ teardown:
   - match: { data_streams.0.timestamp_field.name: '@timestamp' }
   - length: { data_streams.0.indices: 1 }
   - match: { data_streams.0.indices.0.index_name: '/\.ds-logs-foobar-(\d{4}\.\d{2}\.\d{2}-)?000001/' }
-  - match: { data_streams.0.failure_store: true }
-  - length: { data_streams.0.failure_indices: 1 }
-  - match: { data_streams.0.failure_indices.0.index_name: '/\.fs-logs-foobar-(\d{4}\.\d{2}\.\d{2}-)?000001/' }
+  - match: { data_streams.0.failure_store.enabled: true }
+  - length: { data_streams.0.failure_store.indices: 1 }
+  - match: { data_streams.0.failure_store.indices.0.index_name: '/\.fs-logs-foobar-(\d{4}\.\d{2}\.\d{2}-)?000001/' }
 
   - do:
       search:
@@ -152,9 +152,9 @@ teardown:
   - match: { data_streams.0.timestamp_field.name: '@timestamp' }
   - length: { data_streams.0.indices: 1 }
   - match: { data_streams.0.indices.0.index_name: '/\.ds-logs-foobar-(\d{4}\.\d{2}\.\d{2}-)?000001/' }
-  - match: { data_streams.0.failure_store: true }
-  - length: { data_streams.0.failure_indices: 1 }
-  - match: { data_streams.0.failure_indices.0.index_name: '/\.fs-logs-foobar-(\d{4}\.\d{2}\.\d{2}-)?000001/' }
+  - match: { data_streams.0.failure_store.enabled: true }
+  - length: { data_streams.0.failure_store.indices: 1 }
+  - match: { data_streams.0.failure_store.indices.0.index_name: '/\.fs-logs-foobar-(\d{4}\.\d{2}\.\d{2}-)?000001/' }
 
   - do:
       search:

+ 10 - 10
modules/data-streams/src/yamlRestTest/resources/rest-api-spec/test/data_stream/200_rollover_failure_store.yml

@@ -1,8 +1,8 @@
 ---
 setup:
   - requires:
-      cluster_features: ["gte_v8.14.0"]
-      reason: "data stream failure store rollover only supported in 8.14+"
+      cluster_features: ["gte_v8.15.0"]
+      reason: "data stream failure stores REST structure changed in 8.15+"
       test_runner_features: allowed_warnings
 
   - do:
@@ -48,9 +48,9 @@ setup:
   - match: { data_streams.0.generation: 2 }
   - length: { data_streams.0.indices: 1 }
   - match: { data_streams.0.indices.0.index_name: '/\.ds-data-stream-for-rollover-(\d{4}\.\d{2}\.\d{2}-)?000001/' }
-  - length: { data_streams.0.failure_indices: 2 }
-  - match: { data_streams.0.failure_indices.0.index_name: '/\.fs-data-stream-for-rollover-(\d{4}\.\d{2}\.\d{2}-)?000001/' }
-  - match: { data_streams.0.failure_indices.1.index_name: '/\.fs-data-stream-for-rollover-(\d{4}\.\d{2}\.\d{2}-)?000002/' }
+  - length: { data_streams.0.failure_store.indices: 2 }
+  - match: { data_streams.0.failure_store.indices.0.index_name: '/\.fs-data-stream-for-rollover-(\d{4}\.\d{2}\.\d{2}-)?000001/' }
+  - match: { data_streams.0.failure_store.indices.1.index_name: '/\.fs-data-stream-for-rollover-(\d{4}\.\d{2}\.\d{2}-)?000002/' }
 
 ---
 "Roll over a data stream's failure store with conditions":
@@ -86,9 +86,9 @@ setup:
   - match: { data_streams.0.generation: 2 }
   - length: { data_streams.0.indices: 1 }
   - match: { data_streams.0.indices.0.index_name: '/\.ds-data-stream-for-rollover-(\d{4}\.\d{2}\.\d{2}-)?000001/' }
-  - length: { data_streams.0.failure_indices: 2 }
-  - match: { data_streams.0.failure_indices.0.index_name: '/\.fs-data-stream-for-rollover-(\d{4}\.\d{2}\.\d{2}-)?000001/' }
-  - match: { data_streams.0.failure_indices.1.index_name: '/\.fs-data-stream-for-rollover-(\d{4}\.\d{2}\.\d{2}-)?000002/' }
+  - length: { data_streams.0.failure_store.indices: 2 }
+  - match: { data_streams.0.failure_store.indices.0.index_name: '/\.fs-data-stream-for-rollover-(\d{4}\.\d{2}\.\d{2}-)?000001/' }
+  - match: { data_streams.0.failure_store.indices.1.index_name: '/\.fs-data-stream-for-rollover-(\d{4}\.\d{2}\.\d{2}-)?000002/' }
 
 ---
 "Don't roll over a data stream's failure store when conditions aren't met":
@@ -112,5 +112,5 @@ setup:
   - match: { data_streams.0.generation: 1 }
   - length: { data_streams.0.indices: 1 }
   - match: { data_streams.0.indices.0.index_name: '/\.ds-data-stream-for-rollover-(\d{4}\.\d{2}\.\d{2}-)?000001/' }
-  - length: { data_streams.0.failure_indices: 1 }
-  - match: { data_streams.0.failure_indices.0.index_name: '/\.fs-data-stream-for-rollover-(\d{4}\.\d{2}\.\d{2}-)?000001/' }
+  - length: { data_streams.0.failure_store.indices: 1 }
+  - match: { data_streams.0.failure_store.indices.0.index_name: '/\.fs-data-stream-for-rollover-(\d{4}\.\d{2}\.\d{2}-)?000001/' }

+ 5 - 5
modules/data-streams/src/yamlRestTest/resources/rest-api-spec/test/data_stream/30_auto_create_data_stream.yml

@@ -50,8 +50,8 @@
 ---
 "Put index template with failure store":
   - requires:
-      cluster_features: ["gte_v8.11.0"]
-      reason: "data stream failure stores only creatable in 8.11+"
+      cluster_features: ["gte_v8.15.0"]
+      reason: "data stream failure stores REST structure changed in 8.15+"
       test_runner_features: allowed_warnings
 
   - do:
@@ -91,9 +91,9 @@
   - match: { data_streams.0.timestamp_field.name: '@timestamp' }
   - length: { data_streams.0.indices: 1 }
   - match: { data_streams.0.indices.0.index_name: '/\.ds-logs-foobar-(\d{4}\.\d{2}\.\d{2}-)?000001/' }
-  - match: { data_streams.0.failure_store: true }
-  - length: { data_streams.0.failure_indices: 1 }
-  - match: { data_streams.0.failure_indices.0.index_name: '/\.fs-logs-foobar-(\d{4}\.\d{2}\.\d{2}-)?000001/' }
+  - match: { data_streams.0.failure_store.enabled: true }
+  - length: { data_streams.0.failure_store.indices: 1 }
+  - match: { data_streams.0.failure_store.indices.0.index_name: '/\.fs-logs-foobar-(\d{4}\.\d{2}\.\d{2}-)?000001/' }
 
   - do:
       indices.delete_data_stream:

+ 1 - 0
server/src/main/java/org/elasticsearch/TransportVersions.java

@@ -164,6 +164,7 @@ public class TransportVersions {
     public static final TransportVersion ML_INFERENCE_AZURE_OPENAI_COMPLETIONS = def(8_655_00_0);
     public static final TransportVersion JOIN_STATUS_AGE_SERIALIZATION = def(8_656_00_0);
     public static final TransportVersion ML_RERANK_DOC_OPTIONAL = def(8_657_00_0);
+    public static final TransportVersion FAILURE_STORE_FIELD_PARITY = def(8_658_00_0);
     /*
      * STOP! READ THIS FIRST! No, really,
      *        ____ _____ ___  ____  _        ____  _____    _    ____    _____ _   _ ___ ____    _____ ___ ____  ____ _____ _

+ 2 - 2
server/src/main/java/org/elasticsearch/action/admin/indices/create/AutoCreateAction.java

@@ -266,9 +266,9 @@ public final class AutoCreateAction extends ActionType<CreateIndexResponse> {
 
                     final var dataStream = clusterState.metadata().dataStreams().get(request.index());
                     final var backingIndexName = dataStream.getIndices().get(0).getName();
-                    final var indexNames = dataStream.getFailureIndices().isEmpty()
+                    final var indexNames = dataStream.getFailureIndices().getIndices().isEmpty()
                         ? List.of(backingIndexName)
-                        : List.of(backingIndexName, dataStream.getFailureIndices().get(0).getName());
+                        : List.of(backingIndexName, dataStream.getFailureIndices().getIndices().get(0).getName());
                     taskContext.success(getAckListener(indexNames, allocationActionMultiListener));
                     successfulRequests.put(request, indexNames);
                     return clusterState;

+ 14 - 20
server/src/main/java/org/elasticsearch/action/admin/indices/rollover/MetadataRolloverService.java

@@ -196,12 +196,11 @@ public class MetadataRolloverService {
         final IndexAbstraction indexAbstraction = currentState.metadata().getIndicesLookup().get(rolloverTarget);
         return switch (indexAbstraction.getType()) {
             case ALIAS -> resolveAliasRolloverNames(currentState.metadata(), indexAbstraction, newIndexName);
-            case DATA_STREAM -> {
-                if (isFailureStoreRollover) {
-                    yield resolveDataStreamFailureStoreRolloverNames(currentState.metadata(), (DataStream) indexAbstraction);
-                }
-                yield resolveDataStreamRolloverNames(currentState.getMetadata(), (DataStream) indexAbstraction);
-            }
+            case DATA_STREAM -> resolveDataStreamRolloverNames(
+                currentState.metadata(),
+                (DataStream) indexAbstraction,
+                isFailureStoreRollover
+            );
             default ->
                 // the validate method above prevents this case
                 throw new IllegalStateException("unable to roll over type [" + indexAbstraction.getType().getDisplayName() + "]");
@@ -220,19 +219,15 @@ public class MetadataRolloverService {
         return new NameResolution(sourceIndexName, unresolvedName, rolloverIndexName);
     }
 
-    private static NameResolution resolveDataStreamRolloverNames(Metadata metadata, DataStream dataStream) {
-        final IndexMetadata originalWriteIndex = metadata.index(dataStream.getWriteIndex());
-        return new NameResolution(originalWriteIndex.getIndex().getName(), null, dataStream.nextWriteIndexAndGeneration(metadata).v1());
-    }
-
-    private static NameResolution resolveDataStreamFailureStoreRolloverNames(Metadata metadata, DataStream dataStream) {
-        assert dataStream.getFailureStoreWriteIndex() != null : "Unable to roll over failure store with no failure store indices";
+    private static NameResolution resolveDataStreamRolloverNames(Metadata metadata, DataStream dataStream, boolean isFailureStoreRollover) {
+        final DataStream.DataStreamIndices dataStreamIndices = dataStream.getDataStreamIndices(isFailureStoreRollover);
+        assert dataStreamIndices.getWriteIndex() != null : "Unable to roll over dataStreamIndices with no indices";
 
-        final IndexMetadata originalWriteIndex = metadata.index(dataStream.getFailureStoreWriteIndex());
+        final IndexMetadata originalWriteIndex = metadata.index(dataStreamIndices.getWriteIndex());
         return new NameResolution(
             originalWriteIndex.getIndex().getName(),
             null,
-            dataStream.nextFailureStoreWriteIndexAndGeneration(metadata).v1()
+            dataStream.nextWriteIndexAndGeneration(metadata, dataStreamIndices).v1()
         );
     }
 
@@ -327,10 +322,9 @@ public class MetadataRolloverService {
             templateV2 = systemDataStreamDescriptor.getComposableIndexTemplate();
         }
 
-        final Index originalWriteIndex = isFailureStoreRollover ? dataStream.getFailureStoreWriteIndex() : dataStream.getWriteIndex();
-        final Tuple<String, Long> nextIndexAndGeneration = isFailureStoreRollover
-            ? dataStream.nextFailureStoreWriteIndexAndGeneration(currentState.metadata())
-            : dataStream.nextWriteIndexAndGeneration(currentState.metadata());
+        final DataStream.DataStreamIndices dataStreamIndices = dataStream.getDataStreamIndices(isFailureStoreRollover);
+        final Index originalWriteIndex = dataStreamIndices.getWriteIndex();
+        final Tuple<String, Long> nextIndexAndGeneration = dataStream.nextWriteIndexAndGeneration(metadata, dataStreamIndices);
         final String newWriteIndexName = nextIndexAndGeneration.v1();
         final long newGeneration = nextIndexAndGeneration.v2();
         MetadataCreateIndexService.validateIndexName(newWriteIndexName, currentState); // fails if the index already exists
@@ -438,7 +432,7 @@ public class MetadataRolloverService {
         metadataBuilder = withShardSizeForecastForWriteIndex(dataStreamName, metadataBuilder);
 
         newState = ClusterState.builder(newState).metadata(metadataBuilder).build();
-        newState = MetadataDataStreamsService.setRolloverOnWrite(newState, dataStreamName, false);
+        newState = MetadataDataStreamsService.setRolloverOnWrite(newState, dataStreamName, false, isFailureStoreRollover);
 
         return new RolloverResult(newWriteIndexName, originalWriteIndex.getName(), newState);
     }

+ 3 - 1
server/src/main/java/org/elasticsearch/action/admin/indices/rollover/TransportRolloverAction.java

@@ -169,12 +169,13 @@ public class TransportRolloverAction extends TransportMasterNodeAction<RolloverR
         assert task instanceof CancellableTask;
         Metadata metadata = clusterState.metadata();
         // We evaluate the names of the index for which we should evaluate conditions, as well as what our newly created index *would* be.
+        boolean targetFailureStore = rolloverRequest.indicesOptions().failureStoreOptions().includeFailureIndices();
         final MetadataRolloverService.NameResolution trialRolloverNames = MetadataRolloverService.resolveRolloverNames(
             clusterState,
             rolloverRequest.getRolloverTarget(),
             rolloverRequest.getNewIndexName(),
             rolloverRequest.getCreateIndexRequest(),
-            rolloverRequest.indicesOptions().failureStoreOptions().includeFailureIndices()
+            targetFailureStore
         );
         final String trialSourceIndexName = trialRolloverNames.sourceName();
         final String trialRolloverIndexName = trialRolloverNames.rolloverName();
@@ -200,6 +201,7 @@ public class TransportRolloverAction extends TransportMasterNodeAction<RolloverR
                 metadataDataStreamsService.setRolloverOnWrite(
                     rolloverRequest.getRolloverTarget(),
                     true,
+                    targetFailureStore,
                     rolloverRequest.ackTimeout(),
                     rolloverRequest.masterNodeTimeout(),
                     listener.map(

+ 50 - 53
server/src/main/java/org/elasticsearch/action/datastreams/GetDataStreamAction.java

@@ -189,6 +189,7 @@ public class GetDataStreamAction extends ActionType<GetDataStreamAction.Response
             public static final ParseField TIME_SINCE_LAST_AUTO_SHARD_EVENT_MILLIS = new ParseField(
                 "time_since_last_auto_shard_event_millis"
             );
+            public static final ParseField FAILURE_STORE_ENABLED = new ParseField("enabled");
 
             private final DataStream dataStream;
             private final ClusterHealthStatus dataStreamStatus;
@@ -222,7 +223,7 @@ public class GetDataStreamAction extends ActionType<GetDataStreamAction.Response
             @SuppressWarnings("unchecked")
             DataStreamInfo(StreamInput in) throws IOException {
                 this(
-                    new DataStream(in),
+                    DataStream.read(in),
                     ClusterHealthStatus.readFrom(in),
                     in.readOptionalString(),
                     in.readOptionalString(),
@@ -300,45 +301,8 @@ public class GetDataStreamAction extends ActionType<GetDataStreamAction.Response
                     .field(DataStream.NAME_FIELD.getPreferredName(), DataStream.TIMESTAMP_FIELD_NAME)
                     .endObject();
 
-                builder.field(DataStream.INDICES_FIELD.getPreferredName());
-                if (dataStream.getIndices() == null) {
-                    builder.nullValue();
-                } else {
-                    builder.startArray();
-                    for (Index index : dataStream.getIndices()) {
-                        builder.startObject();
-                        index.toXContentFragment(builder);
-                        IndexProperties indexProperties = indexSettingsValues.get(index);
-                        if (indexProperties != null) {
-                            builder.field(PREFER_ILM.getPreferredName(), indexProperties.preferIlm());
-                            if (indexProperties.ilmPolicyName() != null) {
-                                builder.field(ILM_POLICY_FIELD.getPreferredName(), indexProperties.ilmPolicyName());
-                            }
-                            builder.field(MANAGED_BY.getPreferredName(), indexProperties.managedBy.displayValue);
-                        }
-                        builder.endObject();
-                    }
-                    builder.endArray();
-                }
+                indicesToXContent(builder, dataStream.getIndices());
                 builder.field(DataStream.GENERATION_FIELD.getPreferredName(), dataStream.getGeneration());
-                if (DataStream.isFailureStoreFeatureFlagEnabled()) {
-                    builder.field(DataStream.FAILURE_INDICES_FIELD.getPreferredName());
-                    builder.startArray();
-                    for (Index failureStore : dataStream.getFailureIndices()) {
-                        builder.startObject();
-                        failureStore.toXContentFragment(builder);
-                        IndexProperties indexProperties = indexSettingsValues.get(failureStore);
-                        if (indexProperties != null) {
-                            builder.field(PREFER_ILM.getPreferredName(), indexProperties.preferIlm());
-                            if (indexProperties.ilmPolicyName() != null) {
-                                builder.field(ILM_POLICY_FIELD.getPreferredName(), indexProperties.ilmPolicyName());
-                            }
-                            builder.field(MANAGED_BY.getPreferredName(), indexProperties.managedBy.displayValue);
-                        }
-                        builder.endObject();
-                    }
-                    builder.endArray();
-                }
                 if (dataStream.getMetadata() != null) {
                     builder.field(DataStream.METADATA_FIELD.getPreferredName(), dataStream.getMetadata());
                 }
@@ -361,20 +325,7 @@ public class GetDataStreamAction extends ActionType<GetDataStreamAction.Response
                 builder.field(ALLOW_CUSTOM_ROUTING.getPreferredName(), dataStream.isAllowCustomRouting());
                 builder.field(REPLICATED.getPreferredName(), dataStream.isReplicated());
                 builder.field(ROLLOVER_ON_WRITE.getPreferredName(), dataStream.rolloverOnWrite());
-                if (DataStream.isFailureStoreFeatureFlagEnabled()) {
-                    builder.field(DataStream.FAILURE_STORE_FIELD.getPreferredName(), dataStream.isFailureStoreEnabled());
-                }
-                if (dataStream.getAutoShardingEvent() != null) {
-                    DataStreamAutoShardingEvent autoShardingEvent = dataStream.getAutoShardingEvent();
-                    builder.startObject(AUTO_SHARDING_FIELD.getPreferredName());
-                    autoShardingEvent.toXContent(builder, params);
-                    builder.humanReadableField(
-                        TIME_SINCE_LAST_AUTO_SHARD_EVENT_MILLIS.getPreferredName(),
-                        TIME_SINCE_LAST_AUTO_SHARD_EVENT.getPreferredName(),
-                        autoShardingEvent.getTimeSinceLastAutoShardingEvent(System::currentTimeMillis)
-                    );
-                    builder.endObject();
-                }
+                addAutoShardingEvent(builder, params, dataStream.getAutoShardingEvent());
                 if (timeSeries != null) {
                     builder.startObject(TIME_SERIES.getPreferredName());
                     builder.startArray(TEMPORAL_RANGES.getPreferredName());
@@ -389,10 +340,56 @@ public class GetDataStreamAction extends ActionType<GetDataStreamAction.Response
                     builder.endArray();
                     builder.endObject();
                 }
+                if (DataStream.isFailureStoreFeatureFlagEnabled()) {
+                    builder.startObject(DataStream.FAILURE_STORE_FIELD.getPreferredName());
+                    builder.field(FAILURE_STORE_ENABLED.getPreferredName(), dataStream.isFailureStoreEnabled());
+                    builder.field(
+                        DataStream.ROLLOVER_ON_WRITE_FIELD.getPreferredName(),
+                        dataStream.getFailureIndices().isRolloverOnWrite()
+                    );
+                    indicesToXContent(builder, dataStream.getFailureIndices().getIndices());
+                    addAutoShardingEvent(builder, params, dataStream.getFailureIndices().getAutoShardingEvent());
+                    builder.endObject();
+                }
                 builder.endObject();
                 return builder;
             }
 
+            private XContentBuilder indicesToXContent(XContentBuilder builder, List<Index> indices) throws IOException {
+                builder.field(DataStream.INDICES_FIELD.getPreferredName());
+                builder.startArray();
+                for (Index index : indices) {
+                    builder.startObject();
+                    index.toXContentFragment(builder);
+                    IndexProperties indexProperties = indexSettingsValues.get(index);
+                    if (indexProperties != null) {
+                        builder.field(PREFER_ILM.getPreferredName(), indexProperties.preferIlm());
+                        if (indexProperties.ilmPolicyName() != null) {
+                            builder.field(ILM_POLICY_FIELD.getPreferredName(), indexProperties.ilmPolicyName());
+                        }
+                        builder.field(MANAGED_BY.getPreferredName(), indexProperties.managedBy.displayValue);
+                    }
+                    builder.endObject();
+                }
+                builder.endArray();
+                return builder;
+            }
+
+            private void addAutoShardingEvent(XContentBuilder builder, Params params, DataStreamAutoShardingEvent autoShardingEvent)
+                throws IOException {
+                if (autoShardingEvent == null) {
+                    return;
+                }
+                builder.startObject(AUTO_SHARDING_FIELD.getPreferredName());
+                autoShardingEvent.toXContent(builder, params);
+                builder.humanReadableField(
+                    TIME_SINCE_LAST_AUTO_SHARD_EVENT_MILLIS.getPreferredName(),
+                    TIME_SINCE_LAST_AUTO_SHARD_EVENT.getPreferredName(),
+                    autoShardingEvent.getTimeSinceLastAutoShardingEvent(System::currentTimeMillis)
+                );
+                builder.endObject();
+            }
+
             /**
              * Computes and returns which system will manage the next generation for this data stream.
              */

+ 2 - 2
server/src/main/java/org/elasticsearch/action/index/IndexRequest.java

@@ -862,12 +862,12 @@ public class IndexRequest extends ReplicatedWriteRequest<IndexRequest> implement
             // Resolve write index and get parent data stream to handle the case of dealing with an alias
             String defaultWriteIndexName = ia.getWriteIndex().getName();
             DataStream dataStream = metadata.getIndicesLookup().get(defaultWriteIndexName).getParentDataStream();
-            if (dataStream.getFailureIndices().size() < 1) {
+            if (dataStream.getFailureIndices().getIndices().size() < 1) {
                 throw new ElasticsearchException(
                     "Attempting to write a document to a failure store but the target data stream does not have one enabled"
                 );
             }
-            return dataStream.getFailureIndices().get(dataStream.getFailureIndices().size() - 1);
+            return dataStream.getFailureIndices().getIndices().get(dataStream.getFailureIndices().getIndices().size() - 1);
         } else {
             // Resolve as normal
             return ia.getWriteIndex(this, metadata);

+ 343 - 208
server/src/main/java/org/elasticsearch/cluster/metadata/DataStream.java

@@ -22,7 +22,6 @@ import org.elasticsearch.cluster.SimpleDiffable;
 import org.elasticsearch.cluster.metadata.DataStreamLifecycle.Downsampling.Round;
 import org.elasticsearch.common.ParsingException;
 import org.elasticsearch.common.Strings;
-import org.elasticsearch.common.TriFunction;
 import org.elasticsearch.common.bytes.BytesReference;
 import org.elasticsearch.common.io.stream.StreamInput;
 import org.elasticsearch.common.io.stream.StreamOutput;
@@ -52,7 +51,6 @@ import java.util.ArrayList;
 import java.util.Collection;
 import java.util.Comparator;
 import java.util.HashMap;
-import java.util.HashSet;
 import java.util.List;
 import java.util.Locale;
 import java.util.Map;
@@ -102,7 +100,6 @@ public final class DataStream implements SimpleDiffable<DataStream>, ToXContentO
 
     private final LongSupplier timeProvider;
     private final String name;
-    private final List<Index> indices;
     private final long generation;
     @Nullable
     private final Map<String, Object> metadata;
@@ -114,12 +111,10 @@ public final class DataStream implements SimpleDiffable<DataStream>, ToXContentO
     private final IndexMode indexMode;
     @Nullable
     private final DataStreamLifecycle lifecycle;
-    private final boolean rolloverOnWrite;
     private final boolean failureStoreEnabled;
-    private final List<Index> failureIndices;
-    private volatile Set<String> failureStoreLookup;
-    @Nullable
-    private final DataStreamAutoShardingEvent autoShardingEvent;
+
+    private final DataStreamIndices backingIndices;
+    private final DataStreamIndices failureIndices;
 
     public DataStream(
         String name,
@@ -139,7 +134,6 @@ public final class DataStream implements SimpleDiffable<DataStream>, ToXContentO
     ) {
         this(
             name,
-            indices,
             generation,
             metadata,
             hidden,
@@ -150,16 +144,14 @@ public final class DataStream implements SimpleDiffable<DataStream>, ToXContentO
             indexMode,
             lifecycle,
             failureStoreEnabled,
-            failureIndices,
-            rolloverOnWrite,
-            autoShardingEvent
+            new DataStreamIndices(BACKING_INDEX_PREFIX, List.copyOf(indices), rolloverOnWrite, autoShardingEvent),
+            new DataStreamIndices(FAILURE_STORE_PREFIX, List.copyOf(failureIndices), false, null)
         );
     }
 
     // visible for testing
     DataStream(
         String name,
-        List<Index> indices,
         long generation,
         Map<String, Object> metadata,
         boolean hidden,
@@ -170,13 +162,10 @@ public final class DataStream implements SimpleDiffable<DataStream>, ToXContentO
         IndexMode indexMode,
         DataStreamLifecycle lifecycle,
         boolean failureStoreEnabled,
-        List<Index> failureIndices,
-        boolean rolloverOnWrite,
-        @Nullable DataStreamAutoShardingEvent autoShardingEvent
+        DataStreamIndices backingIndices,
+        DataStreamIndices failureIndices
     ) {
         this.name = name;
-        this.indices = List.copyOf(indices);
-        assert indices.isEmpty() == false;
         this.generation = generation;
         this.metadata = metadata;
         assert system == false || hidden; // system indices must be hidden
@@ -188,21 +177,11 @@ public final class DataStream implements SimpleDiffable<DataStream>, ToXContentO
         this.indexMode = indexMode;
         this.lifecycle = lifecycle;
         this.failureStoreEnabled = failureStoreEnabled;
+        assert backingIndices.indices.isEmpty() == false;
+        assert replicated == false || (backingIndices.rolloverOnWrite == false && failureIndices.rolloverOnWrite == false)
+            : "replicated data streams cannot be marked for lazy rollover";
+        this.backingIndices = backingIndices;
         this.failureIndices = failureIndices;
-        assert assertConsistent(this.indices);
-        assert replicated == false || rolloverOnWrite == false : "replicated data streams cannot be marked for lazy rollover";
-        this.rolloverOnWrite = rolloverOnWrite;
-        this.autoShardingEvent = autoShardingEvent;
-    }
-
-    private static boolean assertConsistent(List<Index> indices) {
-        assert indices.size() > 0;
-        final Set<String> indexNames = new HashSet<>();
-        for (Index index : indices) {
-            final boolean added = indexNames.add(index.getName());
-            assert added : "found duplicate index entries in " + indices;
-        }
-        return true;
     }
 
     @Override
@@ -222,20 +201,16 @@ public final class DataStream implements SimpleDiffable<DataStream>, ToXContentO
 
     @Override
     public List<Index> getIndices() {
-        return indices;
+        return backingIndices.indices;
     }
 
     public long getGeneration() {
         return generation;
     }
 
-    public List<Index> getFailureIndices() {
-        return failureIndices;
-    }
-
     @Override
     public Index getWriteIndex() {
-        return indices.get(indices.size() - 1);
+        return backingIndices.getWriteIndex();
     }
 
     /**
@@ -243,29 +218,18 @@ public final class DataStream implements SimpleDiffable<DataStream>, ToXContentO
      */
     @Nullable
     public Index getFailureStoreWriteIndex() {
-        return isFailureStoreEnabled() == false || failureIndices.isEmpty() ? null : failureIndices.get(failureIndices.size() - 1);
+        return isFailureStoreEnabled() == false || failureIndices.indices.isEmpty() ? null : failureIndices.getWriteIndex();
     }
 
     /**
      * Returns true if the index name provided belongs to a failure store index.
-     * This method builds a local Set with all the failure store index names and then checks if it contains the name.
-     * This will perform better if there are multiple indices of this data stream checked.
      */
     public boolean isFailureStoreIndex(String indexName) {
-        if (failureStoreLookup == null) {
-            // There is a chance this will be calculated twice, but it's a relatively cheap action,
-            // so it's not worth synchronising
-            if (failureIndices == null || failureIndices.isEmpty()) {
-                failureStoreLookup = Set.of();
-            } else {
-                failureStoreLookup = failureIndices.stream().map(Index::getName).collect(Collectors.toSet());
-            }
-        }
-        return failureStoreLookup.contains(indexName);
+        return failureIndices.containsIndex(indexName);
     }
 
     public boolean rolloverOnWrite() {
-        return rolloverOnWrite;
+        return backingIndices.rolloverOnWrite;
     }
 
     /**
@@ -275,8 +239,8 @@ public final class DataStream implements SimpleDiffable<DataStream>, ToXContentO
      *         an end time that is less than the provided timestamp. Otherwise <code>null</code> is returned.
      */
     public Index selectTimeSeriesWriteIndex(Instant timestamp, Metadata metadata) {
-        for (int i = indices.size() - 1; i >= 0; i--) {
-            Index index = indices.get(i);
+        for (int i = backingIndices.indices.size() - 1; i >= 0; i--) {
+            Index index = backingIndices.indices.get(i);
             IndexMetadata im = metadata.index(index);
 
             // TODO: make index_mode, start and end time fields in IndexMetadata class.
@@ -306,7 +270,7 @@ public final class DataStream implements SimpleDiffable<DataStream>, ToXContentO
     public void validate(Function<String, IndexMetadata> imSupplier) {
         if (indexMode == IndexMode.TIME_SERIES) {
             // Get a sorted overview of each backing index with there start and end time range:
-            var startAndEndTimes = indices.stream().map(index -> {
+            var startAndEndTimes = backingIndices.indices.stream().map(index -> {
                 IndexMetadata im = imSupplier.apply(index.getName());
                 if (im == null) {
                     throw new IllegalStateException("index [" + index.getName() + "] is not found in the index metadata supplier");
@@ -407,7 +371,19 @@ public final class DataStream implements SimpleDiffable<DataStream>, ToXContentO
      * Returns the latest auto sharding event that happened for this data stream
      */
     public DataStreamAutoShardingEvent getAutoShardingEvent() {
-        return autoShardingEvent;
+        return backingIndices.autoShardingEvent;
+    }
+
+    public DataStreamIndices getBackingIndices() {
+        return backingIndices;
+    }
+
+    public DataStreamIndices getFailureIndices() {
+        return failureIndices;
+    }
+
+    public DataStreamIndices getDataStreamIndices(boolean failureStore) {
+        return failureStore ? this.failureIndices : backingIndices;
     }
 
     /**
@@ -446,15 +422,11 @@ public final class DataStream implements SimpleDiffable<DataStream>, ToXContentO
             indexMode = null;
         }
 
-        List<Index> backingIndices = new ArrayList<>(indices);
+        List<Index> backingIndices = new ArrayList<>(this.backingIndices.indices);
         backingIndices.add(writeIndex);
-        return copy().setIndices(backingIndices)
-            .setGeneration(generation)
-            .setReplicated(false)
-            .setIndexMode(indexMode)
-            .setAutoShardingEvent(autoShardingEvent)
-            .setRolloverOnWrite(false)
-            .build();
+        return copy().setBackingIndices(
+            this.backingIndices.copy().setIndices(backingIndices).setAutoShardingEvent(autoShardingEvent).setRolloverOnWrite(false).build()
+        ).setGeneration(generation).setIndexMode(indexMode).build();
     }
 
     /**
@@ -475,56 +447,32 @@ public final class DataStream implements SimpleDiffable<DataStream>, ToXContentO
      * Like {@link #rolloverFailureStore(Index, long)}, but does no validation, use with care only.
      */
     public DataStream unsafeRolloverFailureStore(Index writeIndex, long generation) {
-        List<Index> failureIndices = new ArrayList<>(this.failureIndices);
+        List<Index> failureIndices = new ArrayList<>(this.failureIndices.indices);
         failureIndices.add(writeIndex);
-        return copy().setGeneration(generation).setReplicated(false).setFailureIndices(failureIndices).build();
+        return copy().setGeneration(generation).setFailureIndices(this.failureIndices.copy().setIndices(failureIndices).build()).build();
     }
 
     /**
      * Generates the next write index name and <code>generation</code> to be used for rolling over this data stream.
      *
      * @param clusterMetadata Cluster metadata
+     * @param dataStreamIndices The data stream indices that we're generating the next write index name and generation for
      * @return tuple of the next write index name and next generation.
      */
-    public Tuple<String, Long> nextWriteIndexAndGeneration(Metadata clusterMetadata) {
-        ensureNotReplicated();
-        return unsafeNextWriteIndexAndGeneration(clusterMetadata);
-    }
-
-    /**
-     * Like {@link #nextWriteIndexAndGeneration(Metadata)}, but does no validation, use with care only.
-     */
-    public Tuple<String, Long> unsafeNextWriteIndexAndGeneration(Metadata clusterMetadata) {
-        return generateNextWriteIndexAndGeneration(clusterMetadata, DataStream::getDefaultBackingIndexName);
-    }
-
-    /**
-     * Generates the next write index name and <code>generation</code> to be used for rolling over the failure store of this data stream.
-     *
-     * @param clusterMetadata Cluster metadata
-     * @return tuple of the next failure store write index name and next generation.
-     */
-    public Tuple<String, Long> nextFailureStoreWriteIndexAndGeneration(Metadata clusterMetadata) {
+    public Tuple<String, Long> nextWriteIndexAndGeneration(Metadata clusterMetadata, DataStreamIndices dataStreamIndices) {
         ensureNotReplicated();
-        return unsafeNextFailureStoreWriteIndexAndGeneration(clusterMetadata);
+        return unsafeNextWriteIndexAndGeneration(clusterMetadata, dataStreamIndices);
     }
 
     /**
-     * Like {@link #nextFailureStoreWriteIndexAndGeneration(Metadata)}, but does no validation, use with care only.
+     * Like {@link #nextWriteIndexAndGeneration(Metadata, DataStreamIndices)}, but does no validation, use with care only.
      */
-    public Tuple<String, Long> unsafeNextFailureStoreWriteIndexAndGeneration(Metadata clusterMetadata) {
-        return generateNextWriteIndexAndGeneration(clusterMetadata, DataStream::getDefaultFailureStoreName);
-    }
-
-    private Tuple<String, Long> generateNextWriteIndexAndGeneration(
-        Metadata clusterMetadata,
-        TriFunction<String, Long, Long, String> nameGenerator
-    ) {
+    public Tuple<String, Long> unsafeNextWriteIndexAndGeneration(Metadata clusterMetadata, DataStreamIndices dataStreamIndices) {
         String newWriteIndexName;
         long generation = this.generation;
         long currentTimeMillis = timeProvider.getAsLong();
         do {
-            newWriteIndexName = nameGenerator.apply(getName(), ++generation, currentTimeMillis);
+            newWriteIndexName = dataStreamIndices.generateName(name, ++generation, currentTimeMillis);
         } while (clusterMetadata.hasIndexAbstraction(newWriteIndexName));
         return Tuple.tuple(newWriteIndexName, generation);
     }
@@ -544,14 +492,14 @@ public final class DataStream implements SimpleDiffable<DataStream>, ToXContentO
      * @throws IllegalArgumentException if {@code index} is not a backing index or is the current write index of the data stream
      */
     public DataStream removeBackingIndex(Index index) {
-        int backingIndexPosition = indices.indexOf(index);
+        int backingIndexPosition = backingIndices.indices.indexOf(index);
 
         if (backingIndexPosition == -1) {
             throw new IllegalArgumentException(
                 String.format(Locale.ROOT, "index [%s] is not part of data stream [%s]", index.getName(), name)
             );
         }
-        if (indices.size() == (backingIndexPosition + 1)) {
+        if (backingIndices.indices.size() == (backingIndexPosition + 1)) {
             throw new IllegalArgumentException(
                 String.format(
                     Locale.ROOT,
@@ -562,10 +510,12 @@ public final class DataStream implements SimpleDiffable<DataStream>, ToXContentO
             );
         }
 
-        List<Index> backingIndices = new ArrayList<>(indices);
+        List<Index> backingIndices = new ArrayList<>(this.backingIndices.indices);
         backingIndices.remove(index);
-        assert backingIndices.size() == indices.size() - 1;
-        return copy().setIndices(backingIndices).setGeneration(generation + 1).build();
+        assert backingIndices.size() == this.backingIndices.indices.size() - 1;
+        return copy().setBackingIndices(this.backingIndices.copy().setIndices(backingIndices).build())
+            .setGeneration(generation + 1)
+            .build();
     }
 
     /**
@@ -578,7 +528,7 @@ public final class DataStream implements SimpleDiffable<DataStream>, ToXContentO
      * data stream
      */
     public DataStream removeFailureStoreIndex(Index index) {
-        int failureIndexPosition = failureIndices.indexOf(index);
+        int failureIndexPosition = failureIndices.indices.indexOf(index);
 
         if (failureIndexPosition == -1) {
             throw new IllegalArgumentException(
@@ -588,7 +538,7 @@ public final class DataStream implements SimpleDiffable<DataStream>, ToXContentO
 
         // TODO: When failure stores are lazily created, this wont necessarily be required anymore. We can remove the failure store write
         // index as long as we mark the data stream to lazily rollover the failure store with no conditions on its next write
-        if (failureIndices.size() == (failureIndexPosition + 1)) {
+        if (failureIndices.indices.size() == (failureIndexPosition + 1)) {
             throw new IllegalArgumentException(
                 String.format(
                     Locale.ROOT,
@@ -599,10 +549,12 @@ public final class DataStream implements SimpleDiffable<DataStream>, ToXContentO
             );
         }
 
-        List<Index> updatedFailureIndices = new ArrayList<>(failureIndices);
+        List<Index> updatedFailureIndices = new ArrayList<>(failureIndices.indices);
         updatedFailureIndices.remove(index);
-        assert updatedFailureIndices.size() == failureIndices.size() - 1;
-        return copy().setGeneration(generation + 1).setFailureIndices(updatedFailureIndices).build();
+        assert updatedFailureIndices.size() == failureIndices.indices.size() - 1;
+        return copy().setFailureIndices(failureIndices.copy().setIndices(updatedFailureIndices).build())
+            .setGeneration(generation + 1)
+            .build();
     }
 
     /**
@@ -616,14 +568,14 @@ public final class DataStream implements SimpleDiffable<DataStream>, ToXContentO
      * existing index.
      */
     public DataStream replaceBackingIndex(Index existingBackingIndex, Index newBackingIndex) {
-        List<Index> backingIndices = new ArrayList<>(indices);
+        List<Index> backingIndices = new ArrayList<>(this.backingIndices.indices);
         int backingIndexPosition = backingIndices.indexOf(existingBackingIndex);
         if (backingIndexPosition == -1) {
             throw new IllegalArgumentException(
                 String.format(Locale.ROOT, "index [%s] is not part of data stream [%s]", existingBackingIndex.getName(), name)
             );
         }
-        if (indices.size() == (backingIndexPosition + 1)) {
+        if (this.backingIndices.indices.size() == (backingIndexPosition + 1)) {
             throw new IllegalArgumentException(
                 String.format(
                     Locale.ROOT,
@@ -634,7 +586,9 @@ public final class DataStream implements SimpleDiffable<DataStream>, ToXContentO
             );
         }
         backingIndices.set(backingIndexPosition, newBackingIndex);
-        return copy().setIndices(backingIndices).setGeneration(generation + 1).build();
+        return copy().setBackingIndices(this.backingIndices.copy().setIndices(backingIndices).build())
+            .setGeneration(generation + 1)
+            .build();
     }
 
     /**
@@ -656,10 +610,12 @@ public final class DataStream implements SimpleDiffable<DataStream>, ToXContentO
         // ensure that no aliases reference index
         ensureNoAliasesOnIndex(clusterMetadata, index);
 
-        List<Index> backingIndices = new ArrayList<>(indices);
+        List<Index> backingIndices = new ArrayList<>(this.backingIndices.indices);
         backingIndices.add(0, index);
-        assert backingIndices.size() == indices.size() + 1;
-        return copy().setIndices(backingIndices).setGeneration(generation + 1).build();
+        assert backingIndices.size() == this.backingIndices.indices.size() + 1;
+        return copy().setBackingIndices(this.backingIndices.copy().setIndices(backingIndices).build())
+            .setGeneration(generation + 1)
+            .build();
     }
 
     /**
@@ -680,10 +636,12 @@ public final class DataStream implements SimpleDiffable<DataStream>, ToXContentO
 
         ensureNoAliasesOnIndex(clusterMetadata, index);
 
-        List<Index> updatedFailureIndices = new ArrayList<>(failureIndices);
+        List<Index> updatedFailureIndices = new ArrayList<>(failureIndices.indices);
         updatedFailureIndices.add(0, index);
-        assert updatedFailureIndices.size() == failureIndices.size() + 1;
-        return copy().setGeneration(generation + 1).setFailureIndices(updatedFailureIndices).build();
+        assert updatedFailureIndices.size() == failureIndices.indices.size() + 1;
+        return copy().setFailureIndices(failureIndices.copy().setIndices(updatedFailureIndices).build())
+            .setGeneration(generation + 1)
+            .build();
     }
 
     /**
@@ -742,7 +700,7 @@ public final class DataStream implements SimpleDiffable<DataStream>, ToXContentO
     @Nullable
     public DataStream snapshot(Collection<String> indicesInSnapshot) {
         // do not include indices not available in the snapshot
-        List<Index> reconciledIndices = new ArrayList<>(this.indices);
+        List<Index> reconciledIndices = new ArrayList<>(this.backingIndices.indices);
         if (reconciledIndices.removeIf(x -> indicesInSnapshot.contains(x.getName()) == false) == false) {
             return this;
         }
@@ -751,7 +709,9 @@ public final class DataStream implements SimpleDiffable<DataStream>, ToXContentO
             return null;
         }
 
-        return copy().setIndices(reconciledIndices).setMetadata(metadata == null ? null : new HashMap<>(metadata)).build();
+        return copy().setBackingIndices(backingIndices.copy().setIndices(reconciledIndices).build())
+            .setMetadata(metadata == null ? null : new HashMap<>(metadata))
+            .build();
     }
 
     /**
@@ -792,7 +752,7 @@ public final class DataStream implements SimpleDiffable<DataStream>, ToXContentO
         Function<String, IndexMetadata> indexMetadataSupplier,
         LongSupplier nowSupplier
     ) {
-        assert indices.contains(index) : "the provided index must be a backing index for this datastream";
+        assert backingIndices.indices.contains(index) : "the provided index must be a backing index for this datastream";
         if (lifecycle == null || lifecycle.getDownsamplingRounds() == null) {
             return List.of();
         }
@@ -831,7 +791,7 @@ public final class DataStream implements SimpleDiffable<DataStream>, ToXContentO
         LongSupplier nowSupplier
     ) {
         List<Index> olderIndices = new ArrayList<>();
-        for (Index index : indices) {
+        for (Index index : backingIndices.indices) {
             if (isIndexOderThan(index, retentionPeriod.getMillis(), nowSupplier.getAsLong(), indicesPredicate, indexMetadataSupplier)) {
                 olderIndices.add(index);
             }
@@ -864,7 +824,7 @@ public final class DataStream implements SimpleDiffable<DataStream>, ToXContentO
      * we return false.
      */
     public boolean isIndexManagedByDataStreamLifecycle(Index index, Function<String, IndexMetadata> indexMetadataSupplier) {
-        if (indices.contains(index) == false) {
+        if (backingIndices.indices.contains(index) == false) {
             return false;
         }
         IndexMetadata indexMetadata = indexMetadataSupplier.apply(index.getName());
@@ -936,13 +896,7 @@ public final class DataStream implements SimpleDiffable<DataStream>, ToXContentO
      * @return backing index name
      */
     public static String getDefaultBackingIndexName(String dataStreamName, long generation, long epochMillis) {
-        return String.format(
-            Locale.ROOT,
-            BACKING_INDEX_PREFIX + "%s-%s-%06d",
-            dataStreamName,
-            DATE_FORMATTER.formatMillis(epochMillis),
-            generation
-        );
+        return getDefaultIndexName(BACKING_INDEX_PREFIX, dataStreamName, generation, epochMillis);
     }
 
     /**
@@ -955,33 +909,65 @@ public final class DataStream implements SimpleDiffable<DataStream>, ToXContentO
      * @return backing index name
      */
     public static String getDefaultFailureStoreName(String dataStreamName, long generation, long epochMillis) {
-        return String.format(
-            Locale.ROOT,
-            FAILURE_STORE_PREFIX + "%s-%s-%06d",
-            dataStreamName,
-            DATE_FORMATTER.formatMillis(epochMillis),
-            generation
-        );
+        return getDefaultIndexName(FAILURE_STORE_PREFIX, dataStreamName, generation, epochMillis);
     }
 
-    public DataStream(StreamInput in) throws IOException {
-        this(
-            readName(in),
-            readIndices(in),
-            in.readVLong(),
-            in.readGenericMap(),
-            in.readBoolean(),
-            in.readBoolean(),
-            in.readBoolean(),
-            in.getTransportVersion().onOrAfter(TransportVersions.V_8_0_0) ? in.readBoolean() : false,
-            in.getTransportVersion().onOrAfter(TransportVersions.V_8_1_0) ? in.readOptionalEnum(IndexMode.class) : null,
-            in.getTransportVersion().onOrAfter(TransportVersions.V_8_9_X) ? in.readOptionalWriteable(DataStreamLifecycle::new) : null,
-            in.getTransportVersion().onOrAfter(DataStream.ADDED_FAILURE_STORE_TRANSPORT_VERSION) ? in.readBoolean() : false,
-            in.getTransportVersion().onOrAfter(DataStream.ADDED_FAILURE_STORE_TRANSPORT_VERSION) ? readIndices(in) : List.of(),
-            in.getTransportVersion().onOrAfter(TransportVersions.V_8_13_0) ? in.readBoolean() : false,
-            in.getTransportVersion().onOrAfter(DataStream.ADDED_AUTO_SHARDING_EVENT_VERSION)
-                ? in.readOptionalWriteable(DataStreamAutoShardingEvent::new)
-                : null
+    /**
+     * Generates the name of the index that conforms to the default naming convention for indices
+     * on data streams given the specified prefix, data stream name, generation, and time.
+     *
+     * @param prefix the prefix that the index name should have
+     * @param dataStreamName name of the data stream
+     * @param generation generation of the data stream
+     * @param epochMillis creation time for the backing index
+     * @return backing index name
+     */
+    private static String getDefaultIndexName(String prefix, String dataStreamName, long generation, long epochMillis) {
+        return String.format(Locale.ROOT, prefix + "%s-%s-%06d", dataStreamName, DATE_FORMATTER.formatMillis(epochMillis), generation);
+    }
+
+    public static DataStream read(StreamInput in) throws IOException {
+        var name = readName(in);
+        var backingIndicesBuilder = DataStreamIndices.backingIndicesBuilder(readIndices(in));
+        var generation = in.readVLong();
+        var metadata = in.readGenericMap();
+        var hidden = in.readBoolean();
+        var replicated = in.readBoolean();
+        var system = in.readBoolean();
+        var allowCustomRouting = in.getTransportVersion().onOrAfter(TransportVersions.V_8_0_0) ? in.readBoolean() : false;
+        var indexMode = in.getTransportVersion().onOrAfter(TransportVersions.V_8_1_0) ? in.readOptionalEnum(IndexMode.class) : null;
+        var lifecycle = in.getTransportVersion().onOrAfter(TransportVersions.V_8_9_X)
+            ? in.readOptionalWriteable(DataStreamLifecycle::new)
+            : null;
+        var failureStoreEnabled = in.getTransportVersion().onOrAfter(DataStream.ADDED_FAILURE_STORE_TRANSPORT_VERSION)
+            ? in.readBoolean()
+            : false;
+        var failureIndices = in.getTransportVersion().onOrAfter(DataStream.ADDED_FAILURE_STORE_TRANSPORT_VERSION)
+            ? readIndices(in)
+            : List.<Index>of();
+        var failureIndicesBuilder = DataStreamIndices.failureIndicesBuilder(failureIndices);
+        backingIndicesBuilder.setRolloverOnWrite(in.getTransportVersion().onOrAfter(TransportVersions.V_8_13_0) ? in.readBoolean() : false);
+        if (in.getTransportVersion().onOrAfter(DataStream.ADDED_AUTO_SHARDING_EVENT_VERSION)) {
+            backingIndicesBuilder.setAutoShardingEvent(in.readOptionalWriteable(DataStreamAutoShardingEvent::new));
+        }
+        if (in.getTransportVersion().onOrAfter(TransportVersions.FAILURE_STORE_FIELD_PARITY)) {
+            failureIndicesBuilder.setRolloverOnWrite(in.readBoolean())
+                .setAutoShardingEvent(in.readOptionalWriteable(DataStreamAutoShardingEvent::new));
+        }
+        return new DataStream(
+            name,
+            generation,
+            metadata,
+            hidden,
+            replicated,
+            system,
+            System::currentTimeMillis,
+            allowCustomRouting,
+            indexMode,
+            lifecycle,
+            failureStoreEnabled,
+            backingIndicesBuilder.build(),
+            failureIndicesBuilder.build()
         );
     }
 
@@ -996,14 +982,14 @@ public final class DataStream implements SimpleDiffable<DataStream>, ToXContentO
     }
 
     public static Diff<DataStream> readDiffFrom(StreamInput in) throws IOException {
-        return SimpleDiffable.readDiffFrom(DataStream::new, in);
+        return SimpleDiffable.readDiffFrom(DataStream::read, in);
     }
 
     @Override
     public void writeTo(StreamOutput out) throws IOException {
         out.writeString(name);
         out.writeString(TIMESTAMP_FIELD_NAME); // TODO: clear this out in the future https://github.com/elastic/elasticsearch/issues/101991
-        out.writeCollection(indices);
+        out.writeCollection(backingIndices.indices);
         out.writeVLong(generation);
         out.writeGenericMap(metadata);
         out.writeBoolean(hidden);
@@ -1020,13 +1006,17 @@ public final class DataStream implements SimpleDiffable<DataStream>, ToXContentO
         }
         if (out.getTransportVersion().onOrAfter(DataStream.ADDED_FAILURE_STORE_TRANSPORT_VERSION)) {
             out.writeBoolean(failureStoreEnabled);
-            out.writeCollection(failureIndices);
+            out.writeCollection(failureIndices.indices);
         }
         if (out.getTransportVersion().onOrAfter(TransportVersions.V_8_13_0)) {
-            out.writeBoolean(rolloverOnWrite);
+            out.writeBoolean(backingIndices.rolloverOnWrite);
         }
         if (out.getTransportVersion().onOrAfter(DataStream.ADDED_AUTO_SHARDING_EVENT_VERSION)) {
-            out.writeOptionalWriteable(autoShardingEvent);
+            out.writeOptionalWriteable(backingIndices.autoShardingEvent);
+        }
+        if (out.getTransportVersion().onOrAfter(TransportVersions.FAILURE_STORE_FIELD_PARITY)) {
+            out.writeBoolean(failureIndices.rolloverOnWrite);
+            out.writeOptionalWriteable(failureIndices.autoShardingEvent);
         }
     }
 
@@ -1045,30 +1035,41 @@ public final class DataStream implements SimpleDiffable<DataStream>, ToXContentO
     public static final ParseField FAILURE_INDICES_FIELD = new ParseField("failure_indices");
     public static final ParseField ROLLOVER_ON_WRITE_FIELD = new ParseField("rollover_on_write");
     public static final ParseField AUTO_SHARDING_FIELD = new ParseField("auto_sharding");
+    public static final ParseField FAILURE_ROLLOVER_ON_WRITE_FIELD = new ParseField("failure_rollover_on_write");
+    public static final ParseField FAILURE_AUTO_SHARDING_FIELD = new ParseField("failure_auto_sharding");
 
     @SuppressWarnings("unchecked")
     private static final ConstructingObjectParser<DataStream, Void> PARSER = new ConstructingObjectParser<>("data_stream", args -> {
         // Fields behind a feature flag need to be parsed last otherwise the parser will fail when the feature flag is disabled.
         // Until the feature flag is removed we keep them separately to be mindful of this.
         boolean failureStoreEnabled = DataStream.isFailureStoreFeatureFlagEnabled() && args[12] != null && (boolean) args[12];
-        List<Index> failureStoreIndices = DataStream.isFailureStoreFeatureFlagEnabled() && args[13] != null
-            ? (List<Index>) args[13]
-            : List.of();
+        DataStreamIndices failureIndices = DataStream.isFailureStoreFeatureFlagEnabled()
+            ? new DataStreamIndices(
+                FAILURE_STORE_PREFIX,
+                args[13] != null ? (List<Index>) args[13] : List.of(),
+                args[14] != null && (boolean) args[14],
+                (DataStreamAutoShardingEvent) args[15]
+            )
+            : new DataStreamIndices(FAILURE_STORE_PREFIX, List.of(), false, null);
         return new DataStream(
             (String) args[0],
-            (List<Index>) args[1],
             (Long) args[2],
             (Map<String, Object>) args[3],
             args[4] != null && (boolean) args[4],
             args[5] != null && (boolean) args[5],
             args[6] != null && (boolean) args[6],
+            System::currentTimeMillis,
             args[7] != null && (boolean) args[7],
             args[8] != null ? IndexMode.fromString((String) args[8]) : null,
             (DataStreamLifecycle) args[9],
             failureStoreEnabled,
-            failureStoreIndices,
-            args[10] != null && (boolean) args[10],
-            (DataStreamAutoShardingEvent) args[11]
+            new DataStreamIndices(
+                BACKING_INDEX_PREFIX,
+                (List<Index>) args[1],
+                args[10] != null && (boolean) args[10],
+                (DataStreamAutoShardingEvent) args[11]
+            ),
+            failureIndices
         );
     });
 
@@ -1105,6 +1106,12 @@ public final class DataStream implements SimpleDiffable<DataStream>, ToXContentO
                 (p, c) -> Index.fromXContent(p),
                 FAILURE_INDICES_FIELD
             );
+            PARSER.declareBoolean(ConstructingObjectParser.optionalConstructorArg(), FAILURE_ROLLOVER_ON_WRITE_FIELD);
+            PARSER.declareObject(
+                ConstructingObjectParser.optionalConstructorArg(),
+                (p, c) -> DataStreamAutoShardingEvent.fromXContent(p),
+                FAILURE_AUTO_SHARDING_FIELD
+            );
         }
     }
 
@@ -1132,11 +1139,8 @@ public final class DataStream implements SimpleDiffable<DataStream>, ToXContentO
             .startObject()
             .field(NAME_FIELD.getPreferredName(), TIMESTAMP_FIELD_NAME)
             .endObject();
-        builder.xContentList(INDICES_FIELD.getPreferredName(), indices);
+        builder.xContentList(INDICES_FIELD.getPreferredName(), backingIndices.indices);
         builder.field(GENERATION_FIELD.getPreferredName(), generation);
-        if (DataStream.isFailureStoreFeatureFlagEnabled() && failureIndices.isEmpty() == false) {
-            builder.xContentList(FAILURE_INDICES_FIELD.getPreferredName(), failureIndices);
-        }
         if (metadata != null) {
             builder.field(METADATA_FIELD.getPreferredName(), metadata);
         }
@@ -1146,6 +1150,15 @@ public final class DataStream implements SimpleDiffable<DataStream>, ToXContentO
         builder.field(ALLOW_CUSTOM_ROUTING.getPreferredName(), allowCustomRouting);
         if (DataStream.isFailureStoreFeatureFlagEnabled()) {
             builder.field(FAILURE_STORE_FIELD.getPreferredName(), failureStoreEnabled);
+            if (failureIndices.indices.isEmpty() == false) {
+                builder.xContentList(FAILURE_INDICES_FIELD.getPreferredName(), failureIndices.indices);
+            }
+            builder.field(FAILURE_ROLLOVER_ON_WRITE_FIELD.getPreferredName(), failureIndices.rolloverOnWrite);
+            if (failureIndices.autoShardingEvent != null) {
+                builder.startObject(FAILURE_AUTO_SHARDING_FIELD.getPreferredName());
+                failureIndices.autoShardingEvent.toXContent(builder, params);
+                builder.endObject();
+            }
         }
         if (indexMode != null) {
             builder.field(INDEX_MODE.getPreferredName(), indexMode);
@@ -1154,10 +1167,10 @@ public final class DataStream implements SimpleDiffable<DataStream>, ToXContentO
             builder.field(LIFECYCLE.getPreferredName());
             lifecycle.toXContent(builder, params, rolloverConfiguration, isSystem() ? null : globalRetention);
         }
-        builder.field(ROLLOVER_ON_WRITE_FIELD.getPreferredName(), rolloverOnWrite);
-        if (autoShardingEvent != null) {
+        builder.field(ROLLOVER_ON_WRITE_FIELD.getPreferredName(), backingIndices.rolloverOnWrite);
+        if (backingIndices.autoShardingEvent != null) {
             builder.startObject(AUTO_SHARDING_FIELD.getPreferredName());
-            autoShardingEvent.toXContent(builder, params);
+            backingIndices.autoShardingEvent.toXContent(builder, params);
             builder.endObject();
         }
         builder.endObject();
@@ -1170,7 +1183,6 @@ public final class DataStream implements SimpleDiffable<DataStream>, ToXContentO
         if (o == null || getClass() != o.getClass()) return false;
         DataStream that = (DataStream) o;
         return name.equals(that.name)
-            && indices.equals(that.indices)
             && generation == that.generation
             && Objects.equals(metadata, that.metadata)
             && hidden == that.hidden
@@ -1180,16 +1192,14 @@ public final class DataStream implements SimpleDiffable<DataStream>, ToXContentO
             && indexMode == that.indexMode
             && Objects.equals(lifecycle, that.lifecycle)
             && failureStoreEnabled == that.failureStoreEnabled
-            && failureIndices.equals(that.failureIndices)
-            && rolloverOnWrite == that.rolloverOnWrite
-            && Objects.equals(autoShardingEvent, that.autoShardingEvent);
+            && Objects.equals(backingIndices, that.backingIndices)
+            && Objects.equals(failureIndices, that.failureIndices);
     }
 
     @Override
     public int hashCode() {
         return Objects.hash(
             name,
-            indices,
             generation,
             metadata,
             hidden,
@@ -1199,9 +1209,8 @@ public final class DataStream implements SimpleDiffable<DataStream>, ToXContentO
             indexMode,
             lifecycle,
             failureStoreEnabled,
-            failureIndices,
-            rolloverOnWrite,
-            autoShardingEvent
+            backingIndices,
+            failureIndices
         );
     }
 
@@ -1345,14 +1354,143 @@ public final class DataStream implements SimpleDiffable<DataStream>, ToXContentO
         return new Builder(name, indices);
     }
 
+    public static Builder builder(String name, DataStreamIndices backingIndices) {
+        return new Builder(name, backingIndices);
+    }
+
     public Builder copy() {
         return new Builder(this);
     }
 
+    public static class DataStreamIndices {
+        private final String namePrefix;
+        private final List<Index> indices;
+        private final boolean rolloverOnWrite;
+        @Nullable
+        private final DataStreamAutoShardingEvent autoShardingEvent;
+        private Set<String> lookup;
+
+        protected DataStreamIndices(
+            String namePrefix,
+            List<Index> indices,
+            boolean rolloverOnWrite,
+            DataStreamAutoShardingEvent autoShardingEvent
+        ) {
+            this.namePrefix = namePrefix;
+            // The list of indices is expected to be an immutable list. We don't create an immutable copy here, as it might have
+            // impact on the performance on some usages.
+            this.indices = indices;
+            this.rolloverOnWrite = rolloverOnWrite;
+            this.autoShardingEvent = autoShardingEvent;
+
+            assert getLookup().size() == indices.size() : "found duplicate index entries in " + indices;
+        }
+
+        private Set<String> getLookup() {
+            if (lookup == null) {
+                lookup = indices.stream().map(Index::getName).collect(Collectors.toSet());
+            }
+            return lookup;
+        }
+
+        public Index getWriteIndex() {
+            return indices.get(indices.size() - 1);
+        }
+
+        public boolean containsIndex(String index) {
+            return getLookup().contains(index);
+        }
+
+        private String generateName(String dataStreamName, long generation, long epochMillis) {
+            return getDefaultIndexName(namePrefix, dataStreamName, generation, epochMillis);
+        }
+
+        public static Builder backingIndicesBuilder(List<Index> indices) {
+            return new Builder(BACKING_INDEX_PREFIX, indices);
+        }
+
+        public static Builder failureIndicesBuilder(List<Index> indices) {
+            return new Builder(FAILURE_STORE_PREFIX, indices);
+        }
+
+        public Builder copy() {
+            return new Builder(this);
+        }
+
+        public List<Index> getIndices() {
+            return indices;
+        }
+
+        public boolean isRolloverOnWrite() {
+            return rolloverOnWrite;
+        }
+
+        public DataStreamAutoShardingEvent getAutoShardingEvent() {
+            return autoShardingEvent;
+        }
+
+        @Override
+        public boolean equals(Object o) {
+            if (this == o) return true;
+            if (o == null || getClass() != o.getClass()) return false;
+            DataStreamIndices that = (DataStreamIndices) o;
+            return rolloverOnWrite == that.rolloverOnWrite
+                && Objects.equals(namePrefix, that.namePrefix)
+                && Objects.equals(indices, that.indices)
+                && Objects.equals(autoShardingEvent, that.autoShardingEvent);
+        }
+
+        @Override
+        public int hashCode() {
+            return Objects.hash(namePrefix, indices, rolloverOnWrite, autoShardingEvent);
+        }
+
+        public static class Builder {
+            private final String namePrefix;
+            private List<Index> indices;
+            private boolean rolloverOnWrite = false;
+            @Nullable
+            private DataStreamAutoShardingEvent autoShardingEvent = null;
+
+            private Builder(String namePrefix, List<Index> indices) {
+                this.namePrefix = namePrefix;
+                this.indices = indices;
+            }
+
+            private Builder(DataStreamIndices dataStreamIndices) {
+                this.namePrefix = dataStreamIndices.namePrefix;
+                this.indices = dataStreamIndices.indices;
+                this.rolloverOnWrite = dataStreamIndices.rolloverOnWrite;
+                this.autoShardingEvent = dataStreamIndices.autoShardingEvent;
+            }
+
+            /**
+             * Set the list of indices. We always create an immutable copy as that's what the constructor expects.
+             */
+            public Builder setIndices(List<Index> indices) {
+                this.indices = List.copyOf(indices);
+                return this;
+            }
+
+            public Builder setRolloverOnWrite(boolean rolloverOnWrite) {
+                this.rolloverOnWrite = rolloverOnWrite;
+                return this;
+            }
+
+            public Builder setAutoShardingEvent(DataStreamAutoShardingEvent autoShardingEvent) {
+                this.autoShardingEvent = autoShardingEvent;
+                return this;
+            }
+
+            public DataStreamIndices build() {
+                return new DataStreamIndices(namePrefix, indices, rolloverOnWrite, autoShardingEvent);
+            }
+        }
+    }
+
     public static class Builder {
         private LongSupplier timeProvider = System::currentTimeMillis;
         private String name;
-        private List<Index> indices;
         private long generation = 1;
         @Nullable
         private Map<String, Object> metadata = null;
@@ -1364,22 +1502,23 @@ public final class DataStream implements SimpleDiffable<DataStream>, ToXContentO
         private IndexMode indexMode = null;
         @Nullable
         private DataStreamLifecycle lifecycle = null;
-        private boolean rolloverOnWrite = false;
         private boolean failureStoreEnabled = false;
-        private List<Index> failureIndices = List.of();
-        @Nullable
-        private DataStreamAutoShardingEvent autoShardingEvent = null;
+        private DataStreamIndices backingIndices;
+        private DataStreamIndices failureIndices = DataStreamIndices.failureIndicesBuilder(List.of()).build();
 
-        public Builder(String name, List<Index> indices) {
+        private Builder(String name, List<Index> indices) {
+            this(name, DataStreamIndices.backingIndicesBuilder(indices).build());
+        }
+
+        private Builder(String name, DataStreamIndices backingIndices) {
             this.name = name;
-            assert indices.isEmpty() == false : "Cannot create data stream with empty backing indices";
-            this.indices = indices;
+            assert backingIndices.indices.isEmpty() == false : "Cannot create data stream with empty backing indices";
+            this.backingIndices = backingIndices;
         }
 
-        public Builder(DataStream dataStream) {
+        private Builder(DataStream dataStream) {
             timeProvider = dataStream.timeProvider;
             name = dataStream.name;
-            indices = dataStream.indices;
             generation = dataStream.generation;
             metadata = dataStream.metadata;
             hidden = dataStream.hidden;
@@ -1388,10 +1527,9 @@ public final class DataStream implements SimpleDiffable<DataStream>, ToXContentO
             allowCustomRouting = dataStream.allowCustomRouting;
             indexMode = dataStream.indexMode;
             lifecycle = dataStream.lifecycle;
-            rolloverOnWrite = dataStream.rolloverOnWrite;
             failureStoreEnabled = dataStream.failureStoreEnabled;
+            backingIndices = dataStream.backingIndices;
             failureIndices = dataStream.failureIndices;
-            autoShardingEvent = dataStream.autoShardingEvent;
         }
 
         public Builder setTimeProvider(LongSupplier timeProvider) {
@@ -1404,12 +1542,6 @@ public final class DataStream implements SimpleDiffable<DataStream>, ToXContentO
             return this;
         }
 
-        public Builder setIndices(List<Index> indices) {
-            assert indices.isEmpty() == false : "Cannot create data stream with empty backing indices";
-            this.indices = indices;
-            return this;
-        }
-
         public Builder setGeneration(long generation) {
             this.generation = generation;
             return this;
@@ -1450,30 +1582,34 @@ public final class DataStream implements SimpleDiffable<DataStream>, ToXContentO
             return this;
         }
 
-        public Builder setRolloverOnWrite(boolean rolloverOnWrite) {
-            this.rolloverOnWrite = rolloverOnWrite;
+        public Builder setFailureStoreEnabled(boolean failureStoreEnabled) {
+            this.failureStoreEnabled = failureStoreEnabled;
             return this;
         }
 
-        public Builder setFailureStoreEnabled(boolean failureStoreEnabled) {
-            this.failureStoreEnabled = failureStoreEnabled;
+        public Builder setBackingIndices(DataStreamIndices backingIndices) {
+            assert backingIndices.indices.isEmpty() == false : "Cannot create data stream with empty backing indices";
+            this.backingIndices = backingIndices;
             return this;
         }
 
-        public Builder setFailureIndices(List<Index> failureIndices) {
+        public Builder setFailureIndices(DataStreamIndices failureIndices) {
             this.failureIndices = failureIndices;
             return this;
         }
 
-        public Builder setAutoShardingEvent(DataStreamAutoShardingEvent autoShardingEvent) {
-            this.autoShardingEvent = autoShardingEvent;
+        public Builder setDataStreamIndices(boolean targetFailureStore, DataStreamIndices indices) {
+            if (targetFailureStore) {
+                setFailureIndices(indices);
+            } else {
+                setBackingIndices(indices);
+            }
             return this;
         }
 
         public DataStream build() {
             return new DataStream(
                 name,
-                indices,
                 generation,
                 metadata,
                 hidden,
@@ -1484,9 +1620,8 @@ public final class DataStream implements SimpleDiffable<DataStream>, ToXContentO
                 indexMode,
                 lifecycle,
                 failureStoreEnabled,
-                failureIndices,
-                rolloverOnWrite,
-                autoShardingEvent
+                backingIndices,
+                failureIndices
             );
         }
     }

+ 2 - 2
server/src/main/java/org/elasticsearch/cluster/metadata/DataStreamMetadata.java

@@ -89,7 +89,7 @@ public class DataStreamMetadata implements Metadata.Custom {
 
     public DataStreamMetadata(StreamInput in) throws IOException {
         this(
-            in.readImmutableOpenMap(StreamInput::readString, DataStream::new),
+            in.readImmutableOpenMap(StreamInput::readString, DataStream::read),
             in.readImmutableOpenMap(StreamInput::readString, DataStreamAlias::new)
         );
     }
@@ -265,7 +265,7 @@ public class DataStreamMetadata implements Metadata.Custom {
     static class DataStreamMetadataDiff implements NamedDiff<Metadata.Custom> {
 
         private static final DiffableUtils.DiffableValueReader<String, DataStream> DS_DIFF_READER = new DiffableUtils.DiffableValueReader<>(
-            DataStream::new,
+            DataStream::read,
             DataStream::readDiffFrom
         );
 

+ 3 - 3
server/src/main/java/org/elasticsearch/cluster/metadata/IndexNameExpressionResolver.java

@@ -425,7 +425,7 @@ public class IndexNameExpressionResolver {
         if (shouldIncludeFailureIndices(context.getOptions(), dataStream)) {
             // We short-circuit here, if failure indices are not allowed and they can be skipped
             if (context.getOptions().allowFailureIndices() || context.getOptions().ignoreUnavailable() == false) {
-                for (Index index : dataStream.getFailureIndices()) {
+                for (Index index : dataStream.getFailureIndices().getIndices()) {
                     if (shouldTrackConcreteIndex(context, context.getOptions(), index)) {
                         concreteIndicesResult.add(index);
                     }
@@ -470,7 +470,7 @@ public class IndexNameExpressionResolver {
                 count += dataStream.getIndices().size();
             }
             if (shouldIncludeFailureIndices(context.getOptions(), dataStream)) {
-                count += dataStream.getFailureIndices().size();
+                count += dataStream.getFailureIndices().getIndices().size();
             }
             return count > 1;
         }
@@ -1431,7 +1431,7 @@ public class IndexNameExpressionResolver {
                         DataStream dataStream = (DataStream) indexAbstraction;
                         indicesStateStream = Stream.concat(
                             indicesStateStream,
-                            dataStream.getFailureIndices().stream().map(context.state.metadata()::index)
+                            dataStream.getFailureIndices().getIndices().stream().map(context.state.metadata()::index)
                         );
                     }
                     if (excludeState != null) {

+ 5 - 2
server/src/main/java/org/elasticsearch/cluster/metadata/Metadata.java

@@ -2600,7 +2600,10 @@ public class Metadata implements Iterable<IndexMetadata>, Diffable<Metadata>, Ch
                 || parent.getIndices().stream().anyMatch(index -> indexMetadata.getIndex().getName().equals(index.getName()))
                 || (DataStream.isFailureStoreFeatureFlagEnabled()
                     && parent.isFailureStoreEnabled()
-                    && parent.getFailureIndices().stream().anyMatch(index -> indexMetadata.getIndex().getName().equals(index.getName())))
+                    && parent.getFailureIndices()
+                        .getIndices()
+                        .stream()
+                        .anyMatch(index -> indexMetadata.getIndex().getName().equals(index.getName())))
                 : "Expected data stream [" + parent.getName() + "] to contain index " + indexMetadata.getIndex();
             return true;
         }
@@ -2623,7 +2626,7 @@ public class Metadata implements Iterable<IndexMetadata>, Diffable<Metadata>, Ch
                     indexToDataStreamLookup.put(i.getName(), dataStream);
                 }
                 if (DataStream.isFailureStoreFeatureFlagEnabled() && dataStream.isFailureStoreEnabled()) {
-                    for (Index i : dataStream.getFailureIndices()) {
+                    for (Index i : dataStream.getFailureIndices().getIndices()) {
                         indexToDataStreamLookup.put(i.getName(), dataStream);
                     }
                 }

+ 2 - 2
server/src/main/java/org/elasticsearch/cluster/metadata/MetadataCreateDataStreamService.java

@@ -111,8 +111,8 @@ public class MetadataCreateDataStreamService {
                     );
                     DataStream createdDataStream = clusterState.metadata().dataStreams().get(request.name);
                     firstBackingIndexRef.set(createdDataStream.getIndices().get(0).getName());
-                    if (createdDataStream.getFailureIndices().isEmpty() == false) {
-                        firstFailureStoreRef.set(createdDataStream.getFailureIndices().get(0).getName());
+                    if (createdDataStream.getFailureIndices().getIndices().isEmpty() == false) {
+                        firstFailureStoreRef.set(createdDataStream.getFailureIndices().getIndices().get(0).getName());
                     }
                     return clusterState;
                 }

+ 28 - 6
server/src/main/java/org/elasticsearch/cluster/metadata/MetadataDataStreamsService.java

@@ -77,7 +77,12 @@ public class MetadataDataStreamsService {
                 ClusterState clusterState
             ) {
                 return new Tuple<>(
-                    setRolloverOnWrite(clusterState, setRolloverOnWriteTask.getDataStreamName(), setRolloverOnWriteTask.rolloverOnWrite()),
+                    setRolloverOnWrite(
+                        clusterState,
+                        setRolloverOnWriteTask.getDataStreamName(),
+                        setRolloverOnWriteTask.rolloverOnWrite(),
+                        setRolloverOnWriteTask.targetFailureStore()
+                    ),
                     setRolloverOnWriteTask
                 );
             }
@@ -152,13 +157,14 @@ public class MetadataDataStreamsService {
     public void setRolloverOnWrite(
         String dataStreamName,
         boolean rolloverOnWrite,
+        boolean targetFailureStore,
         TimeValue ackTimeout,
         TimeValue masterTimeout,
         ActionListener<AcknowledgedResponse> listener
     ) {
         setRolloverOnWriteTaskQueue.submitTask(
             "set-rollover-on-write",
-            new SetRolloverOnWriteTask(dataStreamName, rolloverOnWrite, ackTimeout, listener),
+            new SetRolloverOnWriteTask(dataStreamName, rolloverOnWrite, targetFailureStore, ackTimeout, listener),
             masterTimeout
         );
     }
@@ -230,16 +236,25 @@ public class MetadataDataStreamsService {
      * @param currentState the initial cluster state
      * @param dataStreamName the name of the data stream to be updated
      * @param rolloverOnWrite the value of the flag
+     * @param targetFailureStore whether this rollover targets the failure store or the backing indices
      * @return the updated cluster state
      */
-    public static ClusterState setRolloverOnWrite(ClusterState currentState, String dataStreamName, boolean rolloverOnWrite) {
+    public static ClusterState setRolloverOnWrite(
+        ClusterState currentState,
+        String dataStreamName,
+        boolean rolloverOnWrite,
+        boolean targetFailureStore
+    ) {
         Metadata metadata = currentState.metadata();
         var dataStream = validateDataStream(metadata, dataStreamName);
-        if (dataStream.rolloverOnWrite() == rolloverOnWrite) {
+        var indices = dataStream.getDataStreamIndices(targetFailureStore);
+        if (indices.isRolloverOnWrite() == rolloverOnWrite) {
             return currentState;
         }
         Metadata.Builder builder = Metadata.builder(metadata);
-        builder.put(dataStream.copy().setRolloverOnWrite(rolloverOnWrite).build());
+        builder.put(
+            dataStream.copy().setDataStreamIndices(targetFailureStore, indices.copy().setRolloverOnWrite(rolloverOnWrite).build()).build()
+        );
         return ClusterState.builder(currentState).metadata(builder.build()).build();
     }
 
@@ -286,7 +301,7 @@ public class MetadataDataStreamsService {
     ) {
         boolean indexNotRemoved = true;
         DataStream dataStream = validateDataStream(metadata, dataStreamName);
-        List<Index> targetIndices = failureStore ? dataStream.getFailureIndices() : dataStream.getIndices();
+        List<Index> targetIndices = failureStore ? dataStream.getFailureIndices().getIndices() : dataStream.getIndices();
         for (Index backingIndex : targetIndices) {
             if (backingIndex.getName().equals(indexName)) {
                 if (failureStore) {
@@ -365,16 +380,19 @@ public class MetadataDataStreamsService {
 
         private final String dataStreamName;
         private final boolean rolloverOnWrite;
+        private final boolean targetFailureStore;
 
         SetRolloverOnWriteTask(
             String dataStreamName,
             boolean rolloverOnWrite,
+            boolean targetFailureStore,
             TimeValue ackTimeout,
             ActionListener<AcknowledgedResponse> listener
         ) {
             super(ackTimeout, listener);
             this.dataStreamName = dataStreamName;
             this.rolloverOnWrite = rolloverOnWrite;
+            this.targetFailureStore = targetFailureStore;
         }
 
         public String getDataStreamName() {
@@ -384,5 +402,9 @@ public class MetadataDataStreamsService {
         public boolean rolloverOnWrite() {
             return rolloverOnWrite;
         }
+
+        public boolean targetFailureStore() {
+            return targetFailureStore;
+        }
     }
 }

+ 4 - 1
server/src/main/java/org/elasticsearch/snapshots/RestoreService.java

@@ -704,7 +704,10 @@ public final class RestoreService implements ClusterStateApplier {
             .stream()
             .map(i -> metadata.get(renameIndex(i.getName(), request, true)).getIndex())
             .toList();
-        return dataStream.copy().setName(dataStreamName).setIndices(updatedIndices).build();
+        return dataStream.copy()
+            .setName(dataStreamName)
+            .setBackingIndices(dataStream.getBackingIndices().copy().setIndices(updatedIndices).build())
+            .build();
     }
 
     public static RestoreInProgress updateRestoreStateWithDeletedIndices(RestoreInProgress oldRestore, Set<Index> deletedIndices) {

+ 10 - 5
server/src/test/java/org/elasticsearch/action/admin/indices/rollover/MetadataRolloverServiceTests.java

@@ -682,7 +682,9 @@ public class MetadataRolloverServiceTests extends ESTestCase {
         Metadata.Builder builder = Metadata.builder();
         builder.put("template", template);
         dataStream.getIndices().forEach(index -> builder.put(DataStreamTestHelper.getIndexMetadataBuilderForIndex(index)));
-        dataStream.getFailureIndices().forEach(index -> builder.put(DataStreamTestHelper.getIndexMetadataBuilderForIndex(index)));
+        dataStream.getFailureIndices()
+            .getIndices()
+            .forEach(index -> builder.put(DataStreamTestHelper.getIndexMetadataBuilderForIndex(index)));
         builder.put(dataStream);
         final ClusterState clusterState = ClusterState.builder(new ClusterName("test")).metadata(builder).build();
         final TestTelemetryPlugin telemetryPlugin = new TestTelemetryPlugin();
@@ -723,15 +725,18 @@ public class MetadataRolloverServiceTests extends ESTestCase {
             assertEquals(sourceIndexName, rolloverResult.sourceIndexName());
             assertEquals(newIndexName, rolloverResult.rolloverIndexName());
             Metadata rolloverMetadata = rolloverResult.clusterState().metadata();
-            assertEquals(dataStream.getIndices().size() + dataStream.getFailureIndices().size() + 1, rolloverMetadata.indices().size());
+            assertEquals(
+                dataStream.getIndices().size() + dataStream.getFailureIndices().getIndices().size() + 1,
+                rolloverMetadata.indices().size()
+            );
             IndexMetadata rolloverIndexMetadata = rolloverMetadata.index(newIndexName);
 
             var ds = (DataStream) rolloverMetadata.getIndicesLookup().get(dataStream.getName());
             assertThat(ds.getType(), equalTo(IndexAbstraction.Type.DATA_STREAM));
             assertThat(ds.getIndices(), hasSize(dataStream.getIndices().size()));
-            assertThat(ds.getFailureIndices(), hasSize(dataStream.getFailureIndices().size() + 1));
-            assertThat(ds.getFailureIndices(), hasItem(rolloverMetadata.index(sourceIndexName).getIndex()));
-            assertThat(ds.getFailureIndices(), hasItem(rolloverIndexMetadata.getIndex()));
+            assertThat(ds.getFailureIndices().getIndices(), hasSize(dataStream.getFailureIndices().getIndices().size() + 1));
+            assertThat(ds.getFailureIndices().getIndices(), hasItem(rolloverMetadata.index(sourceIndexName).getIndex()));
+            assertThat(ds.getFailureIndices().getIndices(), hasItem(rolloverIndexMetadata.getIndex()));
             assertThat(ds.getFailureStoreWriteIndex(), equalTo(rolloverIndexMetadata.getIndex()));
 
             RolloverInfo info = rolloverMetadata.index(sourceIndexName).getRolloverInfos().get(dataStream.getName());

+ 4 - 3
server/src/test/java/org/elasticsearch/action/admin/indices/rollover/TransportRolloverActionTests.java

@@ -440,12 +440,13 @@ public class TransportRolloverActionTests extends ESTestCase {
 
         doAnswer(invocation -> {
             Object[] args = invocation.getArguments();
-            assert args.length == 5;
+            assert args.length == 6;
             @SuppressWarnings("unchecked")
-            ActionListener<AcknowledgedResponse> listener = (ActionListener<AcknowledgedResponse>) args[4];
+            ActionListener<AcknowledgedResponse> listener = (ActionListener<AcknowledgedResponse>) args[5];
             listener.onResponse(AcknowledgedResponse.TRUE);
             return null;
-        }).when(mockMetadataDataStreamService).setRolloverOnWrite(eq(dataStream.getName()), eq(true), any(), any(), anyActionListener());
+        }).when(mockMetadataDataStreamService)
+            .setRolloverOnWrite(eq(dataStream.getName()), eq(true), eq(false), any(), any(), anyActionListener());
 
         final TransportRolloverAction transportRolloverAction = new TransportRolloverAction(
             mock(TransportService.class),

+ 4 - 4
server/src/test/java/org/elasticsearch/action/datastreams/autosharding/DataStreamAutoShardingServiceTests.java

@@ -768,10 +768,10 @@ public class DataStreamAutoShardingServiceTests extends ESTestCase {
             builder.put(indexMetadata, false);
             backingIndices.add(indexMetadata.getIndex());
         }
-        return DataStream.builder(dataStreamName, backingIndices)
-            .setGeneration(backingIndicesCount)
-            .setAutoShardingEvent(autoShardingEvent)
-            .build();
+        return DataStream.builder(
+            dataStreamName,
+            DataStream.DataStreamIndices.backingIndicesBuilder(backingIndices).setAutoShardingEvent(autoShardingEvent).build()
+        ).setGeneration(backingIndicesCount).build();
     }
 
     private IndexMetadata createIndexMetadata(

+ 87 - 55
server/src/test/java/org/elasticsearch/cluster/metadata/DataStreamTests.java

@@ -73,7 +73,7 @@ public class DataStreamTests extends AbstractXContentSerializingTestCase<DataStr
 
     @Override
     protected Writeable.Reader<DataStream> instanceReader() {
-        return DataStream::new;
+        return DataStream::read;
     }
 
     @Override
@@ -94,10 +94,12 @@ public class DataStreamTests extends AbstractXContentSerializingTestCase<DataStr
         var indexMode = instance.getIndexMode();
         var lifecycle = instance.getLifecycle();
         var failureStore = instance.isFailureStoreEnabled();
-        var failureIndices = instance.getFailureIndices();
+        var failureIndices = instance.getFailureIndices().getIndices();
         var rolloverOnWrite = instance.rolloverOnWrite();
         var autoShardingEvent = instance.getAutoShardingEvent();
-        switch (between(0, 12)) {
+        var failureRolloverOnWrite = instance.getFailureIndices().isRolloverOnWrite();
+        var failureAutoShardingEvent = instance.getBackingIndices().getAutoShardingEvent();
+        switch (between(0, 14)) {
             case 0 -> name = randomAlphaOfLength(10);
             case 1 -> indices = randomNonEmptyIndexInstances();
             case 2 -> generation = instance.getGeneration() + randomIntBetween(1, 10);
@@ -114,6 +116,7 @@ public class DataStreamTests extends AbstractXContentSerializingTestCase<DataStr
                 isReplicated = isReplicated == false;
                 // Replicated data streams cannot be marked for lazy rollover.
                 rolloverOnWrite = isReplicated == false && rolloverOnWrite;
+                failureRolloverOnWrite = isReplicated == false && failureRolloverOnWrite;
             }
             case 6 -> {
                 if (isSystem == false) {
@@ -139,7 +142,27 @@ public class DataStreamTests extends AbstractXContentSerializingTestCase<DataStr
                 isReplicated = rolloverOnWrite == false && isReplicated;
             }
             case 12 -> {
-                autoShardingEvent = randomBoolean() && autoShardingEvent != null
+                if (randomBoolean() || autoShardingEvent == null) {
+                    // If we're mutating the auto sharding event of the failure store, we need to ensure there's at least one failure index.
+                    if (failureIndices.isEmpty()) {
+                        failureIndices = DataStreamTestHelper.randomIndexInstances();
+                        failureStore = true;
+                    }
+                    autoShardingEvent = new DataStreamAutoShardingEvent(
+                        failureIndices.get(failureIndices.size() - 1).getName(),
+                        randomIntBetween(1, 10),
+                        randomMillisUpToYear9999()
+                    );
+                } else {
+                    autoShardingEvent = null;
+                }
+            }
+            case 13 -> {
+                failureRolloverOnWrite = failureRolloverOnWrite == false;
+                isReplicated = failureRolloverOnWrite == false && isReplicated;
+            }
+            case 14 -> {
+                failureAutoShardingEvent = randomBoolean() && failureAutoShardingEvent != null
                     ? null
                     : new DataStreamAutoShardingEvent(
                         indices.get(indices.size() - 1).getName(),
@@ -151,25 +174,29 @@ public class DataStreamTests extends AbstractXContentSerializingTestCase<DataStr
 
         return new DataStream(
             name,
-            indices,
             generation,
             metadata,
             isHidden,
             isReplicated,
             isSystem,
+            System::currentTimeMillis,
             allowsCustomRouting,
             indexMode,
             lifecycle,
             failureStore,
-            failureIndices,
-            rolloverOnWrite,
-            autoShardingEvent
+            new DataStream.DataStreamIndices(DataStream.BACKING_INDEX_PREFIX, indices, rolloverOnWrite, autoShardingEvent),
+            new DataStream.DataStreamIndices(
+                DataStream.BACKING_INDEX_PREFIX,
+                failureIndices,
+                failureRolloverOnWrite,
+                failureAutoShardingEvent
+            )
         );
     }
 
     public void testRollover() {
         DataStream ds = DataStreamTestHelper.randomInstance().promoteDataStream();
-        Tuple<String, Long> newCoordinates = ds.nextWriteIndexAndGeneration(Metadata.EMPTY_METADATA);
+        Tuple<String, Long> newCoordinates = ds.nextWriteIndexAndGeneration(Metadata.EMPTY_METADATA, ds.getBackingIndices());
         final DataStream rolledDs = ds.rollover(new Index(newCoordinates.v1(), UUIDs.randomBase64UUID()), newCoordinates.v2(), false, null);
         assertThat(rolledDs.getName(), equalTo(ds.getName()));
         assertThat(rolledDs.getGeneration(), equalTo(ds.getGeneration() + 1));
@@ -196,7 +223,7 @@ public class DataStreamTests extends AbstractXContentSerializingTestCase<DataStr
             builder.put(im, false);
         }
 
-        final Tuple<String, Long> newCoordinates = ds.nextWriteIndexAndGeneration(builder.build());
+        final Tuple<String, Long> newCoordinates = ds.nextWriteIndexAndGeneration(builder.build(), ds.getBackingIndices());
         final DataStream rolledDs = ds.rollover(new Index(newCoordinates.v1(), UUIDs.randomBase64UUID()), newCoordinates.v2(), false, null);
         assertThat(rolledDs.getName(), equalTo(ds.getName()));
         assertThat(rolledDs.getGeneration(), equalTo(ds.getGeneration() + numConflictingIndices + 1));
@@ -212,7 +239,7 @@ public class DataStreamTests extends AbstractXContentSerializingTestCase<DataStr
             .setReplicated(false)
             .setIndexMode(randomBoolean() ? IndexMode.STANDARD : null)
             .build();
-        var newCoordinates = ds.nextWriteIndexAndGeneration(Metadata.EMPTY_METADATA);
+        var newCoordinates = ds.nextWriteIndexAndGeneration(Metadata.EMPTY_METADATA, ds.getBackingIndices());
 
         var rolledDs = ds.rollover(new Index(newCoordinates.v1(), UUIDs.randomBase64UUID()), newCoordinates.v2(), true, null);
         assertThat(rolledDs.getName(), equalTo(ds.getName()));
@@ -225,7 +252,7 @@ public class DataStreamTests extends AbstractXContentSerializingTestCase<DataStr
 
     public void testRolloverDowngradeToRegularDataStream() {
         DataStream ds = DataStreamTestHelper.randomInstance().copy().setReplicated(false).setIndexMode(IndexMode.TIME_SERIES).build();
-        var newCoordinates = ds.nextWriteIndexAndGeneration(Metadata.EMPTY_METADATA);
+        var newCoordinates = ds.nextWriteIndexAndGeneration(Metadata.EMPTY_METADATA, ds.getBackingIndices());
 
         var rolledDs = ds.rollover(new Index(newCoordinates.v1(), UUIDs.randomBase64UUID()), newCoordinates.v2(), false, null);
         assertThat(rolledDs.getName(), equalTo(ds.getName()));
@@ -238,18 +265,18 @@ public class DataStreamTests extends AbstractXContentSerializingTestCase<DataStr
 
     public void testRolloverFailureStore() {
         DataStream ds = DataStreamTestHelper.randomInstance(true).promoteDataStream();
-        Tuple<String, Long> newCoordinates = ds.nextFailureStoreWriteIndexAndGeneration(Metadata.EMPTY_METADATA);
+        Tuple<String, Long> newCoordinates = ds.nextWriteIndexAndGeneration(Metadata.EMPTY_METADATA, ds.getFailureIndices());
         final DataStream rolledDs = ds.rolloverFailureStore(new Index(newCoordinates.v1(), UUIDs.randomBase64UUID()), newCoordinates.v2());
         assertThat(rolledDs.getName(), equalTo(ds.getName()));
         assertThat(rolledDs.getGeneration(), equalTo(ds.getGeneration() + 1));
         assertThat(rolledDs.getIndices().size(), equalTo(ds.getIndices().size()));
         // Ensure that the rolloverOnWrite flag hasn't changed when rolling over a failure store.
         assertThat(rolledDs.rolloverOnWrite(), equalTo(ds.rolloverOnWrite()));
-        assertThat(rolledDs.getFailureIndices().size(), equalTo(ds.getFailureIndices().size() + 1));
+        assertThat(rolledDs.getFailureIndices().getIndices().size(), equalTo(ds.getFailureIndices().getIndices().size() + 1));
         assertTrue(rolledDs.getIndices().containsAll(ds.getIndices()));
         assertTrue(rolledDs.getIndices().contains(rolledDs.getWriteIndex()));
-        assertTrue(rolledDs.getFailureIndices().containsAll(ds.getFailureIndices()));
-        assertTrue(rolledDs.getFailureIndices().contains(rolledDs.getFailureStoreWriteIndex()));
+        assertTrue(rolledDs.getFailureIndices().getIndices().containsAll(ds.getFailureIndices().getIndices()));
+        assertTrue(rolledDs.getFailureIndices().getIndices().contains(rolledDs.getFailureStoreWriteIndex()));
     }
 
     public void testRemoveBackingIndex() {
@@ -298,15 +325,18 @@ public class DataStreamTests extends AbstractXContentSerializingTestCase<DataStr
 
     public void testRemoveFailureStoreIndex() {
         DataStream original = createRandomDataStream();
-        int indexToRemove = randomIntBetween(1, original.getFailureIndices().size() - 1);
+        int indexToRemove = randomIntBetween(1, original.getFailureIndices().getIndices().size() - 1);
 
-        DataStream updated = original.removeFailureStoreIndex(original.getFailureIndices().get(indexToRemove - 1));
+        DataStream updated = original.removeFailureStoreIndex(original.getFailureIndices().getIndices().get(indexToRemove - 1));
         assertThat(updated.getName(), equalTo(original.getName()));
         assertThat(updated.getGeneration(), equalTo(original.getGeneration() + 1));
         assertThat(updated.getIndices().size(), equalTo(original.getIndices().size()));
-        assertThat(updated.getFailureIndices().size(), equalTo(original.getFailureIndices().size() - 1));
-        for (int k = 0; k < (original.getFailureIndices().size() - 1); k++) {
-            assertThat(updated.getFailureIndices().get(k), equalTo(original.getFailureIndices().get(k < (indexToRemove - 1) ? k : k + 1)));
+        assertThat(updated.getFailureIndices().getIndices().size(), equalTo(original.getFailureIndices().getIndices().size() - 1));
+        for (int k = 0; k < (original.getFailureIndices().getIndices().size() - 1); k++) {
+            assertThat(
+                updated.getFailureIndices().getIndices().get(k),
+                equalTo(original.getFailureIndices().getIndices().get(k < (indexToRemove - 1) ? k : k + 1))
+            );
         }
     }
 
@@ -326,7 +356,9 @@ public class DataStreamTests extends AbstractXContentSerializingTestCase<DataStr
 
         IllegalArgumentException e = expectThrows(
             IllegalArgumentException.class,
-            () -> original.removeFailureStoreIndex(original.getFailureIndices().get(original.getFailureIndices().size() - 1))
+            () -> original.removeFailureStoreIndex(
+                original.getFailureIndices().getIndices().get(original.getFailureIndices().getIndices().size() - 1)
+            )
         );
         assertThat(
             e.getMessage(),
@@ -334,7 +366,7 @@ public class DataStreamTests extends AbstractXContentSerializingTestCase<DataStr
                 String.format(
                     Locale.ROOT,
                     "cannot remove backing index [%s] of data stream [%s] because it is the write index",
-                    original.getFailureIndices().get(original.getFailureIndices().size() - 1).getName(),
+                    original.getFailureIndices().getIndices().get(original.getFailureIndices().getIndices().size() - 1).getName(),
                     original.getName()
                 )
             )
@@ -379,9 +411,9 @@ public class DataStreamTests extends AbstractXContentSerializingTestCase<DataStr
         builder.put(ds2);
 
         createMetadataForIndices(builder, ds1.getIndices());
-        createMetadataForIndices(builder, ds1.getFailureIndices());
+        createMetadataForIndices(builder, ds1.getFailureIndices().getIndices());
         createMetadataForIndices(builder, ds2.getIndices());
-        createMetadataForIndices(builder, ds2.getFailureIndices());
+        createMetadataForIndices(builder, ds2.getFailureIndices().getIndices());
 
         Index indexToAdd = randomFrom(ds2.getIndices().toArray(Index.EMPTY_ARRAY));
 
@@ -409,11 +441,11 @@ public class DataStreamTests extends AbstractXContentSerializingTestCase<DataStr
         builder.put(ds2);
 
         createMetadataForIndices(builder, ds1.getIndices());
-        createMetadataForIndices(builder, ds1.getFailureIndices());
+        createMetadataForIndices(builder, ds1.getFailureIndices().getIndices());
         createMetadataForIndices(builder, ds2.getIndices());
-        createMetadataForIndices(builder, ds2.getFailureIndices());
+        createMetadataForIndices(builder, ds2.getFailureIndices().getIndices());
 
-        Index indexToAdd = randomFrom(ds2.getFailureIndices().toArray(Index.EMPTY_ARRAY));
+        Index indexToAdd = randomFrom(ds2.getFailureIndices().getIndices().toArray(Index.EMPTY_ARRAY));
 
         IllegalArgumentException e = expectThrows(IllegalArgumentException.class, () -> ds1.addBackingIndex(builder.build(), indexToAdd));
         assertThat(
@@ -498,7 +530,7 @@ public class DataStreamTests extends AbstractXContentSerializingTestCase<DataStr
         builder.put(original);
 
         createMetadataForIndices(builder, original.getIndices());
-        createMetadataForIndices(builder, original.getFailureIndices());
+        createMetadataForIndices(builder, original.getFailureIndices().getIndices());
 
         Index indexToAdd = new Index(randomAlphaOfLength(4), UUIDs.randomBase64UUID(random()));
         builder.put(
@@ -514,11 +546,11 @@ public class DataStreamTests extends AbstractXContentSerializingTestCase<DataStr
         assertThat(updated.getName(), equalTo(original.getName()));
         assertThat(updated.getGeneration(), equalTo(original.getGeneration() + 1));
         assertThat(updated.getIndices().size(), equalTo(original.getIndices().size()));
-        assertThat(updated.getFailureIndices().size(), equalTo(original.getFailureIndices().size() + 1));
-        for (int k = 1; k <= original.getFailureIndices().size(); k++) {
-            assertThat(updated.getFailureIndices().get(k), equalTo(original.getFailureIndices().get(k - 1)));
+        assertThat(updated.getFailureIndices().getIndices().size(), equalTo(original.getFailureIndices().getIndices().size() + 1));
+        for (int k = 1; k <= original.getFailureIndices().getIndices().size(); k++) {
+            assertThat(updated.getFailureIndices().getIndices().get(k), equalTo(original.getFailureIndices().getIndices().get(k - 1)));
         }
-        assertThat(updated.getFailureIndices().get(0), equalTo(indexToAdd));
+        assertThat(updated.getFailureIndices().getIndices().get(0), equalTo(indexToAdd));
     }
 
     public void testAddFailureStoreIndexThatIsPartOfAnotherDataStream() {
@@ -530,11 +562,11 @@ public class DataStreamTests extends AbstractXContentSerializingTestCase<DataStr
         builder.put(ds2);
 
         createMetadataForIndices(builder, ds1.getIndices());
-        createMetadataForIndices(builder, ds1.getFailureIndices());
+        createMetadataForIndices(builder, ds1.getFailureIndices().getIndices());
         createMetadataForIndices(builder, ds2.getIndices());
-        createMetadataForIndices(builder, ds2.getFailureIndices());
+        createMetadataForIndices(builder, ds2.getFailureIndices().getIndices());
 
-        Index indexToAdd = randomFrom(ds2.getFailureIndices().toArray(Index.EMPTY_ARRAY));
+        Index indexToAdd = randomFrom(ds2.getFailureIndices().getIndices().toArray(Index.EMPTY_ARRAY));
 
         IllegalArgumentException e = expectThrows(
             IllegalArgumentException.class,
@@ -563,9 +595,9 @@ public class DataStreamTests extends AbstractXContentSerializingTestCase<DataStr
         builder.put(ds2);
 
         createMetadataForIndices(builder, ds1.getIndices());
-        createMetadataForIndices(builder, ds1.getFailureIndices());
+        createMetadataForIndices(builder, ds1.getFailureIndices().getIndices());
         createMetadataForIndices(builder, ds2.getIndices());
-        createMetadataForIndices(builder, ds2.getFailureIndices());
+        createMetadataForIndices(builder, ds2.getFailureIndices().getIndices());
 
         Index indexToAdd = randomFrom(ds2.getIndices().toArray(Index.EMPTY_ARRAY));
 
@@ -594,16 +626,16 @@ public class DataStreamTests extends AbstractXContentSerializingTestCase<DataStr
         builder.put(original);
 
         createMetadataForIndices(builder, original.getIndices());
-        createMetadataForIndices(builder, original.getFailureIndices());
+        createMetadataForIndices(builder, original.getFailureIndices().getIndices());
 
-        Index indexToAdd = randomFrom(original.getFailureIndices().toArray(Index.EMPTY_ARRAY));
+        Index indexToAdd = randomFrom(original.getFailureIndices().getIndices().toArray(Index.EMPTY_ARRAY));
 
         DataStream updated = original.addFailureStoreIndex(builder.build(), indexToAdd);
         assertThat(updated.getName(), equalTo(original.getName()));
         assertThat(updated.getGeneration(), equalTo(original.getGeneration()));
         assertThat(updated.getIndices().size(), equalTo(original.getIndices().size()));
-        assertThat(updated.getFailureIndices().size(), equalTo(original.getFailureIndices().size()));
-        assertThat(updated.getFailureIndices(), equalTo(original.getFailureIndices()));
+        assertThat(updated.getFailureIndices().getIndices().size(), equalTo(original.getFailureIndices().getIndices().size()));
+        assertThat(updated.getFailureIndices().getIndices(), equalTo(original.getFailureIndices().getIndices()));
     }
 
     public void testAddFailureStoreIndexWithAliases() {
@@ -613,7 +645,7 @@ public class DataStreamTests extends AbstractXContentSerializingTestCase<DataStr
         builder.put(original);
 
         createMetadataForIndices(builder, original.getIndices());
-        createMetadataForIndices(builder, original.getFailureIndices());
+        createMetadataForIndices(builder, original.getFailureIndices().getIndices());
 
         Index indexToAdd = new Index(randomAlphaOfLength(4), UUIDs.randomBase64UUID(random()));
         IndexMetadata.Builder b = IndexMetadata.builder(indexToAdd.getName())
@@ -743,11 +775,16 @@ public class DataStreamTests extends AbstractXContentSerializingTestCase<DataStr
 
         var replicated = preSnapshotDataStream.isReplicated() && randomBoolean();
         var postSnapshotDataStream = preSnapshotDataStream.copy()
-            .setIndices(postSnapshotIndices)
+            .setBackingIndices(
+                preSnapshotDataStream.getBackingIndices()
+                    .copy()
+                    .setIndices(postSnapshotIndices)
+                    .setRolloverOnWrite(replicated == false && preSnapshotDataStream.rolloverOnWrite())
+                    .build()
+            )
             .setGeneration(preSnapshotDataStream.getGeneration() + randomIntBetween(0, 5))
             .setMetadata(preSnapshotDataStream.getMetadata() == null ? null : new HashMap<>(preSnapshotDataStream.getMetadata()))
             .setReplicated(replicated)
-            .setRolloverOnWrite(replicated == false && preSnapshotDataStream.rolloverOnWrite())
             .build();
 
         var reconciledDataStream = postSnapshotDataStream.snapshot(
@@ -775,7 +812,9 @@ public class DataStreamTests extends AbstractXContentSerializingTestCase<DataStr
         var preSnapshotDataStream = DataStreamTestHelper.randomInstance();
         var indicesToAdd = randomNonEmptyIndexInstances();
 
-        var postSnapshotDataStream = preSnapshotDataStream.copy().setIndices(indicesToAdd).build();
+        var postSnapshotDataStream = preSnapshotDataStream.copy()
+            .setBackingIndices(preSnapshotDataStream.getBackingIndices().copy().setIndices(indicesToAdd).build())
+            .build();
 
         assertNull(postSnapshotDataStream.snapshot(preSnapshotDataStream.getIndices().stream().map(Index::getName).toList()));
     }
@@ -1769,7 +1808,6 @@ public class DataStreamTests extends AbstractXContentSerializingTestCase<DataStr
             isSystem,
             randomBoolean(),
             isSystem,
-            System::currentTimeMillis,
             randomBoolean(),
             randomBoolean() ? IndexMode.STANDARD : null, // IndexMode.TIME_SERIES triggers validation that many unit tests doesn't pass
             lifecycle,
@@ -1958,12 +1996,11 @@ public class DataStreamTests extends AbstractXContentSerializingTestCase<DataStr
             hidden,
             replicated,
             system,
-            System::currentTimeMillis,
             randomBoolean(),
             randomBoolean() ? IndexMode.STANDARD : IndexMode.TIME_SERIES,
             DataStreamLifecycleTests.randomLifecycle(),
             false,
-            null,
+            List.of(),
             replicated == false && randomBoolean(),
             null
         );
@@ -1977,7 +2014,6 @@ public class DataStreamTests extends AbstractXContentSerializingTestCase<DataStr
             hidden,
             replicated,
             system,
-            System::currentTimeMillis,
             randomBoolean(),
             randomBoolean() ? IndexMode.STANDARD : IndexMode.TIME_SERIES,
             DataStreamLifecycleTests.randomLifecycle(),
@@ -2003,7 +2039,6 @@ public class DataStreamTests extends AbstractXContentSerializingTestCase<DataStr
             hidden,
             replicated,
             system,
-            System::currentTimeMillis,
             randomBoolean(),
             randomBoolean() ? IndexMode.STANDARD : IndexMode.TIME_SERIES,
             DataStreamLifecycleTests.randomLifecycle(),
@@ -2028,12 +2063,11 @@ public class DataStreamTests extends AbstractXContentSerializingTestCase<DataStr
             hidden,
             replicated,
             system,
-            System::currentTimeMillis,
             randomBoolean(),
             randomBoolean() ? IndexMode.STANDARD : IndexMode.TIME_SERIES,
             DataStreamLifecycleTests.randomLifecycle(),
             false,
-            null,
+            List.of(),
             replicated == false && randomBoolean(),
             null
         );
@@ -2051,7 +2085,6 @@ public class DataStreamTests extends AbstractXContentSerializingTestCase<DataStr
             hidden,
             replicated,
             system,
-            System::currentTimeMillis,
             randomBoolean(),
             randomBoolean() ? IndexMode.STANDARD : IndexMode.TIME_SERIES,
             DataStreamLifecycleTests.randomLifecycle(),
@@ -2083,7 +2116,6 @@ public class DataStreamTests extends AbstractXContentSerializingTestCase<DataStr
             hidden,
             replicated,
             system,
-            System::currentTimeMillis,
             randomBoolean(),
             randomBoolean() ? IndexMode.STANDARD : IndexMode.TIME_SERIES,
             DataStreamLifecycleTests.randomLifecycle(),

+ 6 - 1
server/src/test/java/org/elasticsearch/cluster/metadata/MetadataDataStreamsServiceTests.java

@@ -357,7 +357,12 @@ public class MetadataDataStreamsServiceTests extends MapperServiceTestCase {
         var state = DataStreamTestHelper.getClusterStateWithDataStreams(List.of(new Tuple<>(dataStreamName, 2)), List.of());
         var original = state.getMetadata().dataStreams().get(dataStreamName);
         var broken = original.copy()
-            .setIndices(List.of(new Index(original.getIndices().get(0).getName(), "broken"), original.getIndices().get(1)))
+            .setBackingIndices(
+                original.getBackingIndices()
+                    .copy()
+                    .setIndices(List.of(new Index(original.getIndices().get(0).getName(), "broken"), original.getIndices().get(1)))
+                    .build()
+            )
             .build();
         var brokenState = ClusterState.builder(state).metadata(Metadata.builder(state.getMetadata()).put(broken).build()).build();
 

+ 28 - 17
test/framework/src/main/java/org/elasticsearch/cluster/metadata/DataStreamTestHelper.java

@@ -131,13 +131,10 @@ public final class DataStreamTestHelper {
         @Nullable DataStreamLifecycle lifecycle,
         @Nullable DataStreamAutoShardingEvent autoShardingEvent
     ) {
-        return DataStream.builder(name, indices)
-            .setGeneration(generation)
-            .setMetadata(metadata)
-            .setReplicated(replicated)
-            .setLifecycle(lifecycle)
-            .setAutoShardingEvent(autoShardingEvent)
-            .build();
+        return DataStream.builder(
+            name,
+            DataStream.DataStreamIndices.backingIndicesBuilder(indices).setAutoShardingEvent(autoShardingEvent).build()
+        ).setGeneration(generation).setMetadata(metadata).setReplicated(replicated).setLifecycle(lifecycle).build();
     }
 
     public static DataStream newInstance(
@@ -155,7 +152,7 @@ public final class DataStreamTestHelper {
             .setReplicated(replicated)
             .setLifecycle(lifecycle)
             .setFailureStoreEnabled(failureStores.isEmpty() == false)
-            .setFailureIndices(failureStores)
+            .setFailureIndices(DataStream.DataStreamIndices.failureIndicesBuilder(failureStores).build())
             .build();
     }
 
@@ -341,7 +338,6 @@ public final class DataStreamTestHelper {
         boolean replicated = randomBoolean();
         return new DataStream(
             dataStreamName,
-            indices,
             generation,
             metadata,
             randomBoolean(),
@@ -352,15 +348,30 @@ public final class DataStreamTestHelper {
             randomBoolean() ? IndexMode.STANDARD : null, // IndexMode.TIME_SERIES triggers validation that many unit tests doesn't pass
             randomBoolean() ? DataStreamLifecycle.newBuilder().dataRetention(randomMillisUpToYear9999()).build() : null,
             failureStore,
-            failureIndices,
-            replicated == false && randomBoolean(),
-            randomBoolean()
-                ? new DataStreamAutoShardingEvent(
-                    indices.get(indices.size() - 1).getName(),
-                    randomIntBetween(1, 10),
-                    randomMillisUpToYear9999()
+            DataStream.DataStreamIndices.backingIndicesBuilder(indices)
+                .setRolloverOnWrite(replicated == false && randomBoolean())
+                .setAutoShardingEvent(
+                    randomBoolean()
+                        ? new DataStreamAutoShardingEvent(
+                            indices.get(indices.size() - 1).getName(),
+                            randomIntBetween(1, 10),
+                            randomMillisUpToYear9999()
+                        )
+                        : null
                 )
-                : null
+                .build(),
+            DataStream.DataStreamIndices.failureIndicesBuilder(failureIndices)
+                .setRolloverOnWrite(failureStore && replicated == false && randomBoolean())
+                .setAutoShardingEvent(
+                    failureStore && randomBoolean()
+                        ? new DataStreamAutoShardingEvent(
+                            indices.get(indices.size() - 1).getName(),
+                            randomIntBetween(1, 10),
+                            randomMillisUpToYear9999()
+                        )
+                        : null
+                )
+                .build()
         );
     }
 

+ 4 - 1
x-pack/plugin/autoscaling/src/main/java/org/elasticsearch/xpack/autoscaling/storage/ReactiveStorageDeciderService.java

@@ -816,7 +816,10 @@ public class ReactiveStorageDeciderService implements AutoscalingDeciderService
             Map<IndexMetadata, Long> newIndices = new HashMap<>();
             for (int i = 0; i < numberNewIndices; ++i) {
                 final String uuid = UUIDs.randomBase64UUID();
-                final Tuple<String, Long> rolledDataStreamInfo = stream.unsafeNextWriteIndexAndGeneration(state.metadata());
+                final Tuple<String, Long> rolledDataStreamInfo = stream.unsafeNextWriteIndexAndGeneration(
+                    state.metadata(),
+                    stream.getBackingIndices()
+                );
                 stream = stream.unsafeRollover(
                     new Index(rolledDataStreamInfo.v1(), uuid),
                     rolledDataStreamInfo.v2(),

+ 6 - 5
x-pack/plugin/ccr/src/main/java/org/elasticsearch/xpack/ccr/action/TransportPutFollowAction.java

@@ -330,11 +330,12 @@ public final class TransportPutFollowAction extends TransportMasterNodeAction<Pu
             // just copying the data stream is in this case safe.
             return remoteDataStream.copy()
                 .setName(localDataStreamName)
-                .setIndices(List.of(backingIndexToFollow))
+                .setBackingIndices(
+                    // Replicated data streams can't be rolled over, so having the `rolloverOnWrite` flag set to `true` wouldn't make sense
+                    // (and potentially even break things).
+                    remoteDataStream.getBackingIndices().copy().setIndices(List.of(backingIndexToFollow)).setRolloverOnWrite(false).build()
+                )
                 .setReplicated(true)
-                // Replicated data streams can't be rolled over, so having the `rolloverOnWrite` flag set to `true` wouldn't make sense
-                // (and potentially even break things).
-                .setRolloverOnWrite(false)
                 .build();
         } else {
             if (localDataStream.isReplicated() == false) {
@@ -376,7 +377,7 @@ public final class TransportPutFollowAction extends TransportMasterNodeAction<Pu
             }
 
             return localDataStream.copy()
-                .setIndices(backingIndices)
+                .setBackingIndices(localDataStream.getBackingIndices().copy().setIndices(backingIndices).build())
                 .setGeneration(remoteDataStream.getGeneration())
                 .setMetadata(remoteDataStream.getMetadata())
                 .build();

+ 6 - 1
x-pack/plugin/security/src/internalClusterTest/java/org/elasticsearch/integration/DataStreamSecurityIT.java

@@ -90,7 +90,12 @@ public class DataStreamSecurityIT extends SecurityIntegTestCase {
                         ? original.getIndices().get(0).getName() + "-broken"
                         : original.getIndices().get(0).getName();
                     DataStream broken = original.copy()
-                        .setIndices(List.of(new Index(brokenIndexName, "broken"), original.getIndices().get(1)))
+                        .setBackingIndices(
+                            original.getBackingIndices()
+                                .copy()
+                                .setIndices(List.of(new Index(brokenIndexName, "broken"), original.getIndices().get(1)))
+                                .build()
+                        )
                         .build();
                     brokenDataStreamHolder.set(broken);
                     return ClusterState.builder(currentState)