Browse Source

[8.19] [Failure store] Introduce default retention for failure indices (#127573) (#127673)

* [Failure store] Introduce default retention for failure indices (#127573)

We introduce a new global retention setting `data_streams.lifecycle.retention.failures_default` which is used by the data stream lifecycle management as the default retention when the failure store lifecycle of the data stream does not specify one.

Elasticsearch comes with the default value of 30 days. The value can be changed via the settings API to any time value higher than 10 seconds or -1 to indicate no default retention should apply.

The failures default retention can be set to values higher than the max retention, but then the max retention will be effective. The reason for this choice it to ensure that no deployments will be broken, if the user has already set up max retention less than 30 days.
Mary Gouseti 5 months ago
parent
commit
700503a4e1
25 changed files with 439 additions and 101 deletions
  1. 5 0
      docs/changelog/127573.yaml
  2. 2 1
      modules/data-streams/src/internalClusterTest/java/org/elasticsearch/datastreams/lifecycle/DataStreamLifecycleServiceIT.java
  3. 7 2
      modules/data-streams/src/javaRestTest/java/org/elasticsearch/datastreams/lifecycle/DataStreamGlobalRetentionIT.java
  4. 3 2
      modules/data-streams/src/main/java/org/elasticsearch/datastreams/action/TransportGetDataStreamsAction.java
  5. 31 13
      modules/data-streams/src/main/java/org/elasticsearch/datastreams/lifecycle/DataStreamLifecycleService.java
  6. 2 1
      modules/data-streams/src/main/java/org/elasticsearch/datastreams/lifecycle/action/TransportExplainDataStreamLifecycleAction.java
  7. 2 1
      modules/data-streams/src/main/java/org/elasticsearch/datastreams/rest/RestGetDataStreamsAction.java
  8. 1 1
      modules/data-streams/src/test/java/org/elasticsearch/datastreams/action/GetDataStreamsResponseTests.java
  9. 7 5
      modules/data-streams/src/test/java/org/elasticsearch/datastreams/action/TransportGetDataStreamsActionTests.java
  10. 24 4
      modules/data-streams/src/test/java/org/elasticsearch/datastreams/lifecycle/DataStreamLifecycleServiceTests.java
  11. 16 0
      modules/data-streams/src/yamlRestTest/resources/rest-api-spec/test/data_stream/240_failure_store_info.yml
  12. 1 0
      server/src/main/java/org/elasticsearch/TransportVersions.java
  13. 50 23
      server/src/main/java/org/elasticsearch/action/datastreams/GetDataStreamAction.java
  14. 43 13
      server/src/main/java/org/elasticsearch/action/datastreams/lifecycle/ExplainDataStreamLifecycleAction.java
  15. 11 0
      server/src/main/java/org/elasticsearch/cluster/metadata/DataStreamGlobalRetention.java
  16. 94 8
      server/src/main/java/org/elasticsearch/cluster/metadata/DataStreamGlobalRetentionSettings.java
  17. 6 2
      server/src/main/java/org/elasticsearch/cluster/metadata/DataStreamLifecycle.java
  18. 2 2
      server/src/main/java/org/elasticsearch/cluster/metadata/MetadataDataStreamsService.java
  19. 27 6
      server/src/main/java/org/elasticsearch/cluster/metadata/MetadataIndexTemplateService.java
  20. 1 0
      server/src/main/java/org/elasticsearch/common/settings/ClusterSettings.java
  21. 1 1
      server/src/test/java/org/elasticsearch/action/datastreams/GetDataStreamActionTests.java
  22. 21 9
      server/src/test/java/org/elasticsearch/action/datastreams/lifecycle/ExplainDataStreamLifecycleResponseTests.java
  23. 78 4
      server/src/test/java/org/elasticsearch/cluster/metadata/DataStreamGlobalRetentionSettingsTests.java
  24. 3 2
      server/src/test/java/org/elasticsearch/cluster/metadata/DataStreamLifecycleTests.java
  25. 1 1
      x-pack/plugin/core/src/main/java/org/elasticsearch/xpack/core/action/DataStreamUsageTransportAction.java

+ 5 - 0
docs/changelog/127573.yaml

@@ -0,0 +1,5 @@
+pr: 127573
+summary: "[Failure store] Introduce default retention for failure indices"
+area: Data streams
+type: enhancement
+issues: []

+ 2 - 1
modules/data-streams/src/internalClusterTest/java/org/elasticsearch/datastreams/lifecycle/DataStreamLifecycleServiceIT.java

@@ -279,7 +279,8 @@ public class DataStreamLifecycleServiceIT extends ESIntegTestCase {
                                 builder,
                                 withEffectiveRetention,
                                 getDataStreamResponse.getRolloverConfiguration(),
-                                getDataStreamResponse.getGlobalRetention()
+                                getDataStreamResponse.getDataGlobalRetention(),
+                                getDataStreamResponse.getFailuresGlobalRetention()
                             );
                         String serialized = Strings.toString(builder);
                         Map<String, Object> resultMap = XContentHelper.convertToMap(

+ 7 - 2
modules/data-streams/src/javaRestTest/java/org/elasticsearch/datastreams/lifecycle/DataStreamGlobalRetentionIT.java

@@ -147,7 +147,12 @@ public class DataStreamGlobalRetentionIT extends DisabledSecurityDataStreamTestC
     @SuppressWarnings("unchecked")
     public void testDefaultRetention() throws Exception {
         // Set default global retention
-        updateClusterSettings(Settings.builder().put("data_streams.lifecycle.retention.default", "10s").build());
+        updateClusterSettings(
+            Settings.builder()
+                .put("data_streams.lifecycle.retention.default", "10s")
+                .put("data_streams.lifecycle.retention.failures_default", "10s")
+                .build()
+        );
 
         // Verify that the effective retention matches the default retention
         {
@@ -163,7 +168,7 @@ public class DataStreamGlobalRetentionIT extends DisabledSecurityDataStreamTestC
             assertThat(lifecycle.get("data_retention"), nullValue());
             Map<String, Object> failuresLifecycle = ((Map<String, Map<String, Object>>) dataStream.get("failure_store")).get("lifecycle");
             assertThat(failuresLifecycle.get("effective_retention"), is("10s"));
-            assertThat(failuresLifecycle.get("retention_determined_by"), is("default_global_retention"));
+            assertThat(failuresLifecycle.get("retention_determined_by"), is("default_failures_retention"));
             assertThat(failuresLifecycle.get("data_retention"), nullValue());
         }
 

+ 3 - 2
modules/data-streams/src/main/java/org/elasticsearch/datastreams/action/TransportGetDataStreamsAction.java

@@ -88,7 +88,7 @@ public class TransportGetDataStreamsAction extends TransportMasterNodeReadAction
             threadPool,
             actionFilters,
             GetDataStreamAction.Request::new,
-            GetDataStreamAction.Response::new,
+            GetDataStreamAction.Response::read,
             transportService.getThreadPool().executor(ThreadPool.Names.MANAGEMENT)
         );
         this.indexNameExpressionResolver = indexNameExpressionResolver;
@@ -287,7 +287,8 @@ public class TransportGetDataStreamsAction extends TransportMasterNodeReadAction
         return new GetDataStreamAction.Response(
             dataStreamInfos,
             request.includeDefaults() ? clusterSettings.get(DataStreamLifecycle.CLUSTER_LIFECYCLE_DEFAULT_ROLLOVER_SETTING) : null,
-            globalRetentionSettings.get()
+            globalRetentionSettings.get(false),
+            globalRetentionSettings.get(true)
         );
     }
 

+ 31 - 13
modules/data-streams/src/main/java/org/elasticsearch/datastreams/lifecycle/DataStreamLifecycleService.java

@@ -44,7 +44,6 @@ import org.elasticsearch.cluster.ClusterStateTaskListener;
 import org.elasticsearch.cluster.SimpleBatchedExecutor;
 import org.elasticsearch.cluster.block.ClusterBlockLevel;
 import org.elasticsearch.cluster.metadata.DataStream;
-import org.elasticsearch.cluster.metadata.DataStreamGlobalRetention;
 import org.elasticsearch.cluster.metadata.DataStreamGlobalRetentionSettings;
 import org.elasticsearch.cluster.metadata.DataStreamLifecycle;
 import org.elasticsearch.cluster.metadata.IndexAbstraction;
@@ -354,13 +353,18 @@ public class DataStreamLifecycleService implements ClusterStateListener, Closeab
                 continue;
             }
 
+            // Retrieve the effective retention to ensure the same retention is used for this data stream
+            // through all operations.
+            var dataRetention = getEffectiveRetention(dataStream, globalRetentionSettings, false);
+            var failuresRetention = getEffectiveRetention(dataStream, globalRetentionSettings, true);
+
             // the following indices should not be considered for the remainder of this service run, for various reasons.
             Set<Index> indicesToExcludeForRemainingRun = new HashSet<>();
 
             // These are the pre-rollover write indices. They may or may not be the write index after maybeExecuteRollover has executed,
             // depending on rollover criteria, for this reason we exclude them for the remaining run.
-            indicesToExcludeForRemainingRun.add(maybeExecuteRollover(state, dataStream, false));
-            Index failureStoreWriteIndex = maybeExecuteRollover(state, dataStream, true);
+            indicesToExcludeForRemainingRun.add(maybeExecuteRollover(state, dataStream, dataRetention, false));
+            Index failureStoreWriteIndex = maybeExecuteRollover(state, dataStream, failuresRetention, true);
             if (failureStoreWriteIndex != null) {
                 indicesToExcludeForRemainingRun.add(failureStoreWriteIndex);
             }
@@ -376,7 +380,9 @@ public class DataStreamLifecycleService implements ClusterStateListener, Closeab
             );
 
             try {
-                indicesToExcludeForRemainingRun.addAll(maybeExecuteRetention(state, dataStream, indicesToExcludeForRemainingRun));
+                indicesToExcludeForRemainingRun.addAll(
+                    maybeExecuteRetention(state, dataStream, dataRetention, failuresRetention, indicesToExcludeForRemainingRun)
+                );
             } catch (Exception e) {
                 // individual index errors would be reported via the API action listener for every delete call
                 // we could potentially record errors at a data stream level and expose it via the _data_stream API?
@@ -807,7 +813,12 @@ public class DataStreamLifecycleService implements ClusterStateListener, Closeab
     }
 
     @Nullable
-    private Index maybeExecuteRollover(ClusterState state, DataStream dataStream, boolean rolloverFailureStore) {
+    private Index maybeExecuteRollover(
+        ClusterState state,
+        DataStream dataStream,
+        TimeValue effectiveRetention,
+        boolean rolloverFailureStore
+    ) {
         Index currentRunWriteIndex = rolloverFailureStore ? dataStream.getWriteFailureIndex() : dataStream.getWriteIndex();
         if (currentRunWriteIndex == null) {
             return null;
@@ -818,7 +829,7 @@ public class DataStreamLifecycleService implements ClusterStateListener, Closeab
                 RolloverRequest rolloverRequest = getDefaultRolloverRequest(
                     rolloverConfiguration,
                     dataStream.getName(),
-                    lifecycle.getEffectiveDataRetention(globalRetentionSettings.get(), dataStream.isInternal()),
+                    effectiveRetention,
                     rolloverFailureStore
                 );
                 transportActionsDeduplicator.executeOnce(
@@ -868,14 +879,17 @@ public class DataStreamLifecycleService implements ClusterStateListener, Closeab
      * @param indicesToExcludeForRemainingRun Indices to exclude from retention even if it would be time for them to be deleted
      * @return The set of indices that delete requests have been sent for
      */
-    Set<Index> maybeExecuteRetention(ClusterState state, DataStream dataStream, Set<Index> indicesToExcludeForRemainingRun) {
-        Metadata metadata = state.metadata();
-        DataStreamGlobalRetention globalRetention = dataStream.isSystem() ? null : globalRetentionSettings.get();
-        var dataRetention = getRetention(dataStream, globalRetention, false);
-        var failureRetention = getRetention(dataStream, globalRetention, true);
+    Set<Index> maybeExecuteRetention(
+        ClusterState state,
+        DataStream dataStream,
+        TimeValue dataRetention,
+        TimeValue failureRetention,
+        Set<Index> indicesToExcludeForRemainingRun
+    ) {
         if (dataRetention == null && failureRetention == null) {
             return Set.of();
         }
+        Metadata metadata = state.metadata();
         List<Index> backingIndicesOlderThanRetention = dataStream.getIndicesPastRetention(
             metadata::index,
             nowSupplier,
@@ -1320,11 +1334,15 @@ public class DataStreamLifecycleService implements ClusterStateListener, Closeab
     }
 
     @Nullable
-    private static TimeValue getRetention(DataStream dataStream, DataStreamGlobalRetention globalRetention, boolean failureStore) {
+    private static TimeValue getEffectiveRetention(
+        DataStream dataStream,
+        DataStreamGlobalRetentionSettings globalRetentionSettings,
+        boolean failureStore
+    ) {
         DataStreamLifecycle lifecycle = failureStore ? dataStream.getFailuresLifecycle() : dataStream.getDataLifecycle();
         return lifecycle == null || lifecycle.enabled() == false
             ? null
-            : lifecycle.getEffectiveDataRetention(globalRetention, dataStream.isInternal());
+            : lifecycle.getEffectiveDataRetention(globalRetentionSettings.get(failureStore), dataStream.isInternal());
     }
 
     /**

+ 2 - 1
modules/data-streams/src/main/java/org/elasticsearch/datastreams/lifecycle/action/TransportExplainDataStreamLifecycleAction.java

@@ -120,7 +120,8 @@ public class TransportExplainDataStreamLifecycleAction extends TransportMasterNo
             new ExplainDataStreamLifecycleAction.Response(
                 explainIndices,
                 request.includeDefaults() ? clusterSettings.get(DataStreamLifecycle.CLUSTER_LIFECYCLE_DEFAULT_ROLLOVER_SETTING) : null,
-                globalRetentionSettings.get()
+                globalRetentionSettings.get(false),
+                globalRetentionSettings.get(true)
             )
         );
     }

+ 2 - 1
modules/data-streams/src/main/java/org/elasticsearch/datastreams/rest/RestGetDataStreamsAction.java

@@ -47,7 +47,8 @@ public class RestGetDataStreamsAction extends BaseRestHandler {
     public static final String FAILURES_LIFECYCLE_API_CAPABILITY = "failure_store.lifecycle";
     private static final Set<String> CAPABILITIES = Set.of(
         DataStreamLifecycle.EFFECTIVE_RETENTION_REST_API_CAPABILITY,
-        FAILURES_LIFECYCLE_API_CAPABILITY
+        FAILURES_LIFECYCLE_API_CAPABILITY,
+        "failure_store.lifecycle.default_retention"
     );
 
     @Override

+ 1 - 1
modules/data-streams/src/test/java/org/elasticsearch/datastreams/action/GetDataStreamsResponseTests.java

@@ -45,7 +45,7 @@ public class GetDataStreamsResponseTests extends AbstractWireSerializingTestCase
 
     @Override
     protected Writeable.Reader<Response> instanceReader() {
-        return Response::new;
+        return Response::read;
     }
 
     @Override

+ 7 - 5
modules/data-streams/src/test/java/org/elasticsearch/datastreams/action/TransportGetDataStreamsActionTests.java

@@ -351,8 +351,8 @@ public class TransportGetDataStreamsActionTests extends ESTestCase {
             emptyDataStreamFailureStoreSettings,
             null
         );
-        assertThat(response.getGlobalRetention(), nullValue());
-        DataStreamGlobalRetention globalRetention = new DataStreamGlobalRetention(
+        assertThat(response.getDataGlobalRetention(), nullValue());
+        DataStreamGlobalRetention dataGlobalRetention = new DataStreamGlobalRetention(
             TimeValue.timeValueDays(randomIntBetween(1, 5)),
             TimeValue.timeValueDays(randomIntBetween(5, 10))
         );
@@ -361,9 +361,9 @@ public class TransportGetDataStreamsActionTests extends ESTestCase {
                 Settings.builder()
                     .put(
                         DataStreamGlobalRetentionSettings.DATA_STREAMS_DEFAULT_RETENTION_SETTING.getKey(),
-                        globalRetention.defaultRetention()
+                        dataGlobalRetention.defaultRetention()
                     )
-                    .put(DataStreamGlobalRetentionSettings.DATA_STREAMS_MAX_RETENTION_SETTING.getKey(), globalRetention.maxRetention())
+                    .put(DataStreamGlobalRetentionSettings.DATA_STREAMS_MAX_RETENTION_SETTING.getKey(), dataGlobalRetention.maxRetention())
                     .build()
             )
         );
@@ -377,7 +377,9 @@ public class TransportGetDataStreamsActionTests extends ESTestCase {
             emptyDataStreamFailureStoreSettings,
             null
         );
-        assertThat(response.getGlobalRetention(), equalTo(globalRetention));
+        assertThat(response.getDataGlobalRetention(), equalTo(dataGlobalRetention));
+        // We used the default failures retention here which is greater than the max
+        assertThat(response.getFailuresGlobalRetention(), equalTo(new DataStreamGlobalRetention(null, dataGlobalRetention.maxRetention())));
     }
 
     public void testDataStreamIsFailureStoreEffectivelyEnabled_disabled() {

+ 24 - 4
modules/data-streams/src/test/java/org/elasticsearch/datastreams/lifecycle/DataStreamLifecycleServiceTests.java

@@ -1566,9 +1566,16 @@ public class DataStreamLifecycleServiceTests extends ESTestCase {
         ClusterState state = downsampleSetup(dataStreamName, SUCCESS);
         DataStream dataStream = state.metadata().dataStreams().get(dataStreamName);
         String firstGenIndexName = DataStream.getDefaultBackingIndexName(dataStreamName, 1);
+        TimeValue dataRetention = dataStream.getDataLifecycle().dataRetention();
 
         // Executing the method to be tested:
-        Set<Index> indicesToBeRemoved = dataStreamLifecycleService.maybeExecuteRetention(clusterService.state(), dataStream, Set.of());
+        Set<Index> indicesToBeRemoved = dataStreamLifecycleService.maybeExecuteRetention(
+            clusterService.state(),
+            dataStream,
+            dataRetention,
+            null,
+            Set.of()
+        );
         assertThat(indicesToBeRemoved, contains(state.getMetadata().index(firstGenIndexName).getIndex()));
     }
 
@@ -1576,10 +1583,16 @@ public class DataStreamLifecycleServiceTests extends ESTestCase {
         String dataStreamName = randomAlphaOfLength(10).toLowerCase(Locale.ROOT);
         ClusterState state = downsampleSetup(dataStreamName, STARTED);
         DataStream dataStream = state.metadata().dataStreams().get(dataStreamName);
-        String firstGenIndexName = DataStream.getDefaultBackingIndexName(dataStreamName, 1);
+        TimeValue dataRetention = dataStream.getDataLifecycle().dataRetention();
 
         // Executing the method to be tested:
-        Set<Index> indicesToBeRemoved = dataStreamLifecycleService.maybeExecuteRetention(clusterService.state(), dataStream, Set.of());
+        Set<Index> indicesToBeRemoved = dataStreamLifecycleService.maybeExecuteRetention(
+            clusterService.state(),
+            dataStream,
+            dataRetention,
+            null,
+            Set.of()
+        );
         assertThat(indicesToBeRemoved, empty());
     }
 
@@ -1588,9 +1601,16 @@ public class DataStreamLifecycleServiceTests extends ESTestCase {
         ClusterState state = downsampleSetup(dataStreamName, UNKNOWN);
         DataStream dataStream = state.metadata().dataStreams().get(dataStreamName);
         String firstGenIndexName = DataStream.getDefaultBackingIndexName(dataStreamName, 1);
+        TimeValue dataRetention = dataStream.getDataLifecycle().dataRetention();
 
         // Executing the method to be tested:
-        Set<Index> indicesToBeRemoved = dataStreamLifecycleService.maybeExecuteRetention(clusterService.state(), dataStream, Set.of());
+        Set<Index> indicesToBeRemoved = dataStreamLifecycleService.maybeExecuteRetention(
+            clusterService.state(),
+            dataStream,
+            dataRetention,
+            null,
+            Set.of()
+        );
         assertThat(indicesToBeRemoved, contains(state.getMetadata().index(firstGenIndexName).getIndex()));
     }
 

+ 16 - 0
modules/data-streams/src/yamlRestTest/resources/rest-api-spec/test/data_stream/240_failure_store_info.yml

@@ -189,6 +189,9 @@ teardown:
   - match: { data_streams.0.template: 'my-template1' }
   - match: { data_streams.0.failure_store.enabled: true }
   - match: { data_streams.0.failure_store.lifecycle.enabled: false }
+  - is_false: data_streams.0.failure_store.lifecycle.data_retention
+  - is_false: data_streams.0.failure_store.lifecycle.effective_retention
+  - is_false: data_streams.0.failure_store.lifecycle.retention_determined_by
   - length: { data_streams.0.failure_store.indices: 1 }
   - match: { data_streams.0.failure_store.indices.0.index_name: '/\.fs-fs-data-stream-(\d{4}\.\d{2}\.\d{2}-)?000002/' }
   - is_false: data_streams.0.failure_store.indices.0.prefer_ilm
@@ -196,6 +199,13 @@ teardown:
 
 ---
 "Get failure store info from cluster setting enabled failure store":
+  - requires:
+      test_runner_features: [ capabilities ]
+      reason: "Default retention for failures was added in 8.19+"
+      capabilities:
+        - method: GET
+          path: /_data_stream/{target}
+          capabilities: [ 'failure_store.lifecycle.default_retention' ]
   - do:
       indices.create_data_stream:
         name: fs-default-data-stream
@@ -212,6 +222,9 @@ teardown:
   - match: { data_streams.0.template: 'my-template2' }
   - match: { data_streams.0.failure_store.enabled: true }
   - match: { data_streams.0.failure_store.lifecycle.enabled: true }
+  - is_false: data_streams.0.failure_store.lifecycle.data_retention
+  - match: { data_streams.0.failure_store.lifecycle.effective_retention: '30d' }
+  - match: { data_streams.0.failure_store.lifecycle.retention_determined_by: 'default_failures_retention' }
   - match: { data_streams.0.failure_store.indices: [] }
 
   # Initialize failure store
@@ -234,6 +247,9 @@ teardown:
   - match: { data_streams.0.template: 'my-template2' }
   - match: { data_streams.0.failure_store.enabled: true }
   - match: { data_streams.0.failure_store.lifecycle.enabled: true }
+  - is_false: data_streams.0.failure_store.lifecycle.data_retention
+  - match: { data_streams.0.failure_store.lifecycle.effective_retention: '30d' }
+  - match: { data_streams.0.failure_store.lifecycle.retention_determined_by: 'default_failures_retention' }
   - length: { data_streams.0.failure_store.indices: 1 }
   - match: { data_streams.0.failure_store.indices.0.index_name: '/\.fs-fs-default-data-stream-(\d{4}\.\d{2}\.\d{2}-)?000002/' }
   - is_false: data_streams.0.failure_store.indices.0.prefer_ilm

+ 1 - 0
server/src/main/java/org/elasticsearch/TransportVersions.java

@@ -212,6 +212,7 @@ public class TransportVersions {
     public static final TransportVersion PINNED_RETRIEVER_8_19 = def(8_841_0_23);
     public static final TransportVersion ESQL_AGGREGATE_METRIC_DOUBLE_BLOCK_8_19 = def(8_841_0_24);
     public static final TransportVersion INTRODUCE_FAILURES_LIFECYCLE_BACKPORT_8_19 = def(8_841_0_25);
+    public static final TransportVersion INTRODUCE_FAILURES_DEFAULT_RETENTION_BACKPORT_8_19 = def(8_841_0_26);
 
     /*
      * STOP! READ THIS FIRST! No, really,

+ 50 - 23
server/src/main/java/org/elasticsearch/action/datastreams/GetDataStreamAction.java

@@ -352,7 +352,7 @@ public class GetDataStreamAction extends ActionType<GetDataStreamAction.Response
 
             @Override
             public XContentBuilder toXContent(XContentBuilder builder, Params params) throws IOException {
-                return toXContent(builder, params, null, null);
+                return toXContent(builder, params, null, null, null);
             }
 
             /**
@@ -363,7 +363,8 @@ public class GetDataStreamAction extends ActionType<GetDataStreamAction.Response
                 XContentBuilder builder,
                 Params params,
                 @Nullable RolloverConfiguration rolloverConfiguration,
-                @Nullable DataStreamGlobalRetention globalRetention
+                @Nullable DataStreamGlobalRetention dataGlobalRetention,
+                @Nullable DataStreamGlobalRetention failureGlobalRetention
             ) throws IOException {
                 builder.startObject();
                 builder.field(DataStream.NAME_FIELD.getPreferredName(), dataStream.getName());
@@ -384,7 +385,7 @@ public class GetDataStreamAction extends ActionType<GetDataStreamAction.Response
                 if (dataStream.getDataLifecycle() != null) {
                     builder.field(LIFECYCLE_FIELD.getPreferredName());
                     dataStream.getDataLifecycle()
-                        .toXContent(builder, params, rolloverConfiguration, globalRetention, dataStream.isInternal());
+                        .toXContent(builder, params, rolloverConfiguration, dataGlobalRetention, dataStream.isInternal());
                 }
                 if (ilmPolicyName != null) {
                     builder.field(ILM_POLICY_FIELD.getPreferredName(), ilmPolicyName);
@@ -423,7 +424,7 @@ public class GetDataStreamAction extends ActionType<GetDataStreamAction.Response
                 DataStreamLifecycle failuresLifecycle = dataStream.getFailuresLifecycle(failureStoreEffectivelyEnabled);
                 if (failuresLifecycle != null) {
                     builder.field(LIFECYCLE_FIELD.getPreferredName());
-                    failuresLifecycle.toXContent(builder, params, rolloverConfiguration, globalRetention, dataStream.isInternal());
+                    failuresLifecycle.toXContent(builder, params, rolloverConfiguration, failureGlobalRetention, dataStream.isInternal());
                 }
                 builder.endObject();
                 builder.endObject();
@@ -582,30 +583,44 @@ public class GetDataStreamAction extends ActionType<GetDataStreamAction.Response
         @Nullable
         private final RolloverConfiguration rolloverConfiguration;
         @Nullable
-        private final DataStreamGlobalRetention globalRetention;
+        private final DataStreamGlobalRetention dataGlobalRetention;
+        @Nullable
+        private final DataStreamGlobalRetention failuresGlobalRetention;
 
         public Response(List<DataStreamInfo> dataStreams) {
-            this(dataStreams, null, null);
+            this(dataStreams, null, null, null);
         }
 
         public Response(
             List<DataStreamInfo> dataStreams,
             @Nullable RolloverConfiguration rolloverConfiguration,
-            @Nullable DataStreamGlobalRetention globalRetention
+            @Nullable DataStreamGlobalRetention dataGlobalRetention,
+            @Nullable DataStreamGlobalRetention failuresGlobalRetention
         ) {
             this.dataStreams = dataStreams;
             this.rolloverConfiguration = rolloverConfiguration;
-            this.globalRetention = globalRetention;
+            this.dataGlobalRetention = dataGlobalRetention;
+            this.failuresGlobalRetention = failuresGlobalRetention;
         }
 
-        public Response(StreamInput in) throws IOException {
-            this(
-                in.readCollectionAsList(DataStreamInfo::new),
-                in.getTransportVersion().onOrAfter(TransportVersions.V_8_9_X) ? in.readOptionalWriteable(RolloverConfiguration::new) : null,
-                in.getTransportVersion().onOrAfter(TransportVersions.V_8_14_0)
-                    ? in.readOptionalWriteable(DataStreamGlobalRetention::read)
-                    : null
-            );
+        public static Response read(StreamInput in) throws IOException {
+            var dataStreamInfo = in.readCollectionAsList(DataStreamInfo::new);
+            var rolloverConfiguration = in.getTransportVersion().onOrAfter(TransportVersions.V_8_9_X)
+                ? in.readOptionalWriteable(RolloverConfiguration::new)
+                : null;
+            DataStreamGlobalRetention dataGlobalRetention = null;
+            DataStreamGlobalRetention failuresGlobalRetention = null;
+            if (in.getTransportVersion().onOrAfter(TransportVersions.INTRODUCE_FAILURES_DEFAULT_RETENTION_BACKPORT_8_19)) {
+                var defaultRetention = in.readOptionalTimeValue();
+                var maxRetention = in.readOptionalTimeValue();
+                var failuresDefaultRetention = in.readOptionalTimeValue();
+                dataGlobalRetention = DataStreamGlobalRetention.create(defaultRetention, maxRetention);
+                failuresGlobalRetention = DataStreamGlobalRetention.create(failuresDefaultRetention, maxRetention);
+            } else if (in.getTransportVersion().onOrAfter(TransportVersions.V_8_14_0)) {
+                dataGlobalRetention = in.readOptionalWriteable(DataStreamGlobalRetention::read);
+                failuresGlobalRetention = dataGlobalRetention;
+            }
+            return new Response(dataStreamInfo, rolloverConfiguration, dataGlobalRetention, failuresGlobalRetention);
         }
 
         public List<DataStreamInfo> getDataStreams() {
@@ -618,8 +633,13 @@ public class GetDataStreamAction extends ActionType<GetDataStreamAction.Response
         }
 
         @Nullable
-        public DataStreamGlobalRetention getGlobalRetention() {
-            return globalRetention;
+        public DataStreamGlobalRetention getDataGlobalRetention() {
+            return dataGlobalRetention;
+        }
+
+        @Nullable
+        public DataStreamGlobalRetention getFailuresGlobalRetention() {
+            return failuresGlobalRetention;
         }
 
         @Override
@@ -628,8 +648,13 @@ public class GetDataStreamAction extends ActionType<GetDataStreamAction.Response
             if (out.getTransportVersion().onOrAfter(TransportVersions.V_8_9_X)) {
                 out.writeOptionalWriteable(rolloverConfiguration);
             }
-            if (out.getTransportVersion().onOrAfter(TransportVersions.V_8_14_0)) {
-                out.writeOptionalWriteable(globalRetention);
+            // A version 9.x cluster will never read this, so we only need to include the patch version here.
+            if (out.getTransportVersion().isPatchFrom(TransportVersions.INTRODUCE_FAILURES_DEFAULT_RETENTION_BACKPORT_8_19)) {
+                out.writeOptionalTimeValue(dataGlobalRetention == null ? null : dataGlobalRetention.defaultRetention());
+                out.writeOptionalTimeValue(dataGlobalRetention == null ? null : dataGlobalRetention.maxRetention());
+                out.writeOptionalTimeValue(failuresGlobalRetention == null ? null : failuresGlobalRetention.defaultRetention());
+            } else if (out.getTransportVersion().onOrAfter(TransportVersions.V_8_14_0)) {
+                out.writeOptionalWriteable(dataGlobalRetention);
             }
         }
 
@@ -642,7 +667,8 @@ public class GetDataStreamAction extends ActionType<GetDataStreamAction.Response
                     builder,
                     DataStreamLifecycle.addEffectiveRetentionParams(params),
                     rolloverConfiguration,
-                    globalRetention
+                    dataGlobalRetention,
+                    failuresGlobalRetention
                 );
             }
             builder.endArray();
@@ -657,12 +683,13 @@ public class GetDataStreamAction extends ActionType<GetDataStreamAction.Response
             Response response = (Response) o;
             return dataStreams.equals(response.dataStreams)
                 && Objects.equals(rolloverConfiguration, response.rolloverConfiguration)
-                && Objects.equals(globalRetention, response.globalRetention);
+                && Objects.equals(dataGlobalRetention, response.dataGlobalRetention)
+                && Objects.equals(failuresGlobalRetention, response.failuresGlobalRetention);
         }
 
         @Override
         public int hashCode() {
-            return Objects.hash(dataStreams, rolloverConfiguration, globalRetention);
+            return Objects.hash(dataStreams, rolloverConfiguration, dataGlobalRetention, failuresGlobalRetention);
         }
     }
 

+ 43 - 13
server/src/main/java/org/elasticsearch/action/datastreams/lifecycle/ExplainDataStreamLifecycleAction.java

@@ -147,25 +147,39 @@ public class ExplainDataStreamLifecycleAction {
         @Nullable
         private final RolloverConfiguration rolloverConfiguration;
         @Nullable
-        private final DataStreamGlobalRetention globalRetention;
+        private final DataStreamGlobalRetention dataGlobalRetention;
+        @Nullable
+        private final DataStreamGlobalRetention failureGlobalRetention;
 
         public Response(
             List<ExplainIndexDataStreamLifecycle> indices,
             @Nullable RolloverConfiguration rolloverConfiguration,
-            @Nullable DataStreamGlobalRetention globalRetention
+            @Nullable DataStreamGlobalRetention dataGlobalRetention,
+            @Nullable DataStreamGlobalRetention failureGlobalRetention
         ) {
             this.indices = indices;
             this.rolloverConfiguration = rolloverConfiguration;
-            this.globalRetention = globalRetention;
+            this.dataGlobalRetention = dataGlobalRetention;
+            this.failureGlobalRetention = failureGlobalRetention;
         }
 
         public Response(StreamInput in) throws IOException {
             super(in);
             this.indices = in.readCollectionAsList(ExplainIndexDataStreamLifecycle::new);
             this.rolloverConfiguration = in.readOptionalWriteable(RolloverConfiguration::new);
-            this.globalRetention = in.getTransportVersion().onOrAfter(TransportVersions.V_8_14_0)
-                ? in.readOptionalWriteable(DataStreamGlobalRetention::read)
-                : null;
+            if (in.getTransportVersion().onOrAfter(TransportVersions.INTRODUCE_FAILURES_DEFAULT_RETENTION_BACKPORT_8_19)) {
+                var defaultRetention = in.readOptionalTimeValue();
+                var maxRetention = in.readOptionalTimeValue();
+                var defaultFailuresRetention = in.readOptionalTimeValue();
+                dataGlobalRetention = DataStreamGlobalRetention.create(defaultRetention, maxRetention);
+                failureGlobalRetention = DataStreamGlobalRetention.create(defaultFailuresRetention, maxRetention);
+            } else if (in.getTransportVersion().onOrAfter(TransportVersions.V_8_14_0)) {
+                dataGlobalRetention = in.readOptionalWriteable(DataStreamGlobalRetention::read);
+                failureGlobalRetention = dataGlobalRetention;
+            } else {
+                dataGlobalRetention = null;
+                failureGlobalRetention = null;
+            }
         }
 
         public List<ExplainIndexDataStreamLifecycle> getIndices() {
@@ -176,16 +190,31 @@ public class ExplainDataStreamLifecycleAction {
             return rolloverConfiguration;
         }
 
-        public DataStreamGlobalRetention getGlobalRetention() {
-            return globalRetention;
+        public DataStreamGlobalRetention getDataGlobalRetention() {
+            return dataGlobalRetention;
+        }
+
+        public DataStreamGlobalRetention getFailuresGlobalRetention() {
+            return failureGlobalRetention;
+        }
+
+        private DataStreamGlobalRetention getGlobalRetentionForLifecycle(DataStreamLifecycle lifecycle) {
+            if (lifecycle == null) {
+                return null;
+            }
+            return lifecycle.targetsFailureStore() ? failureGlobalRetention : dataGlobalRetention;
         }
 
         @Override
         public void writeTo(StreamOutput out) throws IOException {
             out.writeCollection(indices);
             out.writeOptionalWriteable(rolloverConfiguration);
-            if (out.getTransportVersion().onOrAfter(TransportVersions.V_8_14_0)) {
-                out.writeOptionalWriteable(globalRetention);
+            if (out.getTransportVersion().onOrAfter(TransportVersions.INTRODUCE_FAILURES_DEFAULT_RETENTION_BACKPORT_8_19)) {
+                out.writeOptionalTimeValue(dataGlobalRetention == null ? null : dataGlobalRetention.defaultRetention());
+                out.writeOptionalTimeValue(dataGlobalRetention == null ? null : dataGlobalRetention.maxRetention());
+                out.writeOptionalTimeValue(failureGlobalRetention == null ? null : failureGlobalRetention.defaultRetention());
+            } else if (out.getTransportVersion().onOrAfter(TransportVersions.V_8_14_0)) {
+                out.writeOptionalWriteable(getDataGlobalRetention());
             }
         }
 
@@ -200,12 +229,13 @@ public class ExplainDataStreamLifecycleAction {
             Response response = (Response) o;
             return Objects.equals(indices, response.indices)
                 && Objects.equals(rolloverConfiguration, response.rolloverConfiguration)
-                && Objects.equals(globalRetention, response.globalRetention);
+                && Objects.equals(dataGlobalRetention, response.dataGlobalRetention)
+                && Objects.equals(failureGlobalRetention, response.failureGlobalRetention);
         }
 
         @Override
         public int hashCode() {
-            return Objects.hash(indices, rolloverConfiguration, globalRetention);
+            return Objects.hash(indices, rolloverConfiguration, dataGlobalRetention, failureGlobalRetention);
         }
 
         @Override
@@ -220,7 +250,7 @@ public class ExplainDataStreamLifecycleAction {
                     builder,
                     DataStreamLifecycle.addEffectiveRetentionParams(outerParams),
                     rolloverConfiguration,
-                    globalRetention
+                    getGlobalRetentionForLifecycle(explainIndexDataLifecycle.getLifecycle())
                 );
                 return builder;
             }), Iterators.single((builder, params) -> {

+ 11 - 0
server/src/main/java/org/elasticsearch/cluster/metadata/DataStreamGlobalRetention.java

@@ -48,6 +48,17 @@ public record DataStreamGlobalRetention(@Nullable TimeValue defaultRetention, @N
         this.maxRetention = maxRetention;
     }
 
+    /**
+     * Helper method that creates a global retention object or returns null in case both retentions are null
+     */
+    @Nullable
+    public static DataStreamGlobalRetention create(@Nullable TimeValue defaultRetention, @Nullable TimeValue maxRetention) {
+        if (defaultRetention == null && maxRetention == null) {
+            return null;
+        }
+        return new DataStreamGlobalRetention(defaultRetention, maxRetention);
+    }
+
     private boolean validateRetentionValue(@Nullable TimeValue retention) {
         return retention == null || retention.getMillis() >= MIN_RETENTION_VALUE.getMillis();
     }

+ 94 - 8
server/src/main/java/org/elasticsearch/cluster/metadata/DataStreamGlobalRetentionSettings.java

@@ -13,6 +13,7 @@ import org.apache.logging.log4j.LogManager;
 import org.apache.logging.log4j.Logger;
 import org.elasticsearch.common.settings.ClusterSettings;
 import org.elasticsearch.common.settings.Setting;
+import org.elasticsearch.common.settings.Settings;
 import org.elasticsearch.core.Nullable;
 import org.elasticsearch.core.TimeValue;
 
@@ -24,8 +25,10 @@ import java.util.Map;
  * This class holds the data stream global retention settings. It defines, validates and monitors the settings.
  * <p>
  * The global retention settings apply to non-system data streams that are managed by the data stream lifecycle. They consist of:
- * - The default retention which applies to data streams that do not have a retention defined.
- * - The max retention which applies to all data streams that do not have retention or their retention has exceeded this value.
+ * - The default retention which applies to the backing indices of data streams that do not have a retention defined.
+ * - The max retention which applies to backing and failure indices of data streams that do not have retention or their
+ * retention has exceeded this value.
+ * - The failures default retention which applied to the failure indices of data streams that do not have retention defined.
  */
 public class DataStreamGlobalRetentionSettings {
 
@@ -82,27 +85,66 @@ public class DataStreamGlobalRetentionSettings {
         Setting.Property.Dynamic
     );
 
+    static final TimeValue FAILURES_DEFAULT_RETENTION = TimeValue.timeValueDays(30);
+    public static final Setting<TimeValue> FAILURE_STORE_DEFAULT_RETENTION_SETTING = Setting.timeSetting(
+        "data_streams.lifecycle.retention.failures_default",
+        FAILURES_DEFAULT_RETENTION,
+        new Setting.Validator<>() {
+            @Override
+            public void validate(TimeValue value) {}
+
+            @Override
+            public void validate(final TimeValue settingValue, final Map<Setting<?>, Object> settings) {
+                TimeValue defaultRetention = getSettingValueOrNull(settingValue);
+                // Currently, we do not validate the default for the failure store against the max because
+                // we start with a default value that might conflict the max retention.
+                validateIsolatedRetentionValue(defaultRetention, FAILURE_STORE_DEFAULT_RETENTION_SETTING.getKey());
+            }
+        },
+        Setting.Property.NodeScope,
+        Setting.Property.Dynamic
+    );
+
     @Nullable
     private volatile TimeValue defaultRetention;
     @Nullable
     private volatile TimeValue maxRetention;
+    @Nullable
+    private volatile TimeValue failuresDefaultRetention;
+    /** We cache the global retention objects, volatile is sufficient we only "write" this values in the settings appliers which
+     * are executed by {@link org.elasticsearch.common.settings.AbstractScopedSettings#applySettings(Settings)} which is synchronised.
+     */
+    @Nullable
+    private volatile DataStreamGlobalRetention dataGlobalRetention;
+    @Nullable
+    private volatile DataStreamGlobalRetention failuresGlobalRetention;
 
     private DataStreamGlobalRetentionSettings() {
 
     }
 
+    /**
+     * @return the max retention that applies to all data stream data
+     */
     @Nullable
     public TimeValue getMaxRetention() {
         return maxRetention;
     }
 
+    /**
+     * @return the default retention that applies either to the data component
+     */
     @Nullable
     public TimeValue getDefaultRetention() {
         return defaultRetention;
     }
 
-    public boolean areDefined() {
-        return getDefaultRetention() != null || getMaxRetention() != null;
+    /**
+     * @return the default retention that applies either to the data or the failures component
+     */
+    @Nullable
+    public TimeValue getDefaultRetention(boolean failureStore) {
+        return failureStore ? failuresDefaultRetention : defaultRetention;
     }
 
     /**
@@ -113,17 +155,33 @@ public class DataStreamGlobalRetentionSettings {
         DataStreamGlobalRetentionSettings dataStreamGlobalRetentionSettings = new DataStreamGlobalRetentionSettings();
         clusterSettings.initializeAndWatch(DATA_STREAMS_DEFAULT_RETENTION_SETTING, dataStreamGlobalRetentionSettings::setDefaultRetention);
         clusterSettings.initializeAndWatch(DATA_STREAMS_MAX_RETENTION_SETTING, dataStreamGlobalRetentionSettings::setMaxRetention);
+        clusterSettings.initializeAndWatch(
+            FAILURE_STORE_DEFAULT_RETENTION_SETTING,
+            dataStreamGlobalRetentionSettings::setFailuresDefaultRetention
+        );
         return dataStreamGlobalRetentionSettings;
     }
 
     private void setMaxRetention(TimeValue maxRetention) {
         this.maxRetention = getSettingValueOrNull(maxRetention);
-        logger.info("Updated max factory retention to [{}]", this.maxRetention == null ? null : maxRetention.getStringRep());
+        this.dataGlobalRetention = createDataStreamGlobalRetention(false);
+        this.failuresGlobalRetention = createDataStreamGlobalRetention(true);
+        logger.info("Updated global max retention to [{}]", this.maxRetention == null ? null : maxRetention.getStringRep());
     }
 
     private void setDefaultRetention(TimeValue defaultRetention) {
         this.defaultRetention = getSettingValueOrNull(defaultRetention);
-        logger.info("Updated default factory retention to [{}]", this.defaultRetention == null ? null : defaultRetention.getStringRep());
+        this.dataGlobalRetention = createDataStreamGlobalRetention(false);
+        logger.info("Updated global default retention to [{}]", this.defaultRetention == null ? null : defaultRetention.getStringRep());
+    }
+
+    private void setFailuresDefaultRetention(TimeValue failuresDefaultRetention) {
+        this.failuresDefaultRetention = getSettingValueOrNull(failuresDefaultRetention);
+        this.failuresGlobalRetention = createDataStreamGlobalRetention(true);
+        logger.info(
+            "Updated failures default retention to [{}]",
+            this.failuresDefaultRetention == null ? null : failuresDefaultRetention.getStringRep()
+        );
     }
 
     private static void validateIsolatedRetentionValue(@Nullable TimeValue retention, String settingName) {
@@ -150,12 +208,36 @@ public class DataStreamGlobalRetentionSettings {
         }
     }
 
+    /**
+     * @return the global retention of backing indices
+     */
     @Nullable
     public DataStreamGlobalRetention get() {
-        if (areDefined() == false) {
+        return get(false);
+    }
+
+    /**
+     * Returns the global retention that applies to the data or failures of a data stream
+     * @param failureStore, true if we are retrieving the global retention that applies to failure store, false otherwise.
+     */
+    @Nullable
+    public DataStreamGlobalRetention get(boolean failureStore) {
+        return failureStore ? failuresGlobalRetention : dataGlobalRetention;
+    }
+
+    @Nullable
+    private DataStreamGlobalRetention createDataStreamGlobalRetention(boolean failureStore) {
+        if (areDefined(failureStore) == false) {
             return null;
         }
-        return new DataStreamGlobalRetention(getDefaultRetention(), getMaxRetention());
+        TimeValue defaultRetention = getDefaultRetention(failureStore);
+        TimeValue maxRetention = getMaxRetention();
+        // We ensure that we create valid DataStreamGlobalRetention where default is less or equal to max.
+        // If it's not we set it to null.
+        if (defaultRetention != null && maxRetention != null && defaultRetention.getMillis() > maxRetention.getMillis()) {
+            return new DataStreamGlobalRetention(null, getMaxRetention());
+        }
+        return new DataStreamGlobalRetention(defaultRetention, maxRetention);
     }
 
     /**
@@ -169,4 +251,8 @@ public class DataStreamGlobalRetentionSettings {
     private static TimeValue getSettingValueOrNull(TimeValue value) {
         return value == null || value.equals(TimeValue.MINUS_ONE) ? null : value;
     }
+
+    private boolean areDefined(boolean failureStore) {
+        return getDefaultRetention(failureStore) != null || getMaxRetention() != null;
+    }
 }

+ 6 - 2
server/src/main/java/org/elasticsearch/cluster/metadata/DataStreamLifecycle.java

@@ -233,7 +233,10 @@ public class DataStreamLifecycle implements SimpleDiffable<DataStreamLifecycle>,
         }
         if (dataRetention() == null) {
             return globalRetention.defaultRetention() != null
-                ? Tuple.tuple(globalRetention.defaultRetention(), RetentionSource.DEFAULT_GLOBAL_RETENTION)
+                ? Tuple.tuple(
+                    globalRetention.defaultRetention(),
+                    targetsFailureStore() ? RetentionSource.DEFAULT_FAILURES_RETENTION : RetentionSource.DEFAULT_GLOBAL_RETENTION
+                )
                 : Tuple.tuple(globalRetention.maxRetention(), RetentionSource.MAX_GLOBAL_RETENTION);
         }
         if (globalRetention.maxRetention() != null && globalRetention.maxRetention().getMillis() < dataRetention().getMillis()) {
@@ -506,7 +509,8 @@ public class DataStreamLifecycle implements SimpleDiffable<DataStreamLifecycle>,
     public enum RetentionSource {
         DATA_STREAM_CONFIGURATION,
         DEFAULT_GLOBAL_RETENTION,
-        MAX_GLOBAL_RETENTION;
+        MAX_GLOBAL_RETENTION,
+        DEFAULT_FAILURES_RETENTION;
 
         public String displayName() {
             return this.toString().toLowerCase(Locale.ROOT);

+ 2 - 2
server/src/main/java/org/elasticsearch/cluster/metadata/MetadataDataStreamsService.java

@@ -279,7 +279,7 @@ public class MetadataDataStreamsService {
         }
         if (lifecycle != null) {
             // We don't issue any warnings if all data streams are internal data streams
-            lifecycle.addWarningHeaderIfDataRetentionNotEffective(globalRetentionSettings.get(), onlyInternalDataStreams);
+            lifecycle.addWarningHeaderIfDataRetentionNotEffective(globalRetentionSettings.get(false), onlyInternalDataStreams);
         }
         return ClusterState.builder(currentState).metadata(builder.build()).build();
     }
@@ -305,7 +305,7 @@ public class MetadataDataStreamsService {
             // We don't issue any warnings if all data streams are internal data streams
             dataStreamOptions.failureStore()
                 .lifecycle()
-                .addWarningHeaderIfDataRetentionNotEffective(globalRetentionSettings.get(), onlyInternalDataStreams);
+                .addWarningHeaderIfDataRetentionNotEffective(globalRetentionSettings.get(true), onlyInternalDataStreams);
         }
         return ClusterState.builder(currentState).metadata(builder.build()).build();
     }

+ 27 - 6
server/src/main/java/org/elasticsearch/cluster/metadata/MetadataIndexTemplateService.java

@@ -341,9 +341,14 @@ public class MetadataIndexTemplateService {
                         tempStateWithComponentTemplateAdded.metadata(),
                         composableTemplateName,
                         composableTemplate,
-                        globalRetentionSettings.get()
+                        globalRetentionSettings.get(false)
+                    );
+                    validateDataStreamOptions(
+                        tempStateWithComponentTemplateAdded.metadata(),
+                        composableTemplateName,
+                        composableTemplate,
+                        globalRetentionSettings.get(true)
                     );
-                    validateDataStreamOptions(tempStateWithComponentTemplateAdded.metadata(), composableTemplateName, composableTemplate);
                     validateIndexTemplateV2(composableTemplateName, composableTemplate, tempStateWithComponentTemplateAdded);
                 } catch (Exception e) {
                     if (validationFailure == null) {
@@ -370,7 +375,7 @@ public class MetadataIndexTemplateService {
             finalComponentTemplate.template()
                 .lifecycle()
                 .toDataStreamLifecycle()
-                .addWarningHeaderIfDataRetentionNotEffective(globalRetentionSettings.get(), false);
+                .addWarningHeaderIfDataRetentionNotEffective(globalRetentionSettings.get(false), false);
         }
 
         logger.info("{} component template [{}]", existing == null ? "adding" : "updating", name);
@@ -725,8 +730,8 @@ public class MetadataIndexTemplateService {
 
         validate(name, templateToValidate);
         validateDataStreamsStillReferenced(currentState, name, templateToValidate);
-        validateLifecycle(currentState.metadata(), name, templateToValidate, globalRetentionSettings.get());
-        validateDataStreamOptions(currentState.metadata(), name, templateToValidate);
+        validateLifecycle(currentState.metadata(), name, templateToValidate, globalRetentionSettings.get(false));
+        validateDataStreamOptions(currentState.metadata(), name, templateToValidate, globalRetentionSettings.get(true));
 
         if (templateToValidate.isDeprecated() == false) {
             validateUseOfDeprecatedComponentTemplates(name, templateToValidate, currentState.metadata().componentTemplates());
@@ -821,7 +826,12 @@ public class MetadataIndexTemplateService {
     }
 
     // Visible for testing
-    static void validateDataStreamOptions(Metadata metadata, String indexTemplateName, ComposableIndexTemplate template) {
+    static void validateDataStreamOptions(
+        Metadata metadata,
+        String indexTemplateName,
+        ComposableIndexTemplate template,
+        DataStreamGlobalRetention globalRetention
+    ) {
         DataStreamOptions.Builder dataStreamOptionsBuilder = resolveDataStreamOptions(template, metadata.componentTemplates());
         if (dataStreamOptionsBuilder != null) {
             if (template.getDataStreamTemplate() == null) {
@@ -831,6 +841,17 @@ public class MetadataIndexTemplateService {
                         + "] specifies data stream options that can only be used in combination with a data stream"
                 );
             }
+            if (globalRetention != null) {
+                // We cannot know for sure if the template will apply to internal data streams, so we use a simpler heuristic:
+                // If all the index patterns start with a dot, we consider that all the connected data streams are internal.
+                boolean isInternalDataStream = template.indexPatterns().stream().allMatch(indexPattern -> indexPattern.charAt(0) == '.');
+                DataStreamOptions dataStreamOptions = dataStreamOptionsBuilder.build();
+                if (dataStreamOptions.failureStore() != null && dataStreamOptions.failureStore().lifecycle() != null) {
+                    dataStreamOptions.failureStore()
+                        .lifecycle()
+                        .addWarningHeaderIfDataRetentionNotEffective(globalRetention, isInternalDataStream);
+                }
+            }
         }
     }
 

+ 1 - 0
server/src/main/java/org/elasticsearch/common/settings/ClusterSettings.java

@@ -619,6 +619,7 @@ public final class ClusterSettings extends AbstractScopedSettings {
         TransportService.ENABLE_STACK_OVERFLOW_AVOIDANCE,
         DataStreamGlobalRetentionSettings.DATA_STREAMS_DEFAULT_RETENTION_SETTING,
         DataStreamGlobalRetentionSettings.DATA_STREAMS_MAX_RETENTION_SETTING,
+        DataStreamGlobalRetentionSettings.FAILURE_STORE_DEFAULT_RETENTION_SETTING,
         ShardsAvailabilityHealthIndicatorService.REPLICA_UNASSIGNED_BUFFER_TIME,
         DataStreamFailureStoreSettings.DATA_STREAM_FAILURE_STORED_ENABLED_SETTING,
         TransportGetAllocationStatsAction.CACHE_TTL_SETTING

+ 1 - 1
server/src/test/java/org/elasticsearch/action/datastreams/GetDataStreamActionTests.java

@@ -75,7 +75,7 @@ public class GetDataStreamActionTests extends ESTestCase {
             ToXContent.Params params = new ToXContent.MapParams(DataStreamLifecycle.INCLUDE_EFFECTIVE_RETENTION_PARAMS);
             RolloverConfiguration rolloverConfiguration = null;
             DataStreamGlobalRetention globalRetention = new DataStreamGlobalRetention(globalDefaultRetention, globalMaxRetention);
-            dataStreamInfo.toXContent(builder, params, rolloverConfiguration, globalRetention);
+            dataStreamInfo.toXContent(builder, params, rolloverConfiguration, globalRetention, globalRetention);
             String serialized = Strings.toString(builder);
             return XContentHelper.convertToMap(XContentType.JSON.xContent(), serialized, randomBoolean());
         }

+ 21 - 9
server/src/test/java/org/elasticsearch/action/datastreams/lifecycle/ExplainDataStreamLifecycleResponseTests.java

@@ -33,6 +33,7 @@ import org.junit.Before;
 import java.io.IOException;
 import java.util.List;
 import java.util.Map;
+import java.util.concurrent.TimeUnit;
 
 import static org.elasticsearch.action.datastreams.lifecycle.ExplainDataStreamLifecycleAction.Response;
 import static org.elasticsearch.xcontent.ToXContent.EMPTY_PARAMS;
@@ -67,7 +68,7 @@ public class ExplainDataStreamLifecycleResponseTests extends AbstractWireSeriali
         ExplainIndexDataStreamLifecycle explainIndex = createRandomIndexDataStreamLifecycleExplanation(now, lifecycle);
         explainIndex.setNowSupplier(() -> now);
         {
-            Response response = new Response(List.of(explainIndex), null, null);
+            Response response = new Response(List.of(explainIndex), null, null, null);
 
             XContentBuilder builder = XContentFactory.jsonBuilder().prettyPrint();
             response.toXContentChunked(EMPTY_PARAMS).forEachRemaining(xcontent -> {
@@ -133,10 +134,16 @@ public class ExplainDataStreamLifecycleResponseTests extends AbstractWireSeriali
                     new MinPrimaryShardDocsCondition(4L)
                 )
             );
+            DataStreamGlobalRetention dataGlobalRetention = DataStreamTestHelper.randomGlobalRetention();
+            DataStreamGlobalRetention failuresGlobalRetention = new DataStreamGlobalRetention(
+                randomTimeValue(1, 30, TimeUnit.DAYS),
+                dataGlobalRetention == null ? null : dataGlobalRetention.maxRetention()
+            );
             Response response = new Response(
                 List.of(explainIndex),
                 new RolloverConfiguration(rolloverConditions),
-                DataStreamTestHelper.randomGlobalRetention()
+                dataGlobalRetention,
+                failuresGlobalRetention
             );
 
             XContentBuilder builder = XContentFactory.jsonBuilder().prettyPrint();
@@ -219,7 +226,7 @@ public class ExplainDataStreamLifecycleResponseTests extends AbstractWireSeriali
                     )
                     : null
             );
-            Response response = new Response(List.of(explainIndexWithNullGenerationDate), null, null);
+            Response response = new Response(List.of(explainIndexWithNullGenerationDate), null, null, null);
 
             XContentBuilder builder = XContentFactory.jsonBuilder().prettyPrint();
             response.toXContentChunked(EMPTY_PARAMS).forEachRemaining(xcontent -> {
@@ -249,6 +256,7 @@ public class ExplainDataStreamLifecycleResponseTests extends AbstractWireSeriali
                 createRandomIndexDataStreamLifecycleExplanation(now, lifecycle)
             ),
             null,
+            null,
             null
         );
 
@@ -297,6 +305,14 @@ public class ExplainDataStreamLifecycleResponseTests extends AbstractWireSeriali
     }
 
     private Response randomResponse() {
+        var dataGlobalRetention = DataStreamGlobalRetention.create(
+            randomBoolean() ? TimeValue.timeValueDays(randomIntBetween(1, 10)) : null,
+            randomBoolean() ? TimeValue.timeValueDays(randomIntBetween(10, 20)) : null
+        );
+        var failuresGlobalRetention = DataStreamGlobalRetention.create(
+            randomBoolean() ? TimeValue.timeValueDays(randomIntBetween(1, 10)) : null,
+            dataGlobalRetention == null ? null : dataGlobalRetention.maxRetention()
+        );
         return new Response(
             List.of(
                 createRandomIndexDataStreamLifecycleExplanation(
@@ -311,12 +327,8 @@ public class ExplainDataStreamLifecycleResponseTests extends AbstractWireSeriali
                     )
                 )
                 : null,
-            randomBoolean()
-                ? new DataStreamGlobalRetention(
-                    TimeValue.timeValueDays(randomIntBetween(1, 10)),
-                    TimeValue.timeValueDays(randomIntBetween(10, 20))
-                )
-                : null
+            dataGlobalRetention,
+            failuresGlobalRetention
         );
     }
 }

+ 78 - 4
server/src/test/java/org/elasticsearch/cluster/metadata/DataStreamGlobalRetentionSettingsTests.java

@@ -27,6 +27,8 @@ public class DataStreamGlobalRetentionSettingsTests extends ESTestCase {
 
         assertThat(globalRetentionSettings.getDefaultRetention(), nullValue());
         assertThat(globalRetentionSettings.getMaxRetention(), nullValue());
+        assertThat(globalRetentionSettings.get(false), nullValue());
+        assertThat(globalRetentionSettings.get(true), equalTo(DataStreamGlobalRetention.create(TimeValue.timeValueDays(30), null)));
     }
 
     public void testMonitorsDefaultRetention() {
@@ -43,7 +45,8 @@ public class DataStreamGlobalRetentionSettingsTests extends ESTestCase {
             .build();
         clusterSettings.applySettings(newSettings);
 
-        assertThat(newDefaultRetention, equalTo(globalRetentionSettings.getDefaultRetention()));
+        assertThat(globalRetentionSettings.getDefaultRetention(), equalTo(newDefaultRetention));
+        assertThat(globalRetentionSettings.get(false), equalTo(DataStreamGlobalRetention.create(newDefaultRetention, null)));
 
         // Test invalid update
         Settings newInvalidSettings = Settings.builder()
@@ -57,6 +60,7 @@ public class DataStreamGlobalRetentionSettingsTests extends ESTestCase {
             exception.getCause().getMessage(),
             containsString("Setting 'data_streams.lifecycle.retention.default' should be greater than")
         );
+        assertThat(globalRetentionSettings.get(false), equalTo(DataStreamGlobalRetention.create(newDefaultRetention, null)));
     }
 
     public void testMonitorsMaxRetention() {
@@ -64,13 +68,25 @@ public class DataStreamGlobalRetentionSettingsTests extends ESTestCase {
         DataStreamGlobalRetentionSettings globalRetentionSettings = DataStreamGlobalRetentionSettings.create(clusterSettings);
 
         // Test valid update
-        TimeValue newMaxRetention = TimeValue.timeValueDays(randomIntBetween(10, 30));
+        TimeValue newMaxRetention = TimeValue.timeValueDays(randomIntBetween(10, 29));
         Settings newSettings = Settings.builder()
             .put(DataStreamGlobalRetentionSettings.DATA_STREAMS_MAX_RETENTION_SETTING.getKey(), newMaxRetention.toHumanReadableString(0))
             .build();
         clusterSettings.applySettings(newSettings);
 
-        assertThat(newMaxRetention, equalTo(globalRetentionSettings.getMaxRetention()));
+        assertThat(globalRetentionSettings.getMaxRetention(), equalTo(newMaxRetention));
+        assertThat(globalRetentionSettings.get(false), equalTo(DataStreamGlobalRetention.create(null, newMaxRetention)));
+        assertThat(globalRetentionSettings.get(true), equalTo(DataStreamGlobalRetention.create(null, newMaxRetention)));
+
+        newMaxRetention = TimeValue.timeValueDays(100);
+        newSettings = Settings.builder()
+            .put(DataStreamGlobalRetentionSettings.DATA_STREAMS_MAX_RETENTION_SETTING.getKey(), newMaxRetention.toHumanReadableString(0))
+            .build();
+        clusterSettings.applySettings(newSettings);
+        assertThat(
+            globalRetentionSettings.get(true),
+            equalTo(DataStreamGlobalRetention.create(TimeValue.timeValueDays(30), newMaxRetention))
+        );
 
         // Test invalid update
         Settings newInvalidSettings = Settings.builder()
@@ -84,11 +100,57 @@ public class DataStreamGlobalRetentionSettingsTests extends ESTestCase {
             exception.getCause().getMessage(),
             containsString("Setting 'data_streams.lifecycle.retention.max' should be greater than")
         );
+        assertThat(globalRetentionSettings.get(false), equalTo(DataStreamGlobalRetention.create(null, newMaxRetention)));
+    }
+
+    public void testMonitorsDefaultFailuresRetention() {
+        ClusterSettings clusterSettings = ClusterSettings.createBuiltInClusterSettings();
+        DataStreamGlobalRetentionSettings globalRetentionSettings = DataStreamGlobalRetentionSettings.create(clusterSettings);
+
+        // Test valid update
+        TimeValue newDefaultRetention = TimeValue.timeValueDays(randomIntBetween(1, 10));
+        Settings newSettings = Settings.builder()
+            .put(
+                DataStreamGlobalRetentionSettings.FAILURE_STORE_DEFAULT_RETENTION_SETTING.getKey(),
+                newDefaultRetention.toHumanReadableString(0)
+            )
+            .build();
+        clusterSettings.applySettings(newSettings);
+
+        assertThat(globalRetentionSettings.getDefaultRetention(true), equalTo(newDefaultRetention));
+        assertThat(globalRetentionSettings.get(true), equalTo(DataStreamGlobalRetention.create(newDefaultRetention, null)));
+
+        // Test update default failures retention to infinite retention
+        newDefaultRetention = TimeValue.MINUS_ONE;
+        newSettings = Settings.builder()
+            .put(
+                DataStreamGlobalRetentionSettings.FAILURE_STORE_DEFAULT_RETENTION_SETTING.getKey(),
+                newDefaultRetention.toHumanReadableString(0)
+            )
+            .build();
+        clusterSettings.applySettings(newSettings);
+
+        assertThat(globalRetentionSettings.getDefaultRetention(true), nullValue());
+        assertThat(globalRetentionSettings.get(true), nullValue());
+
+        // Test invalid update
+        Settings newInvalidSettings = Settings.builder()
+            .put(DataStreamGlobalRetentionSettings.FAILURE_STORE_DEFAULT_RETENTION_SETTING.getKey(), TimeValue.ZERO)
+            .build();
+        IllegalArgumentException exception = expectThrows(
+            IllegalArgumentException.class,
+            () -> clusterSettings.applySettings(newInvalidSettings)
+        );
+        assertThat(
+            exception.getCause().getMessage(),
+            containsString("Setting 'data_streams.lifecycle.retention.failures_default' should be greater than")
+        );
+        assertThat(globalRetentionSettings.get(true), nullValue());
     }
 
     public void testCombinationValidation() {
         ClusterSettings clusterSettings = ClusterSettings.createBuiltInClusterSettings();
-        DataStreamGlobalRetentionSettings.create(clusterSettings);
+        DataStreamGlobalRetentionSettings dataStreamGlobalRetentionSettings = DataStreamGlobalRetentionSettings.create(clusterSettings);
 
         // Test invalid update
         Settings newInvalidSettings = Settings.builder()
@@ -105,5 +167,17 @@ public class DataStreamGlobalRetentionSettingsTests extends ESTestCase {
                 "Setting [data_streams.lifecycle.retention.default=90d] cannot be greater than [data_streams.lifecycle.retention.max=30d]"
             )
         );
+
+        // Test valid update even if the failures default is greater than max.
+        Settings newValidSettings = Settings.builder()
+            .put(DataStreamGlobalRetentionSettings.FAILURE_STORE_DEFAULT_RETENTION_SETTING.getKey(), TimeValue.timeValueDays(90))
+            .put(DataStreamGlobalRetentionSettings.DATA_STREAMS_MAX_RETENTION_SETTING.getKey(), TimeValue.timeValueDays(30))
+            .build();
+        clusterSettings.applySettings(newValidSettings);
+        assertThat(dataStreamGlobalRetentionSettings.getDefaultRetention(true), equalTo(TimeValue.timeValueDays(90)));
+        assertThat(
+            dataStreamGlobalRetentionSettings.get(true),
+            equalTo(DataStreamGlobalRetention.create(null, TimeValue.timeValueDays(30)))
+        );
     }
 }

+ 3 - 2
server/src/test/java/org/elasticsearch/cluster/metadata/DataStreamLifecycleTests.java

@@ -37,6 +37,7 @@ import java.util.concurrent.TimeUnit;
 import java.util.stream.Stream;
 
 import static org.elasticsearch.cluster.metadata.DataStreamLifecycle.RetentionSource.DATA_STREAM_CONFIGURATION;
+import static org.elasticsearch.cluster.metadata.DataStreamLifecycle.RetentionSource.DEFAULT_FAILURES_RETENTION;
 import static org.elasticsearch.cluster.metadata.DataStreamLifecycle.RetentionSource.DEFAULT_GLOBAL_RETENTION;
 import static org.elasticsearch.cluster.metadata.DataStreamLifecycle.RetentionSource.MAX_GLOBAL_RETENTION;
 import static org.hamcrest.Matchers.containsString;
@@ -372,7 +373,7 @@ public class DataStreamLifecycleTests extends AbstractWireSerializingTestCase<Da
                 false
             );
             assertThat(effectiveFailuresRetentionWithSource.v1(), equalTo(defaultRetention));
-            assertThat(effectiveFailuresRetentionWithSource.v2(), equalTo(DEFAULT_GLOBAL_RETENTION));
+            assertThat(effectiveFailuresRetentionWithSource.v2(), equalTo(DEFAULT_FAILURES_RETENTION));
         }
 
         // With retention in the data stream lifecycle
@@ -477,7 +478,7 @@ public class DataStreamLifecycleTests extends AbstractWireSerializingTestCase<Da
                 assertThat(effectiveFailuresRetentionWithSource.v2(), equalTo(MAX_GLOBAL_RETENTION));
             } else {
                 assertThat(effectiveFailuresRetentionWithSource.v1(), equalTo(globalRetention.defaultRetention()));
-                assertThat(effectiveFailuresRetentionWithSource.v2(), equalTo(DEFAULT_GLOBAL_RETENTION));
+                assertThat(effectiveFailuresRetentionWithSource.v2(), equalTo(DEFAULT_FAILURES_RETENTION));
             }
 
             // Now verify that internal data streams do not use global retention

+ 1 - 1
x-pack/plugin/core/src/main/java/org/elasticsearch/xpack/core/action/DataStreamUsageTransportAction.java

@@ -107,7 +107,7 @@ public class DataStreamUsageTransportAction extends XPackUsageFeatureTransportAc
                     if (effectiveDataRetentionWithSource.v2().equals(DataStreamLifecycle.RetentionSource.MAX_GLOBAL_RETENTION)) {
                         affectedByMaxRetentionCounter++;
                     }
-                    if (effectiveDataRetentionWithSource.v2().equals(DataStreamLifecycle.RetentionSource.DEFAULT_GLOBAL_RETENTION)) {
+                    if (effectiveDataRetentionWithSource.v2().equals(DataStreamLifecycle.RetentionSource.DEFAULT_FAILURES_RETENTION)) {
                         affectedByDefaultRetentionCounter++;
                     }
                 }