Browse Source

[Failure Store] Conceptually introduce the failure store lifecycle (#125258) (#125657)

* Specify index component when retrieving lifecycle

* Add getters for the failure lifecycle

* Conceptually introduce the failure store lifecycle (even for now it's the same)

(cherry picked from commit 6503c1b94bf077a9fc152019932a79c4e4f42d40)

# Conflicts:
#	modules/data-streams/src/main/java/org/elasticsearch/datastreams/lifecycle/DataStreamLifecycleService.java
#	modules/data-streams/src/main/java/org/elasticsearch/datastreams/lifecycle/action/TransportExplainDataStreamLifecycleAction.java
#	modules/data-streams/src/main/java/org/elasticsearch/datastreams/lifecycle/action/TransportGetDataStreamLifecycleStatsAction.java
#	server/src/main/java/org/elasticsearch/cluster/metadata/ProjectMetadata.java
#	server/src/test/java/org/elasticsearch/cluster/metadata/DataStreamTests.java
#	server/src/test/java/org/elasticsearch/cluster/metadata/MetadataCreateDataStreamServiceTests.java
Mary Gouseti 7 months ago
parent
commit
e088eb31b3

+ 1 - 1
modules/data-streams/src/internalClusterTest/java/org/elasticsearch/datastreams/DataStreamIT.java

@@ -1388,7 +1388,7 @@ public class DataStreamIT extends ESIntegTestCase {
         assertThat(metricsFooDataStream.getDataStreamStatus(), is(ClusterHealthStatus.YELLOW));
         assertThat(metricsFooDataStream.getIndexTemplate(), is("template_for_foo"));
         assertThat(metricsFooDataStream.getIlmPolicy(), is(nullValue()));
-        assertThat(dataStream.getLifecycle(), is(lifecycle.toDataStreamLifecycle()));
+        assertThat(dataStream.getDataLifecycle(), is(lifecycle.toDataStreamLifecycle()));
         assertThat(metricsFooDataStream.templatePreferIlmValue(), is(true));
         GetDataStreamAction.Response.IndexProperties indexProperties = metricsFooDataStream.getIndexSettingsValues()
             .get(dataStream.getWriteIndex());

+ 66 - 51
modules/data-streams/src/main/java/org/elasticsearch/datastreams/lifecycle/DataStreamLifecycleService.java

@@ -347,16 +347,22 @@ public class DataStreamLifecycleService implements ClusterStateListener, Closeab
         int affectedDataStreams = 0;
         for (DataStream dataStream : state.metadata().dataStreams().values()) {
             clearErrorStoreForUnmanagedIndices(dataStream);
-            if (dataStream.getLifecycle() == null) {
+            if (dataStream.getDataLifecycle() == null) {
                 continue;
             }
 
             // the following indices should not be considered for the remainder of this service run, for various reasons.
             Set<Index> indicesToExcludeForRemainingRun = new HashSet<>();
 
-            // This is the pre-rollover write index. It may or may not be the write index after maybeExecuteRollover has executed,
-            // depending on rollover criteria, for this reason we exclude it for the remaining run.
-            indicesToExcludeForRemainingRun.addAll(maybeExecuteRollover(state, dataStream));
+            // These are the pre-rollover write indices. They may or may not be the write index after maybeExecuteRollover has executed,
+            // depending on rollover criteria, for this reason we exclude them for the remaining run.
+            indicesToExcludeForRemainingRun.add(maybeExecuteRollover(state, dataStream, false));
+            if (DataStream.isFailureStoreFeatureFlagEnabled()) {
+                Index failureStoreWriteIndex = maybeExecuteRollover(state, dataStream, true);
+                if (failureStoreWriteIndex != null) {
+                    indicesToExcludeForRemainingRun.add(failureStoreWriteIndex);
+                }
+            }
 
             // tsds indices that are still within their time bounds (i.e. now < time_series.end_time) - we don't want these indices to be
             // deleted, forcemerged, or downsampled as they're still expected to receive large amounts of writes
@@ -799,23 +805,6 @@ public class DataStreamLifecycleService implements ClusterStateListener, Closeab
         }
     }
 
-    /**
-     * This method will attempt to roll over the write index of a data stream. The rollover will occur only if the conditions
-     * apply. In any case, we return the write backing index back to the caller, so it can be excluded from the next steps.
-     * @return the write index of this data stream before rollover was requested.
-     */
-    private Set<Index> maybeExecuteRollover(ClusterState state, DataStream dataStream) {
-        Set<Index> currentRunWriteIndices = new HashSet<>();
-        currentRunWriteIndices.add(maybeExecuteRollover(state, dataStream, false));
-        if (DataStream.isFailureStoreFeatureFlagEnabled()) {
-            Index failureStoreWriteIndex = maybeExecuteRollover(state, dataStream, true);
-            if (failureStoreWriteIndex != null) {
-                currentRunWriteIndices.add(failureStoreWriteIndex);
-            }
-        }
-        return currentRunWriteIndices;
-    }
-
     @Nullable
     private Index maybeExecuteRollover(ClusterState state, DataStream dataStream, boolean rolloverFailureStore) {
         Index currentRunWriteIndex = rolloverFailureStore ? dataStream.getWriteFailureIndex() : dataStream.getWriteIndex();
@@ -824,10 +813,11 @@ public class DataStreamLifecycleService implements ClusterStateListener, Closeab
         }
         try {
             if (dataStream.isIndexManagedByDataStreamLifecycle(currentRunWriteIndex, state.metadata()::index)) {
+                DataStreamLifecycle lifecycle = rolloverFailureStore ? dataStream.getFailuresLifecycle() : dataStream.getDataLifecycle();
                 RolloverRequest rolloverRequest = getDefaultRolloverRequest(
                     rolloverConfiguration,
                     dataStream.getName(),
-                    dataStream.getLifecycle().getEffectiveDataRetention(globalRetentionSettings.get(), dataStream.isInternal()),
+                    lifecycle.getEffectiveDataRetention(globalRetentionSettings.get(), dataStream.isInternal()),
                     rolloverFailureStore
                 );
                 transportActionsDeduplicator.executeOnce(
@@ -880,41 +870,66 @@ public class DataStreamLifecycleService implements ClusterStateListener, Closeab
     Set<Index> maybeExecuteRetention(ClusterState state, DataStream dataStream, Set<Index> indicesToExcludeForRemainingRun) {
         Metadata metadata = state.metadata();
         DataStreamGlobalRetention globalRetention = dataStream.isSystem() ? null : globalRetentionSettings.get();
-        List<Index> backingIndicesOlderThanRetention = dataStream.getIndicesPastRetention(metadata::index, nowSupplier, globalRetention);
-        if (backingIndicesOlderThanRetention.isEmpty()) {
+        List<Index> backingIndicesOlderThanRetention = dataStream.getBackingIndicesPastRetention(
+            metadata::index,
+            nowSupplier,
+            globalRetention
+        );
+        List<Index> failureIndicesOlderThanRetention = dataStream.getFailureIndicesPastRetention(
+            metadata::index,
+            nowSupplier,
+            globalRetention
+        );
+        if (backingIndicesOlderThanRetention.isEmpty() && failureIndicesOlderThanRetention.isEmpty()) {
             return Set.of();
         }
         Set<Index> indicesToBeRemoved = new HashSet<>();
-        // We know that there is lifecycle and retention because there are indices to be deleted
-        assert dataStream.getLifecycle() != null;
-        TimeValue effectiveDataRetention = dataStream.getLifecycle().getEffectiveDataRetention(globalRetention, dataStream.isInternal());
-        for (Index index : backingIndicesOlderThanRetention) {
-            if (indicesToExcludeForRemainingRun.contains(index) == false) {
-                IndexMetadata backingIndex = metadata.index(index);
-                assert backingIndex != null : "the data stream backing indices must exist";
-
-                IndexMetadata.DownsampleTaskStatus downsampleStatus = INDEX_DOWNSAMPLE_STATUS.get(backingIndex.getSettings());
-                // we don't want to delete the source index if they have an in-progress downsampling operation because the
-                // target downsample index will remain in the system as a standalone index
-                if (downsampleStatus == STARTED) {
-                    // there's an opportunity here to cancel downsampling and delete the source index now
-                    logger.trace(
-                        "Data stream lifecycle skips deleting index [{}] even though its retention period [{}] has lapsed "
-                            + "because there's a downsampling operation currently in progress for this index. Current downsampling "
-                            + "status is [{}]. When downsampling completes, DSL will delete this index.",
-                        index.getName(),
-                        effectiveDataRetention,
-                        downsampleStatus
-                    );
-                } else {
-                    // UNKNOWN is the default value, and has no real use. So index should be deleted
-                    // SUCCESS meaning downsampling completed successfully and there is nothing in progress, so we can also delete
+        if (backingIndicesOlderThanRetention.isEmpty() == false) {
+            assert dataStream.getDataLifecycle() != null : "data stream should have failure lifecycle if we have 'old' indices";
+            TimeValue dataRetention = dataStream.getDataLifecycle().getEffectiveDataRetention(globalRetention, dataStream.isInternal());
+            for (Index index : backingIndicesOlderThanRetention) {
+                if (indicesToExcludeForRemainingRun.contains(index) == false) {
+                    IndexMetadata backingIndex = metadata.index(index);
+                    assert backingIndex != null : "the data stream backing indices must exist";
+
+                    IndexMetadata.DownsampleTaskStatus downsampleStatus = INDEX_DOWNSAMPLE_STATUS.get(backingIndex.getSettings());
+                    // we don't want to delete the source index if they have an in-progress downsampling operation because the
+                    // target downsample index will remain in the system as a standalone index
+                    if (downsampleStatus == STARTED) {
+                        // there's an opportunity here to cancel downsampling and delete the source index now
+                        logger.trace(
+                            "Data stream lifecycle skips deleting index [{}] even though its retention period [{}] has lapsed "
+                                + "because there's a downsampling operation currently in progress for this index. Current downsampling "
+                                + "status is [{}]. When downsampling completes, DSL will delete this index.",
+                            index.getName(),
+                            dataRetention,
+                            downsampleStatus
+                        );
+                    } else {
+                        // UNKNOWN is the default value, and has no real use. So index should be deleted
+                        // SUCCESS meaning downsampling completed successfully and there is nothing in progress, so we can also delete
+                        indicesToBeRemoved.add(index);
+
+                        // there's an opportunity here to batch the delete requests (i.e. delete 100 indices / request)
+                        // let's start simple and reevaluate
+                        String indexName = backingIndex.getIndex().getName();
+                        deleteIndexOnce(indexName, "the lapsed [" + dataRetention + "] retention period");
+                    }
+                }
+            }
+        }
+        if (failureIndicesOlderThanRetention.isEmpty() == false) {
+            assert dataStream.getFailuresLifecycle() != null : "data stream should have failure lifecycle if we have 'old' indices";
+            var failureRetention = dataStream.getFailuresLifecycle().getEffectiveDataRetention(globalRetention, dataStream.isInternal());
+            for (Index index : failureIndicesOlderThanRetention) {
+                if (indicesToExcludeForRemainingRun.contains(index) == false) {
+                    IndexMetadata failureIndex = metadata.index(index);
+                    assert failureIndex != null : "the data stream failure indices must exist";
                     indicesToBeRemoved.add(index);
-
                     // there's an opportunity here to batch the delete requests (i.e. delete 100 indices / request)
                     // let's start simple and reevaluate
-                    String indexName = backingIndex.getIndex().getName();
-                    deleteIndexOnce(indexName, "the lapsed [" + effectiveDataRetention + "] retention period");
+                    String indexName = failureIndex.getIndex().getName();
+                    deleteIndexOnce(indexName, "the lapsed [" + failureRetention + "] retention period");
                 }
             }
         }

+ 1 - 1
modules/data-streams/src/main/java/org/elasticsearch/datastreams/lifecycle/action/TransportExplainDataStreamLifecycleAction.java

@@ -109,7 +109,7 @@ public class TransportExplainDataStreamLifecycleAction extends TransportMasterNo
                 idxMetadata.getCreationDate(),
                 rolloverInfo == null ? null : rolloverInfo.getTime(),
                 generationDate,
-                parentDataStream.getLifecycle(),
+                parentDataStream.getDataLifecycleForIndex(idxMetadata.getIndex()),
                 errorStore.getError(index)
             );
             explainIndices.add(explainIndexDataStreamLifecycle);

+ 1 - 1
modules/data-streams/src/main/java/org/elasticsearch/datastreams/lifecycle/action/TransportGetDataStreamLifecycleAction.java

@@ -91,7 +91,7 @@ public class TransportGetDataStreamLifecycleAction extends TransportMasterNodeRe
                     .map(
                         dataStream -> new GetDataStreamLifecycleAction.Response.DataStreamLifecycle(
                             dataStream.getName(),
-                            dataStream.getLifecycle(),
+                            dataStream.getDataLifecycle(),
                             dataStream.isSystem()
                         )
                     )

+ 1 - 1
modules/data-streams/src/main/java/org/elasticsearch/datastreams/lifecycle/action/TransportGetDataStreamLifecycleStatsAction.java

@@ -76,7 +76,7 @@ public class TransportGetDataStreamLifecycleStatsAction extends TransportMasterN
         Set<String> indicesInErrorStore = lifecycleService.getErrorStore().getAllIndices();
         List<GetDataStreamLifecycleStatsAction.Response.DataStreamStats> dataStreamStats = new ArrayList<>();
         for (DataStream dataStream : state.metadata().dataStreams().values()) {
-            if (dataStream.getLifecycle() != null && dataStream.getLifecycle().enabled()) {
+            if (dataStream.getDataLifecycle() != null && dataStream.getDataLifecycle().enabled()) {
                 int total = 0;
                 int inError = 0;
                 for (Index index : dataStream.getIndices()) {

+ 5 - 4
server/src/main/java/org/elasticsearch/action/datastreams/GetDataStreamAction.java

@@ -381,9 +381,10 @@ public class GetDataStreamAction extends ActionType<GetDataStreamAction.Response
                 if (indexTemplate != null) {
                     builder.field(INDEX_TEMPLATE_FIELD.getPreferredName(), indexTemplate);
                 }
-                if (dataStream.getLifecycle() != null) {
+                if (dataStream.getDataLifecycle() != null) {
                     builder.field(LIFECYCLE_FIELD.getPreferredName());
-                    dataStream.getLifecycle().toXContent(builder, params, rolloverConfiguration, globalRetention, dataStream.isInternal());
+                    dataStream.getDataLifecycle()
+                        .toXContent(builder, params, rolloverConfiguration, globalRetention, dataStream.isInternal());
                 }
                 if (ilmPolicyName != null) {
                     builder.field(ILM_POLICY_FIELD.getPreferredName(), ilmPolicyName);
@@ -468,7 +469,7 @@ public class GetDataStreamAction extends ActionType<GetDataStreamAction.Response
              */
             public ManagedBy getNextGenerationManagedBy() {
                 // both ILM and DSL are configured so let's check the prefer_ilm setting to see which system takes precedence
-                if (ilmPolicyName != null && dataStream.getLifecycle() != null && dataStream.getLifecycle().enabled()) {
+                if (ilmPolicyName != null && dataStream.getDataLifecycle() != null && dataStream.getDataLifecycle().enabled()) {
                     return templatePreferIlmValue ? ManagedBy.ILM : ManagedBy.LIFECYCLE;
                 }
 
@@ -476,7 +477,7 @@ public class GetDataStreamAction extends ActionType<GetDataStreamAction.Response
                     return ManagedBy.ILM;
                 }
 
-                if (dataStream.getLifecycle() != null && dataStream.getLifecycle().enabled()) {
+                if (dataStream.getDataLifecycle() != null && dataStream.getDataLifecycle().enabled()) {
                     return ManagedBy.LIFECYCLE;
                 }
 

+ 72 - 26
server/src/main/java/org/elasticsearch/cluster/metadata/DataStream.java

@@ -510,11 +510,37 @@ public final class DataStream implements SimpleDiffable<DataStream>, ToXContentO
         return indexMode;
     }
 
+    /**
+     * Retrieves the lifecycle configuration meant for the backing indices.
+     */
     @Nullable
-    public DataStreamLifecycle getLifecycle() {
+    public DataStreamLifecycle getDataLifecycle() {
         return lifecycle;
     }
 
+    /**
+     * Retrieves the lifecycle configuration meant for the failure store. Currently, it's the same with {@link #getDataLifecycle()}
+     * but it will change.
+     */
+    @Nullable
+    public DataStreamLifecycle getFailuresLifecycle() {
+        return lifecycle;
+    }
+
+    /**
+     * Retrieves the correct lifecycle for the provided index. Returns null if the index does not belong to this data stream
+     */
+    @Nullable
+    public DataStreamLifecycle getDataLifecycleForIndex(Index index) {
+        if (backingIndices.containsIndex(index.getName())) {
+            return getDataLifecycle();
+        }
+        if (failureIndices.containsIndex(index.getName())) {
+            return getFailuresLifecycle();
+        }
+        return null;
+    }
+
     /**
      * Returns the latest auto sharding event that happened for this data stream
      */
@@ -928,24 +954,53 @@ public final class DataStream implements SimpleDiffable<DataStream>, ToXContentO
     }
 
     /**
-     * Iterate over the backing indices and return the ones that are managed by the data stream lifecycle and past the configured
-     * retention in their lifecycle.
+     * Iterate over the backing indices and return the ones that are managed by the data stream lifecycle and past the
+     * configured retention in their lifecycle.
+     * NOTE that this specifically does not return the write index of the data stream as usually retention
+     * is treated differently for the write index (i.e. they first need to be rolled over)
+     */
+    public List<Index> getBackingIndicesPastRetention(
+        Function<String, IndexMetadata> indexMetadataSupplier,
+        LongSupplier nowSupplier,
+        DataStreamGlobalRetention globalRetention
+    ) {
+        if (getDataLifecycle() == null
+            || getDataLifecycle().enabled() == false
+            || getDataLifecycle().getEffectiveDataRetention(globalRetention, isInternal()) == null) {
+            return List.of();
+        }
+
+        List<Index> indicesPastRetention = getNonWriteIndicesOlderThan(
+            getIndices(),
+            getDataLifecycle().getEffectiveDataRetention(globalRetention, isInternal()),
+            indexMetadataSupplier,
+            this::isIndexManagedByDataStreamLifecycle,
+            nowSupplier
+        );
+        return indicesPastRetention;
+    }
+
+    /**
+     * Iterate over the failure indices and return the ones that are managed by the data stream lifecycle and past the
+     * configured retention in their lifecycle.
      * NOTE that this specifically does not return the write index of the data stream as usually retention
      * is treated differently for the write index (i.e. they first need to be rolled over)
      */
-    public List<Index> getIndicesPastRetention(
+    public List<Index> getFailureIndicesPastRetention(
         Function<String, IndexMetadata> indexMetadataSupplier,
         LongSupplier nowSupplier,
         DataStreamGlobalRetention globalRetention
     ) {
-        if (lifecycle == null
-            || lifecycle.enabled() == false
-            || lifecycle.getEffectiveDataRetention(globalRetention, isInternal()) == null) {
+        if (DataStream.isFailureStoreFeatureFlagEnabled() == false
+            || getFailuresLifecycle() == null
+            || getFailuresLifecycle().enabled() == false
+            || getFailuresLifecycle().getEffectiveDataRetention(globalRetention, isInternal()) == null) {
             return List.of();
         }
 
         List<Index> indicesPastRetention = getNonWriteIndicesOlderThan(
-            lifecycle.getEffectiveDataRetention(globalRetention, isInternal()),
+            getFailureIndices(),
+            getFailuresLifecycle().getEffectiveDataRetention(globalRetention, isInternal()),
             indexMetadataSupplier,
             this::isIndexManagedByDataStreamLifecycle,
             nowSupplier
@@ -991,37 +1046,28 @@ public final class DataStream implements SimpleDiffable<DataStream>, ToXContentO
     }
 
     /**
-     * Returns the non-write backing indices and failure store indices that are older than the provided age,
+     * Filters the given <code>indices</code> that are older than the provided age and populates <code>olderIndices</code>,
      * excluding the write indices. The index age is calculated from the rollover or index creation date (or
      * the origination date if present). If an indices predicate is provided the returned list of indices will
      * be filtered according to the predicate definition. This is useful for things like "return only
-     * the backing indices that are managed by the data stream lifecycle".
+     * the indices that are managed by the data stream lifecycle".
      */
-    public List<Index> getNonWriteIndicesOlderThan(
+    private List<Index> getNonWriteIndicesOlderThan(
+        List<Index> indices,
         TimeValue retentionPeriod,
         Function<String, IndexMetadata> indexMetadataSupplier,
         @Nullable Predicate<IndexMetadata> indicesPredicate,
         LongSupplier nowSupplier
     ) {
+        if (indices.isEmpty()) {
+            return List.of();
+        }
         List<Index> olderIndices = new ArrayList<>();
-        for (Index index : backingIndices.getIndices()) {
+        for (Index index : indices) {
             if (isIndexOlderThan(index, retentionPeriod.getMillis(), nowSupplier.getAsLong(), indicesPredicate, indexMetadataSupplier)) {
                 olderIndices.add(index);
             }
         }
-        if (DataStream.isFailureStoreFeatureFlagEnabled() && failureIndices.getIndices().isEmpty() == false) {
-            for (Index index : failureIndices.getIndices()) {
-                if (isIndexOlderThan(
-                    index,
-                    retentionPeriod.getMillis(),
-                    nowSupplier.getAsLong(),
-                    indicesPredicate,
-                    indexMetadataSupplier
-                )) {
-                    olderIndices.add(index);
-                }
-            }
-        }
         return olderIndices;
     }
 
@@ -1088,7 +1134,7 @@ public final class DataStream implements SimpleDiffable<DataStream>, ToXContentO
      */
     @Nullable
     public TimeValue getGenerationLifecycleDate(IndexMetadata indexMetadata) {
-        if (indexMetadata.getIndex().equals(getWriteIndex())) {
+        if (indexMetadata.getIndex().equals(getWriteIndex()) || indexMetadata.getIndex().equals(getWriteFailureIndex())) {
             return null;
         }
         Long originationDate = indexMetadata.getSettings().getAsLong(LIFECYCLE_ORIGINATION_DATE, null);

+ 1 - 1
server/src/main/java/org/elasticsearch/cluster/metadata/Metadata.java

@@ -1370,7 +1370,7 @@ public class Metadata implements Iterable<IndexMetadata>, Diffable<Metadata>, Ch
         }
 
         DataStream parentDataStream = indexAbstraction.getParentDataStream();
-        if (parentDataStream != null && parentDataStream.getLifecycle() != null && parentDataStream.getLifecycle().enabled()) {
+        if (parentDataStream != null && parentDataStream.getDataLifecycle() != null && parentDataStream.getDataLifecycle().enabled()) {
             // index has both ILM and data stream lifecycle configured so let's check which is preferred
             return PREFER_ILM_SETTING.get(indexMetadata.getSettings());
         }

+ 1 - 1
server/src/test/java/org/elasticsearch/cluster/metadata/DataStreamLifecycleWithRetentionWarningsTests.java

@@ -147,7 +147,7 @@ public class DataStreamLifecycleWithRetentionWarningsTests extends ESTestCase {
         ClusterState after = metadataDataStreamsService.updateDataLifecycle(before, List.of(dataStream), DataStreamLifecycle.DEFAULT);
         DataStream updatedDataStream = after.metadata().dataStreams().get(dataStream);
         assertNotNull(updatedDataStream);
-        assertThat(updatedDataStream.getLifecycle(), equalTo(DataStreamLifecycle.DEFAULT));
+        assertThat(updatedDataStream.getDataLifecycle(), equalTo(DataStreamLifecycle.DEFAULT));
         Map<String, List<String>> responseHeaders = threadContext.getResponseHeaders();
         assertThat(responseHeaders.size(), is(1));
         assertThat(

+ 309 - 200
server/src/test/java/org/elasticsearch/cluster/metadata/DataStreamTests.java

@@ -49,10 +49,11 @@ import java.util.Locale;
 import java.util.Map;
 import java.util.Set;
 import java.util.concurrent.atomic.AtomicReference;
-import java.util.function.Predicate;
+import java.util.function.Function;
 import java.util.stream.Collectors;
 
 import static org.elasticsearch.cluster.metadata.DataStream.getDefaultBackingIndexName;
+import static org.elasticsearch.cluster.metadata.DataStream.getDefaultFailureStoreName;
 import static org.elasticsearch.cluster.metadata.DataStreamTestHelper.newInstance;
 import static org.elasticsearch.cluster.metadata.DataStreamTestHelper.randomGlobalRetention;
 import static org.elasticsearch.cluster.metadata.DataStreamTestHelper.randomIndexInstances;
@@ -99,7 +100,7 @@ public class DataStreamTests extends AbstractXContentSerializingTestCase<DataStr
         var isSystem = instance.isSystem();
         var allowsCustomRouting = instance.isAllowCustomRouting();
         var indexMode = instance.getIndexMode();
-        var lifecycle = instance.getLifecycle();
+        var lifecycle = instance.getDataLifecycle();
         var dataStreamOptions = instance.getDataStreamOptions();
         var failureIndices = instance.getFailureIndices();
         var rolloverOnWrite = instance.rolloverOnWrite();
@@ -1218,15 +1219,30 @@ public class DataStreamTests extends AbstractXContentSerializingTestCase<DataStr
         {
             // for a write index that has not been rolled over yet, we get null even if the index has an origination date
             long originTimeMillis = creationTimeMillis - 3000L;
-            IndexMetadata.Builder indexMetaBuilder = IndexMetadata.builder(DataStream.getDefaultBackingIndexName(dataStreamName, 1))
+            IndexMetadata.Builder backingIndexMetadataBuilder = IndexMetadata.builder(
+                DataStream.getDefaultBackingIndexName(dataStreamName, 1)
+            )
                 .settings(settings(IndexVersion.current()).put(LIFECYCLE_ORIGINATION_DATE, originTimeMillis))
                 .numberOfShards(1)
                 .numberOfReplicas(1)
                 .creationDate(creationTimeMillis);
-            IndexMetadata indexMetadata = indexMetaBuilder.build();
-            DataStream dataStream = createDataStream(dataStreamName, List.of(indexMetadata.getIndex()));
+            IndexMetadata backingIndexMetadata = backingIndexMetadataBuilder.build();
+            IndexMetadata.Builder failureIndexMetadataBuilder = IndexMetadata.builder(
+                DataStream.getDefaultBackingIndexName(dataStreamName, 1)
+            )
+                .settings(settings(IndexVersion.current()).put(LIFECYCLE_ORIGINATION_DATE, originTimeMillis))
+                .numberOfShards(1)
+                .numberOfReplicas(1)
+                .creationDate(creationTimeMillis);
+            IndexMetadata failureIndexMetadata = failureIndexMetadataBuilder.build();
+            DataStream dataStream = createDataStream(
+                dataStreamName,
+                List.of(backingIndexMetadata.getIndex()),
+                List.of(failureIndexMetadata.getIndex())
+            );
 
-            assertNull(dataStream.getGenerationLifecycleDate(indexMetadata));
+            assertNull(dataStream.getGenerationLifecycleDate(backingIndexMetadata));
+            assertNull(dataStream.getGenerationLifecycleDate(failureIndexMetadata));
         }
         {
             // If the index is not the write index and has origination date set, we get the origination date even if it has not been
@@ -1306,80 +1322,159 @@ public class DataStreamTests extends AbstractXContentSerializingTestCase<DataStr
             .build();
     }
 
-    public void testGetIndicesOlderThan() {
+    private DataStream createDataStream(String name, List<Index> backingIndices, List<Index> failureIndices) {
+        return DataStream.builder(name, backingIndices)
+            .setMetadata(Map.of())
+            .setReplicated(randomBoolean())
+            .setAllowCustomRouting(randomBoolean())
+            .setIndexMode(IndexMode.STANDARD)
+            .setFailureIndices(DataStream.DataStreamIndices.failureIndicesBuilder(failureIndices).build())
+            .build();
+    }
+
+    public void testGetBackingIndicesPastRetention() {
         String dataStreamName = "metrics-foo";
         long now = System.currentTimeMillis();
 
         List<DataStreamMetadata> creationAndRolloverTimes = List.of(
-            DataStreamMetadata.dataStreamMetadata(now - 5000, now - 4000),
-            DataStreamMetadata.dataStreamMetadata(now - 4000, now - 3000),
-            DataStreamMetadata.dataStreamMetadata(now - 3000, now - 2000),
-            DataStreamMetadata.dataStreamMetadata(now - 2000, now - 1000),
+            DataStreamMetadata.dataStreamMetadata(now - 5000_000, now - 4000_000),
+            DataStreamMetadata.dataStreamMetadata(now - 4000_000, now - 3000_000),
+            DataStreamMetadata.dataStreamMetadata(now - 3000_000, now - 2000_000),
+            DataStreamMetadata.dataStreamMetadata(now - 2000_000, now - 1000_000),
             DataStreamMetadata.dataStreamMetadata(now, null)
         );
 
+        {
+            {
+                // no lifecycle configured so we expect an empty list
+                Metadata.Builder builder = Metadata.builder();
+                DataStream dataStream = createDataStream(
+                    builder,
+                    dataStreamName,
+                    creationAndRolloverTimes,
+                    settings(IndexVersion.current()),
+                    null
+                );
+                Metadata metadata = builder.build();
+
+                assertThat(
+                    dataStream.getBackingIndicesPastRetention(metadata::index, () -> now, randomGlobalRetention()).isEmpty(),
+                    is(true)
+                );
+            }
+        }
+
         Metadata.Builder builder = Metadata.builder();
+        AtomicReference<TimeValue> retention = new AtomicReference<>();
         DataStream dataStream = createDataStream(
             builder,
             dataStreamName,
             creationAndRolloverTimes,
             settings(IndexVersion.current()),
-            new DataStreamLifecycle()
+            new DataStreamLifecycle() {
+                public TimeValue dataRetention() {
+                    return retention.get();
+                }
+            }
         );
         Metadata metadata = builder.build();
+
         {
-            List<Index> backingIndices = dataStream.getNonWriteIndicesOlderThan(
-                TimeValue.timeValueMillis(2500),
+            // Mix of indices younger and older than retention, data stream retention is effective retention
+            retention.set(TimeValue.timeValueSeconds(2500));
+            List<Index> backingIndices = dataStream.getBackingIndicesPastRetention(
                 metadata::index,
-                null,
-                () -> now
+                () -> now,
+                randomBoolean() ? randomGlobalRetention() : null
             );
             assertThat(backingIndices.size(), is(2));
-            assertThat(backingIndices.get(0).getName(), is(DataStream.getDefaultBackingIndexName(dataStreamName, 1)));
-            assertThat(backingIndices.get(1).getName(), is(DataStream.getDefaultBackingIndexName(dataStreamName, 2)));
+            for (int i = 0; i < backingIndices.size(); i++) {
+                assertThat(backingIndices.get(i).getName(), is(dataStream.getIndices().get(i).getName()));
+            }
         }
 
         {
-            List<Index> backingIndices = dataStream.getNonWriteIndicesOlderThan(
-                TimeValue.timeValueMillis(0),
-                metadata::index,
-                null,
-                () -> now
-            );
+            // All indices past retention, but we keep the write index
+            retention.set(TimeValue.timeValueSeconds(0));
+            List<Index> backingIndices = dataStream.getBackingIndicesPastRetention(metadata::index, () -> now, null);
             assertThat(backingIndices.size(), is(4));
-            assertThat(backingIndices.get(0).getName(), is(DataStream.getDefaultBackingIndexName(dataStreamName, 1)));
-            assertThat(backingIndices.get(1).getName(), is(DataStream.getDefaultBackingIndexName(dataStreamName, 2)));
-            assertThat(backingIndices.get(2).getName(), is(DataStream.getDefaultBackingIndexName(dataStreamName, 3)));
-            assertThat(backingIndices.get(3).getName(), is(DataStream.getDefaultBackingIndexName(dataStreamName, 4)));
+            for (int i = 0; i < backingIndices.size(); i++) {
+                assertThat(backingIndices.get(i).getName(), is(dataStream.getIndices().get(i).getName()));
+            }
         }
 
         {
-            List<Index> backingIndices = dataStream.getNonWriteIndicesOlderThan(
-                TimeValue.timeValueMillis(6000),
-                metadata::index,
-                null,
-                () -> now
-            );
+            // All indices younger than retention
+            retention.set(TimeValue.timeValueSeconds(6000));
+            List<Index> backingIndices = dataStream.getBackingIndicesPastRetention(metadata::index, () -> now, null);
             assertThat(backingIndices.isEmpty(), is(true));
         }
 
         {
-            Predicate<IndexMetadata> genThreeAndFivePredicate = indexMetadata -> indexMetadata.getIndex().getName().endsWith("00003")
-                || indexMetadata.getIndex().getName().endsWith("00005");
+            // Test predicate that influences which indices are candidates for a retention check
+            Function<String, IndexMetadata> indexMetadataWithSomeLifecycleSupplier = indexName -> {
+                IndexMetadata indexMetadata = metadata.index(indexName);
+                if (indexName.endsWith("00003") || indexName.endsWith("00005")) {
+                    return indexMetadata;
+                }
+                return IndexMetadata.builder(indexMetadata)
+                    .settings(Settings.builder().put(indexMetadata.getSettings()).put(IndexMetadata.LIFECYCLE_NAME, "some-policy").build())
+                    .build();
+            };
+            retention.set(TimeValue.timeValueSeconds(0));
+            List<Index> backingIndices = dataStream.getBackingIndicesPastRetention(indexMetadataWithSomeLifecycleSupplier, () -> now, null);
+            assertThat(backingIndices.size(), is(1));
+            assertThat(backingIndices.get(0).getName(), is(dataStream.getIndices().get(2).getName()));
+        }
 
-            List<Index> backingIndices = dataStream.getNonWriteIndicesOlderThan(
-                TimeValue.timeValueMillis(0),
-                metadata::index,
-                genThreeAndFivePredicate,
-                () -> now
+        {
+            // no retention configured but we have default retention
+            DataStreamGlobalRetention globalRetention = new DataStreamGlobalRetention(
+                TimeValue.timeValueSeconds(2500),
+                randomBoolean() ? TimeValue.timeValueSeconds(randomIntBetween(2500, 5000)) : null
             );
-            assertThat(backingIndices.size(), is(1));
-            assertThat(backingIndices.get(0).getName(), is(DataStream.getDefaultBackingIndexName(dataStreamName, 3)));
+            retention.set(null);
+
+            List<Index> backingIndices = dataStream.getBackingIndicesPastRetention(metadata::index, () -> now, globalRetention);
+            assertThat(backingIndices.size(), is(2));
+            for (int i = 0; i < backingIndices.size(); i++) {
+                assertThat(backingIndices.get(i).getName(), is(dataStream.getIndices().get(i).getName()));
+            }
+        }
+
+        {
+            // no retention or too large retention configured and we have max retention
+            DataStreamGlobalRetention globalRetention = new DataStreamGlobalRetention(null, TimeValue.timeValueSeconds(2500));
+            retention.set(randomBoolean() ? TimeValue.timeValueDays(6000) : null);
+            List<Index> backingIndices = dataStream.getBackingIndicesPastRetention(metadata::index, () -> now, globalRetention);
+            assertThat(backingIndices.size(), is(2));
+            for (int i = 0; i < backingIndices.size(); i++) {
+                assertThat(backingIndices.get(i).getName(), is(dataStream.getIndices().get(i).getName()));
+            }
         }
 
+        {
+            // no indices are returned as even though all pass retention age none are managed by data stream lifecycle
+            Metadata.Builder builderWithIlm = Metadata.builder();
+            DataStream dataStreamWithIlm = createDataStream(
+                builderWithIlm,
+                dataStreamName,
+                creationAndRolloverTimes,
+                settings(IndexVersion.current()).put(IndexMetadata.LIFECYCLE_NAME, "ILM_policy"),
+                DataStreamLifecycle.builder().dataRetention(TimeValue.ZERO).build()
+            );
+            Metadata metadataWithIlm = builderWithIlm.build();
+
+            List<Index> backingIndices = dataStreamWithIlm.getBackingIndicesPastRetention(
+                metadataWithIlm::index,
+                () -> now,
+                randomGlobalRetention()
+            );
+            assertThat(backingIndices.isEmpty(), is(true));
+        }
     }
 
-    public void testGetIndicesPastRetention() {
+    public void testGetFailureIndicesPastRetention() {
         String dataStreamName = "metrics-foo";
         long now = System.currentTimeMillis();
 
@@ -1392,135 +1487,116 @@ public class DataStreamTests extends AbstractXContentSerializingTestCase<DataStr
         );
 
         {
-            // no lifecycle configured so we expect an empty list
-            Metadata.Builder builder = Metadata.builder();
-            DataStream dataStream = createDataStream(
-                builder,
-                dataStreamName,
-                creationAndRolloverTimes,
-                settings(IndexVersion.current()),
-                null
-            );
-            Metadata metadata = builder.build();
+            {
+                // no lifecycle configured so we expect an empty list
+                Metadata.Builder builder = Metadata.builder();
+                DataStream dataStream = createDataStream(
+                    builder,
+                    dataStreamName,
+                    creationAndRolloverTimes,
+                    settings(IndexVersion.current()),
+                    null
+                );
+                Metadata metadata = builder.build();
 
-            assertThat(dataStream.getIndicesPastRetention(metadata::index, () -> now, randomGlobalRetention()).isEmpty(), is(true));
+                assertThat(
+                    dataStream.getFailureIndicesPastRetention(metadata::index, () -> now, randomGlobalRetention()).isEmpty(),
+                    is(true)
+                );
+            }
         }
 
+        Metadata.Builder builder = Metadata.builder();
+        AtomicReference<TimeValue> retention = new AtomicReference<>();
+        DataStream dataStream = createDataStream(
+            builder,
+            dataStreamName,
+            creationAndRolloverTimes,
+            settings(IndexVersion.current()),
+            new DataStreamLifecycle() {
+                public TimeValue dataRetention() {
+                    return retention.get();
+                }
+            }
+        );
+        Metadata metadata = builder.build();
+
         {
-            // no retention configured but we have default retention
-            DataStreamGlobalRetention globalRetention = new DataStreamGlobalRetention(
-                TimeValue.timeValueSeconds(2500),
-                randomBoolean() ? TimeValue.timeValueSeconds(randomIntBetween(2500, 5000)) : null
-            );
-            Metadata.Builder builder = Metadata.builder();
-            DataStream dataStream = createDataStream(
-                builder,
-                dataStreamName,
-                creationAndRolloverTimes,
-                settings(IndexVersion.current()),
-                new DataStreamLifecycle()
+            // Mix of indices younger and older than retention, data stream retention is effective retention
+            retention.set(TimeValue.timeValueSeconds(2500));
+            List<Index> failureIndices = dataStream.getFailureIndicesPastRetention(
+                metadata::index,
+                () -> now,
+                randomBoolean() ? randomGlobalRetention() : null
             );
-            Metadata metadata = builder.build();
-
-            List<Index> backingIndices = dataStream.getIndicesPastRetention(metadata::index, () -> now, globalRetention);
-            assertThat(backingIndices.size(), is(2));
-            assertThat(backingIndices.get(0).getName(), is(dataStream.getIndices().get(0).getName()));
-            assertThat(backingIndices.get(1).getName(), is(dataStream.getIndices().get(1).getName()));
+            assertThat(failureIndices.size(), is(2));
+            for (int i = 0; i < failureIndices.size(); i++) {
+                assertThat(failureIndices.get(i).getName(), is(dataStream.getFailureIndices().get(i).getName()));
+            }
         }
 
         {
-            // no retention configured but we have max retention
-            DataStreamGlobalRetention globalRetention = new DataStreamGlobalRetention(null, TimeValue.timeValueSeconds(2500));
-            Metadata.Builder builder = Metadata.builder();
-            DataStream dataStream = createDataStream(
-                builder,
-                dataStreamName,
-                creationAndRolloverTimes,
-                settings(IndexVersion.current()),
-                new DataStreamLifecycle()
-            );
-            Metadata metadata = builder.build();
-
-            List<Index> backingIndices = dataStream.getIndicesPastRetention(metadata::index, () -> now, globalRetention);
-            assertThat(backingIndices.size(), is(2));
-            assertThat(backingIndices.get(0).getName(), is(dataStream.getIndices().get(0).getName()));
-            assertThat(backingIndices.get(1).getName(), is(dataStream.getIndices().get(1).getName()));
+            // All indices past retention, but we keep the write index
+            retention.set(TimeValue.timeValueSeconds(0));
+            List<Index> failureIndices = dataStream.getFailureIndicesPastRetention(metadata::index, () -> now, null);
+            assertThat(failureIndices.size(), is(4));
+            for (int i = 0; i < failureIndices.size(); i++) {
+                assertThat(failureIndices.get(i).getName(), is(dataStream.getFailureIndices().get(i).getName()));
+            }
         }
 
         {
-            Metadata.Builder builder = Metadata.builder();
-            DataStream dataStream = createDataStream(
-                builder,
-                dataStreamName,
-                creationAndRolloverTimes,
-                settings(IndexVersion.current()),
-                DataStreamLifecycle.builder().dataRetention(TimeValue.timeValueSeconds(2500)).build()
-            );
-            Metadata metadata = builder.build();
-
-            List<Index> backingIndices = dataStream.getIndicesPastRetention(metadata::index, () -> now, randomGlobalRetention());
-            assertThat(backingIndices.size(), is(2));
-            assertThat(backingIndices.get(0).getName(), is(dataStream.getIndices().get(0).getName()));
-            assertThat(backingIndices.get(1).getName(), is(dataStream.getIndices().get(1).getName()));
+            // All indices younger than retention
+            retention.set(TimeValue.timeValueSeconds(6000));
+            List<Index> failureIndices = dataStream.getFailureIndicesPastRetention(metadata::index, () -> now, null);
+            assertThat(failureIndices.isEmpty(), is(true));
         }
 
         {
-            // even though all indices match the write index should not be returned
-            Metadata.Builder builder = Metadata.builder();
-            DataStream dataStream = createDataStream(
-                builder,
-                dataStreamName,
-                creationAndRolloverTimes,
-                settings(IndexVersion.current()),
-                DataStreamLifecycle.builder().dataRetention(TimeValue.ZERO).build()
-            );
-            Metadata metadata = builder.build();
-
-            List<Index> backingIndices = dataStream.getIndicesPastRetention(metadata::index, () -> now, randomGlobalRetention());
-
-            assertThat(backingIndices.size(), is(4));
-            assertThat(backingIndices.get(0).getName(), is(dataStream.getIndices().get(0).getName()));
-            assertThat(backingIndices.get(1).getName(), is(dataStream.getIndices().get(1).getName()));
-            assertThat(backingIndices.get(2).getName(), is(dataStream.getIndices().get(2).getName()));
-            assertThat(backingIndices.get(3).getName(), is(dataStream.getIndices().get(3).getName()));
+            // Test predicate that influences which indices are candidates for a retention check
+            Function<String, IndexMetadata> indexMetadataWithSomeLifecycleSupplier = indexName -> {
+                IndexMetadata indexMetadata = metadata.index(indexName);
+                if (indexName.endsWith("00003") || indexName.endsWith("00005")) {
+                    return indexMetadata;
+                }
+                return IndexMetadata.builder(indexMetadata)
+                    .settings(Settings.builder().put(indexMetadata.getSettings()).put(IndexMetadata.LIFECYCLE_NAME, "some-policy").build())
+                    .build();
+            };
+            retention.set(TimeValue.timeValueSeconds(0));
+            List<Index> failureIndices = dataStream.getFailureIndicesPastRetention(indexMetadataWithSomeLifecycleSupplier, () -> now, null);
+            assertThat(failureIndices.size(), is(1));
+            assertThat(failureIndices.get(0).getName(), is(dataStream.getFailureIndices().get(2).getName()));
         }
 
         {
-            // no index matches the retention age
-            Metadata.Builder builder = Metadata.builder();
-            DataStream dataStream = createDataStream(
-                builder,
-                dataStreamName,
-                creationAndRolloverTimes,
-                settings(IndexVersion.current()),
-                DataStreamLifecycle.builder().dataRetention(TimeValue.timeValueSeconds(6000)).build()
+            // no retention configured but we have default retention
+            DataStreamGlobalRetention globalRetention = new DataStreamGlobalRetention(
+                TimeValue.timeValueSeconds(2500),
+                randomBoolean() ? TimeValue.timeValueSeconds(randomIntBetween(2500, 5000)) : null
             );
-            Metadata metadata = builder.build();
+            retention.set(null);
 
-            List<Index> backingIndices = dataStream.getIndicesPastRetention(metadata::index, () -> now, randomGlobalRetention());
-            assertThat(backingIndices.isEmpty(), is(true));
+            List<Index> failureIndices = dataStream.getFailureIndicesPastRetention(metadata::index, () -> now, globalRetention);
+            assertThat(failureIndices.size(), is(2));
+            for (int i = 0; i < failureIndices.size(); i++) {
+                assertThat(failureIndices.get(i).getName(), is(dataStream.getFailureIndices().get(i).getName()));
+            }
         }
 
         {
-            // no indices are returned as even though all pass retention age none are managed by data stream lifecycle
-            Metadata.Builder builder = Metadata.builder();
-            DataStream dataStream = createDataStream(
-                builder,
-                dataStreamName,
-                creationAndRolloverTimes,
-                Settings.builder()
-                    .put(IndexMetadata.LIFECYCLE_NAME, "ILM_policy")
-                    .put(IndexMetadata.SETTING_VERSION_CREATED, IndexVersion.current()),
-                DataStreamLifecycle.builder().dataRetention(TimeValue.ZERO).build()
-            );
-            Metadata metadata = builder.build();
-
-            List<Index> backingIndices = dataStream.getIndicesPastRetention(metadata::index, () -> now, randomGlobalRetention());
-            assertThat(backingIndices.isEmpty(), is(true));
+            // no retention or too large retention configured and we have max retention
+            DataStreamGlobalRetention globalRetention = new DataStreamGlobalRetention(null, TimeValue.timeValueSeconds(2500));
+            retention.set(randomBoolean() ? TimeValue.timeValueDays(6000) : null);
+            List<Index> failureIndices = dataStream.getFailureIndicesPastRetention(metadata::index, () -> now, globalRetention);
+            assertThat(failureIndices.size(), is(2));
+            for (int i = 0; i < failureIndices.size(); i++) {
+                assertThat(failureIndices.get(i).getName(), is(dataStream.getFailureIndices().get(i).getName()));
+            }
         }
     }
 
-    public void testGetIndicesPastRetentionWithOriginationDate() {
+    public void testBackingIndicesPastRetentionWithOriginationDate() {
         // First, build an ordinary data stream:
         String dataStreamName = "metrics-foo";
         long now = System.currentTimeMillis();
@@ -1550,13 +1626,13 @@ public class DataStreamTests extends AbstractXContentSerializingTestCase<DataStr
         {
             // no retention configured so we expect an empty list
             testRetentionReference.set(null);
-            assertThat(dataStream.getIndicesPastRetention(metadata::index, () -> now, null).isEmpty(), is(true));
+            assertThat(dataStream.getBackingIndicesPastRetention(metadata::index, () -> now, null).isEmpty(), is(true));
         }
 
         {
-            // retention period where oldIndex is too old, but newIndex should be retained
+            // retention period where first and second index is too old, and 5th has old origination date.
             testRetentionReference.set(TimeValue.timeValueMillis(2500));
-            List<Index> backingIndices = dataStream.getIndicesPastRetention(metadata::index, () -> now, null);
+            List<Index> backingIndices = dataStream.getBackingIndicesPastRetention(metadata::index, () -> now, null);
             assertThat(backingIndices.size(), is(3));
             assertThat(backingIndices.get(0).getName(), is(dataStream.getIndices().get(0).getName()));
             assertThat(backingIndices.get(1).getName(), is(dataStream.getIndices().get(1).getName()));
@@ -1564,24 +1640,61 @@ public class DataStreamTests extends AbstractXContentSerializingTestCase<DataStr
         }
 
         {
-            // even though all indices match the write index should not be returned
-            testRetentionReference.set(TimeValue.timeValueMillis(0));
-            List<Index> backingIndices = dataStream.getIndicesPastRetention(metadata::index, () -> now, null);
+            // no index matches the retention age
+            testRetentionReference.set(TimeValue.timeValueMillis(9000));
+            List<Index> backingIndices = dataStream.getBackingIndicesPastRetention(metadata::index, () -> now, null);
+            assertThat(backingIndices.isEmpty(), is(true));
+        }
+    }
 
-            assertThat(backingIndices.size(), is(6));
-            assertThat(backingIndices.get(0).getName(), is(dataStream.getIndices().get(0).getName()));
-            assertThat(backingIndices.get(1).getName(), is(dataStream.getIndices().get(1).getName()));
-            assertThat(backingIndices.get(2).getName(), is(dataStream.getIndices().get(2).getName()));
-            assertThat(backingIndices.get(3).getName(), is(dataStream.getIndices().get(3).getName()));
-            assertThat(backingIndices.get(4).getName(), is(dataStream.getIndices().get(4).getName()));
-            assertThat(backingIndices.get(5).getName(), is(dataStream.getIndices().get(5).getName()));
+    public void testFailureIndicesPastRetentionWithOriginationDate() {
+        // First, build an ordinary data stream:
+        String dataStreamName = "metrics-foo";
+        long now = System.currentTimeMillis();
+        List<DataStreamMetadata> creationAndRolloverTimes = List.of(
+            DataStreamMetadata.dataStreamMetadata(now - 5000, now - 4000),
+            DataStreamMetadata.dataStreamMetadata(now - 4000, now - 3000),
+            DataStreamMetadata.dataStreamMetadata(now - 3000, now - 2000),
+            DataStreamMetadata.dataStreamMetadata(now - 2000, now - 1000),
+            DataStreamMetadata.dataStreamMetadata(now, null, now - 8000), // origination date older than retention
+            DataStreamMetadata.dataStreamMetadata(now, null, now - 1000), // origination date within retention
+            DataStreamMetadata.dataStreamMetadata(now, null)
+        );
+        Metadata.Builder metadataBuilder = Metadata.builder();
+        AtomicReference<TimeValue> testRetentionReference = new AtomicReference<>(null);
+        DataStream dataStream = createDataStream(
+            metadataBuilder,
+            dataStreamName,
+            creationAndRolloverTimes,
+            settings(IndexVersion.current()),
+            new DataStreamLifecycle() {
+                public TimeValue dataRetention() {
+                    return testRetentionReference.get();
+                }
+            }
+        );
+        Metadata metadata = metadataBuilder.build();
+        {
+            // no retention configured so we expect an empty list
+            testRetentionReference.set(null);
+            assertThat(dataStream.getFailureIndicesPastRetention(metadata::index, () -> now, null).isEmpty(), is(true));
+        }
+
+        {
+            // retention period where first and second index is too old, and 5th has old origination date.
+            testRetentionReference.set(TimeValue.timeValueMillis(2500));
+            List<Index> failureIndices = dataStream.getFailureIndicesPastRetention(metadata::index, () -> now, null);
+            assertThat(failureIndices.size(), is(3));
+            assertThat(failureIndices.get(0).getName(), is(dataStream.getFailureIndices().get(0).getName()));
+            assertThat(failureIndices.get(1).getName(), is(dataStream.getFailureIndices().get(1).getName()));
+            assertThat(failureIndices.get(2).getName(), is(dataStream.getFailureIndices().get(5).getName()));
         }
 
         {
             // no index matches the retention age
             testRetentionReference.set(TimeValue.timeValueMillis(9000));
-            List<Index> backingIndices = dataStream.getIndicesPastRetention(metadata::index, () -> now, null);
-            assertThat(backingIndices.isEmpty(), is(true));
+            List<Index> failureIndices = dataStream.getFailureIndicesPastRetention(metadata::index, () -> now, null);
+            assertThat(failureIndices.isEmpty(), is(true));
         }
     }
 
@@ -1842,55 +1955,48 @@ public class DataStreamTests extends AbstractXContentSerializingTestCase<DataStr
         }
     }
 
-    public void testGetIndicesOlderThanWithOriginationDate() {
-        // First, build an ordinary datastream:
-        String dataStreamName = "metrics-foo";
-        long now = System.currentTimeMillis();
-        List<DataStreamMetadata> creationAndRolloverTimes = List.of(
-            DataStreamMetadata.dataStreamMetadata(now - 5000, now - 4000),
-            DataStreamMetadata.dataStreamMetadata(now - 4000, now - 3000),
-            DataStreamMetadata.dataStreamMetadata(now - 3000, now - 2000),
-            DataStreamMetadata.dataStreamMetadata(now - 2000, now - 1000),
-            DataStreamMetadata.dataStreamMetadata(now, null, now - 7000), // origination date older than retention
-            DataStreamMetadata.dataStreamMetadata(now, null, now - 1000), // origination date within retention
-            DataStreamMetadata.dataStreamMetadata(now, null, now - 7000) // write index origination date older than retention
-        );
-        Metadata.Builder builder = Metadata.builder();
-        DataStream dataStream = createDataStream(
+    private DataStream createDataStream(
+        Metadata.Builder builder,
+        String dataStreamName,
+        List<DataStreamMetadata> creationAndRolloverTimes,
+        Settings.Builder backingIndicesSettings,
+        @Nullable DataStreamLifecycle lifecycle
+    ) {
+        int backingIndicesCount = creationAndRolloverTimes.size();
+        final List<Index> backingIndices = createDataStreamIndices(
             builder,
             dataStreamName,
             creationAndRolloverTimes,
-            settings(IndexVersion.current()),
-            new DataStreamLifecycle()
+            backingIndicesSettings,
+            backingIndicesCount,
+            false
         );
-        Metadata metadata = builder.build();
-
-        List<Index> backingIndices = dataStream.getNonWriteIndicesOlderThan(
-            TimeValue.timeValueMillis(2500),
-            metadata::index,
-            null,
-            () -> now
+        final List<Index> failureIndices = createDataStreamIndices(
+            builder,
+            dataStreamName,
+            creationAndRolloverTimes,
+            backingIndicesSettings,
+            backingIndicesCount,
+            true
         );
-        // We expect to see the index with the really old origination date, but not the one with the more recent origination date (and
-        // not the write index)
-        assertThat(backingIndices.size(), is(3));
-        assertThat(backingIndices.get(0).getName(), is(DataStream.getDefaultBackingIndexName(dataStreamName, 1)));
-        assertThat(backingIndices.get(1).getName(), is(DataStream.getDefaultBackingIndexName(dataStreamName, 2)));
-        assertThat(backingIndices.get(2).getName(), is(DataStream.getDefaultBackingIndexName(dataStreamName, 6)));
+        return newInstance(dataStreamName, backingIndices, backingIndicesCount, null, false, lifecycle, failureIndices);
     }
 
-    private DataStream createDataStream(
+    private static List<Index> createDataStreamIndices(
         Metadata.Builder builder,
         String dataStreamName,
         List<DataStreamMetadata> creationAndRolloverTimes,
         Settings.Builder backingIndicesSettings,
-        @Nullable DataStreamLifecycle lifecycle
+        int backingIndicesCount,
+        boolean isFailureStore
     ) {
-        int backingIndicesCount = creationAndRolloverTimes.size();
-        final List<Index> backingIndices = new ArrayList<>();
+        List<Index> indices = new ArrayList<>(backingIndicesCount);
         for (int k = 1; k <= backingIndicesCount; k++) {
             DataStreamMetadata creationRolloverTime = creationAndRolloverTimes.get(k - 1);
-            IndexMetadata.Builder indexMetaBuilder = IndexMetadata.builder(DataStream.getDefaultBackingIndexName(dataStreamName, k))
+            String indexName = isFailureStore
+                ? getDefaultFailureStoreName(dataStreamName, k, System.currentTimeMillis())
+                : DataStream.getDefaultBackingIndexName(dataStreamName, k);
+            IndexMetadata.Builder indexMetaBuilder = IndexMetadata.builder(indexName)
                 .settings(backingIndicesSettings)
                 .numberOfShards(1)
                 .numberOfReplicas(1)
@@ -1906,12 +2012,15 @@ public class DataStreamTests extends AbstractXContentSerializingTestCase<DataStr
             Long originationTimeInMillis = creationRolloverTime.originationTimeInMillis;
             if (originationTimeInMillis != null) {
                 backingIndicesSettings.put(LIFECYCLE_ORIGINATION_DATE, originationTimeInMillis);
+            } else {
+                // We reuse the backingIndicesSettings, so it's important to reset it
+                backingIndicesSettings.putNull(LIFECYCLE_ORIGINATION_DATE);
             }
             IndexMetadata indexMetadata = indexMetaBuilder.build();
             builder.put(indexMetadata, false);
-            backingIndices.add(indexMetadata.getIndex());
+            indices.add(indexMetadata.getIndex());
         }
-        return newInstance(dataStreamName, backingIndices, backingIndicesCount, null, false, lifecycle);
+        return indices;
     }
 
     public void testXContentSerializationWithRolloverAndEffectiveRetention() throws IOException {

+ 2 - 2
server/src/test/java/org/elasticsearch/cluster/metadata/MetadataCreateDataStreamServiceTests.java

@@ -77,7 +77,7 @@ public class MetadataCreateDataStreamServiceTests extends ESTestCase {
         assertThat(newState.metadata().dataStreams().get(dataStreamName).isSystem(), is(false));
         assertThat(newState.metadata().dataStreams().get(dataStreamName).isHidden(), is(false));
         assertThat(newState.metadata().dataStreams().get(dataStreamName).isReplicated(), is(false));
-        assertThat(newState.metadata().dataStreams().get(dataStreamName).getLifecycle(), equalTo(DataStreamLifecycle.DEFAULT));
+        assertThat(newState.metadata().dataStreams().get(dataStreamName).getDataLifecycle(), equalTo(DataStreamLifecycle.DEFAULT));
         assertThat(newState.metadata().dataStreams().get(dataStreamName).getIndexMode(), nullValue());
         assertThat(newState.metadata().index(DataStream.getDefaultBackingIndexName(dataStreamName, 1)), notNullValue());
         assertThat(
@@ -114,7 +114,7 @@ public class MetadataCreateDataStreamServiceTests extends ESTestCase {
         assertThat(newState.metadata().dataStreams().get(dataStreamName).isHidden(), is(false));
         assertThat(newState.metadata().dataStreams().get(dataStreamName).isReplicated(), is(false));
         assertThat(newState.metadata().dataStreams().get(dataStreamName).getIndexMode(), equalTo(IndexMode.LOGSDB));
-        assertThat(newState.metadata().dataStreams().get(dataStreamName).getLifecycle(), equalTo(DataStreamLifecycle.DEFAULT));
+        assertThat(newState.metadata().dataStreams().get(dataStreamName).getDataLifecycle(), equalTo(DataStreamLifecycle.DEFAULT));
         assertThat(newState.metadata().index(DataStream.getDefaultBackingIndexName(dataStreamName, 1)), notNullValue());
         assertThat(
             newState.metadata().index(DataStream.getDefaultBackingIndexName(dataStreamName, 1)).getSettings().get("index.hidden"),

+ 2 - 2
server/src/test/java/org/elasticsearch/cluster/metadata/MetadataDataStreamsServiceTests.java

@@ -478,7 +478,7 @@ public class MetadataDataStreamsServiceTests extends MapperServiceTestCase {
             ClusterState after = service.updateDataLifecycle(before, List.of(dataStream), null);
             DataStream updatedDataStream = after.metadata().dataStreams().get(dataStream);
             assertNotNull(updatedDataStream);
-            assertThat(updatedDataStream.getLifecycle(), nullValue());
+            assertThat(updatedDataStream.getDataLifecycle(), nullValue());
             before = after;
         }
 
@@ -487,7 +487,7 @@ public class MetadataDataStreamsServiceTests extends MapperServiceTestCase {
             ClusterState after = service.updateDataLifecycle(before, List.of(dataStream), lifecycle);
             DataStream updatedDataStream = after.metadata().dataStreams().get(dataStream);
             assertNotNull(updatedDataStream);
-            assertThat(updatedDataStream.getLifecycle(), equalTo(lifecycle));
+            assertThat(updatedDataStream.getDataLifecycle(), equalTo(lifecycle));
         }
     }
 

+ 4 - 4
x-pack/plugin/core/src/main/java/org/elasticsearch/xpack/core/action/DataStreamLifecycleUsageTransportAction.java

@@ -80,14 +80,14 @@ public class DataStreamLifecycleUsageTransportAction extends XPackUsageFeatureTr
         LongSummaryStatistics effectiveRetentionStats = new LongSummaryStatistics();
 
         for (DataStream dataStream : dataStreams) {
-            if (dataStream.getLifecycle() != null && dataStream.getLifecycle().enabled()) {
+            if (dataStream.getDataLifecycle() != null && dataStream.getDataLifecycle().enabled()) {
                 dataStreamsWithLifecycles++;
                 // Track data retention
-                if (dataStream.getLifecycle().dataRetention() != null) {
-                    dataRetentionStats.accept(dataStream.getLifecycle().dataRetention().getMillis());
+                if (dataStream.getDataLifecycle().dataRetention() != null) {
+                    dataRetentionStats.accept(dataStream.getDataLifecycle().dataRetention().getMillis());
                 }
                 // Track effective retention
-                Tuple<TimeValue, DataStreamLifecycle.RetentionSource> effectiveDataRetentionWithSource = dataStream.getLifecycle()
+                Tuple<TimeValue, DataStreamLifecycle.RetentionSource> effectiveDataRetentionWithSource = dataStream.getDataLifecycle()
                     .getEffectiveDataRetentionWithSource(globalRetention, dataStream.isInternal());
 
                 // Track global retention usage