Browse Source

[DSL] Introduce data stream global retention - Part 3 (#105682)

In this PR we introduce the API that will expose the global retention configuration and will allow users to take advantage of it.

These APIs are protected by the dedicated introduced privileges:

`manage_data_stream_global_retention` or higher, which allows all operations on the global retention configuration
`monitor_data_stream_retention` or higher, which allows the retrieval of the global retention configuration.

This PR is the final PR that makes the global retention available for our users.
Mary Gouseti 1 year ago
parent
commit
2122da31cd
49 changed files with 2531 additions and 87 deletions
  1. 20 0
      docs/changelog/105682.yaml
  2. 13 1
      docs/reference/data-streams/data-stream-apis.asciidoc
  3. 121 0
      docs/reference/data-streams/lifecycle/apis/delete-global-retention.asciidoc
  4. 90 0
      docs/reference/data-streams/lifecycle/apis/get-global-retention.asciidoc
  5. 131 0
      docs/reference/data-streams/lifecycle/apis/put-global-retention.asciidoc
  6. 8 4
      docs/reference/data-streams/lifecycle/index.asciidoc
  7. 183 0
      docs/reference/data-streams/lifecycle/tutorial-manage-data-stream-retention.asciidoc
  8. 2 0
      docs/reference/rest-api/security/get-builtin-privileges.asciidoc
  9. 6 0
      docs/reference/security/authorization/privileges.asciidoc
  10. 2 2
      modules/data-streams/src/javaRestTest/java/org/elasticsearch/datastreams/LazyRolloverDataStreamIT.java
  11. 147 0
      modules/data-streams/src/javaRestTest/java/org/elasticsearch/datastreams/lifecycle/DataStreamGlobalRetentionIT.java
  12. 213 0
      modules/data-streams/src/javaRestTest/java/org/elasticsearch/datastreams/lifecycle/DataStreamGlobalRetentionPermissionsRestIT.java
  13. 4 4
      modules/data-streams/src/javaRestTest/java/org/elasticsearch/datastreams/lifecycle/DataStreamLifecyclePermissionsRestIT.java
  14. 8 1
      modules/data-streams/src/javaRestTest/resources/roles.yml
  15. 4 2
      modules/data-streams/src/main/java/org/elasticsearch/datastreams/DataStreamFeatures.java
  16. 42 0
      modules/data-streams/src/main/java/org/elasticsearch/datastreams/DataStreamsPlugin.java
  17. 175 0
      modules/data-streams/src/main/java/org/elasticsearch/datastreams/lifecycle/UpdateDataStreamGlobalRetentionService.java
  18. 153 0
      modules/data-streams/src/main/java/org/elasticsearch/datastreams/lifecycle/action/DeleteDataStreamGlobalRetentionAction.java
  19. 168 0
      modules/data-streams/src/main/java/org/elasticsearch/datastreams/lifecycle/action/GetDataStreamGlobalRetentionAction.java
  20. 202 0
      modules/data-streams/src/main/java/org/elasticsearch/datastreams/lifecycle/action/PutDataStreamGlobalRetentionAction.java
  21. 122 0
      modules/data-streams/src/main/java/org/elasticsearch/datastreams/lifecycle/action/UpdateDataStreamGlobalRetentionResponse.java
  22. 49 0
      modules/data-streams/src/main/java/org/elasticsearch/datastreams/lifecycle/rest/RestDeleteDataStreamGlobalRetentionAction.java
  23. 47 0
      modules/data-streams/src/main/java/org/elasticsearch/datastreams/lifecycle/rest/RestGetDataStreamGlobalRetentionAction.java
  24. 53 0
      modules/data-streams/src/main/java/org/elasticsearch/datastreams/lifecycle/rest/RestPutDataStreamGlobalRetentionAction.java
  25. 200 0
      modules/data-streams/src/test/java/org/elasticsearch/datastreams/lifecycle/UpdateDataStreamGlobalRetentionServiceTests.java
  26. 8 4
      modules/data-streams/src/yamlRestTest/resources/rest-api-spec/test/data_stream/lifecycle/10_explain_lifecycle.yml
  27. 4 2
      modules/data-streams/src/yamlRestTest/resources/rest-api-spec/test/data_stream/lifecycle/190_create_data_stream_with_lifecycle.yml
  28. 18 7
      modules/data-streams/src/yamlRestTest/resources/rest-api-spec/test/data_stream/lifecycle/20_basic.yml
  29. 19 6
      modules/data-streams/src/yamlRestTest/resources/rest-api-spec/test/data_stream/lifecycle/30_not_found.yml
  30. 139 0
      modules/data-streams/src/yamlRestTest/resources/rest-api-spec/test/data_stream/lifecycle/40_global_retention.yml
  31. 35 0
      rest-api-spec/src/main/resources/rest-api-spec/api/data_streams.delete_global_retention.json
  32. 29 0
      rest-api-spec/src/main/resources/rest-api-spec/api/data_streams.get_global_retention.json
  33. 39 0
      rest-api-spec/src/main/resources/rest-api-spec/api/data_streams.put_global_retention.json
  34. 8 4
      rest-api-spec/src/yamlRestTest/resources/rest-api-spec/test/cluster.component_template/10_basic.yml
  35. 10 20
      rest-api-spec/src/yamlRestTest/resources/rest-api-spec/test/indices.get_index_template/10_basic.yml
  36. 4 2
      rest-api-spec/src/yamlRestTest/resources/rest-api-spec/test/indices.simulate_index_template/10_basic.yml
  37. 4 2
      rest-api-spec/src/yamlRestTest/resources/rest-api-spec/test/indices.simulate_template/10_basic.yml
  38. 2 4
      server/src/main/java/org/elasticsearch/action/admin/indices/template/get/GetComponentTemplateAction.java
  39. 2 4
      server/src/main/java/org/elasticsearch/action/admin/indices/template/get/GetComposableIndexTemplateAction.java
  40. 1 1
      server/src/main/java/org/elasticsearch/action/admin/indices/template/get/TransportGetComponentTemplateAction.java
  41. 3 1
      server/src/main/java/org/elasticsearch/action/admin/indices/template/get/TransportGetComposableIndexTemplateAction.java
  42. 11 0
      server/src/main/java/org/elasticsearch/cluster/metadata/DataStreamGlobalRetention.java
  43. 1 2
      server/src/main/java/org/elasticsearch/rest/action/cat/RestTemplatesAction.java
  44. 5 3
      server/src/test/java/org/elasticsearch/action/admin/indices/template/get/GetComposableIndexTemplateResponseTests.java
  45. 2 0
      server/src/test/java/org/elasticsearch/cluster/metadata/DataStreamGlobalRetentionTests.java
  46. 9 9
      server/src/test/java/org/elasticsearch/cluster/metadata/DataStreamTests.java
  47. 11 1
      x-pack/plugin/core/src/main/java/org/elasticsearch/xpack/core/security/authz/privilege/ClusterPrivilegeResolver.java
  48. 3 0
      x-pack/plugin/security/qa/operator-privileges-tests/src/javaRestTest/java/org/elasticsearch/xpack/security/operator/Constants.java
  49. 1 1
      x-pack/plugin/src/yamlRestTest/resources/rest-api-spec/test/privileges/11_builtin.yml

+ 20 - 0
docs/changelog/105682.yaml

@@ -0,0 +1,20 @@
+pr: 105682
+summary: Introduce global retention in data stream lifecycle.
+area: Data streams
+type: feature
+issues:
+  - 106169
+highlight:
+  title: Add global retention in data stream lifecycle
+  body: |-
+    Data stream lifecycle now supports configuring retention on a cluster level, namely global retention. Global retention 
+    allows us to configure two different retentions:
+    
+    - `default_retention` is applied to all data streams managed by the data stream lifecycle that do not have retention
+    defined on the data stream level.
+    - `max_retention` is applied to all data streams managed by the data stream lifecycle and it allows any data stream 
+    data to be deleted after the `max_retention` has passed.
+
+    Furthermore, we introduce the term `effective_retention` which is the retention applied at a certain moment to a data
+    stream considering all the available retention configurations.
+  notable: true

+ 13 - 1
docs/reference/data-streams/data-stream-apis.asciidoc

@@ -25,7 +25,13 @@ preview:[]
 preview:[]
 * <<data-streams-explain-lifecycle,Explain data stream lifecycle>>
 preview:[]
-* <<data-streams-get-lifecycle-stats, Get data stream lifecycle stats>>
+* <<data-streams-get-lifecycle-stats,Get data stream lifecycle stats>>
+preview:[]
+* <<data-streams-put-global-retention,Update the global retention for data stream lifecycle managed data streams>>
+preview:[]
+* <<data-streams-get-global-retention,Get the global retention for data stream lifecycle managed data streams>>
+preview:[]
+* <<data-streams-delete-global-retention,Delete the global retention for data stream lifecycle managed data streams>>
 preview:[]
 
 The following API is available for <<tsds,time series data streams>>:
@@ -59,4 +65,10 @@ include::{es-repo-dir}/data-streams/lifecycle/apis/explain-lifecycle.asciidoc[]
 
 include::{es-repo-dir}/data-streams/lifecycle/apis/get-lifecycle-stats.asciidoc[]
 
+include::{es-repo-dir}/data-streams/lifecycle/apis/put-global-retention.asciidoc[]
+
+include::{es-repo-dir}/data-streams/lifecycle/apis/get-global-retention.asciidoc[]
+
+include::{es-repo-dir}/data-streams/lifecycle/apis/delete-global-retention.asciidoc[]
+
 include::{es-repo-dir}/indices/downsample-data-stream.asciidoc[]

+ 121 - 0
docs/reference/data-streams/lifecycle/apis/delete-global-retention.asciidoc

@@ -0,0 +1,121 @@
+[[data-streams-delete-global-retention]]
+=== Delete the global retention of data streams
+++++
+<titleabbrev>Delete Data Stream Global Retention</titleabbrev>
+++++
+
+preview::[]
+
+Deletes the global retention configuration that applies on every data stream managed by <<data-stream-lifecycle,data stream lifecycle>>.
+
+[[delete-global-retention-api-prereqs]]
+==== {api-prereq-title}
+
+** If the {es} {security-features} are enabled, you must have the `manage_data_stream_global_retention` <<privileges-list-cluster,cluster privilege>> to use this API.
+
+[[data-streams-delete-global-retention-request]]
+==== {api-request-title}
+
+`DELETE _data_stream/_global_retention`
+
+[[data-streams-delete-global-retention-desc]]
+==== {api-description-title}
+
+Deletes the global retention configuration that is applied on data streams managed by data stream lifecycle.
+
+[role="child_attributes"]
+[[delete-global-retention-api-query-parms]]
+==== {api-query-parms-title}
+
+`dry_run`::
+(Boolean) Signals that the request should determine the effect of the removal of the existing without updating
+the global retention. The default value is `false`, which means the removal will happen.
+
+[[delete-global-retention-api-response-body]]
+==== {api-response-body-title}
+
+`acknowledged`::
+(boolean)
+True, if the global retention has been removed. False, if it fails or if it was a dry run.
+
+`dry_run`::
+(boolean)
+True, if this was a dry run, false otherwise.
+
+`affected_data_streams`::
+(array of objects)
+Contains information about the data streams affected by the change.
++
+.Properties of objects in `affected_data_streams`
+[%collapsible%open]
+====
+`name`::
+(string)
+Name of the data stream.
+`previous_effective_retention`::
+(string)
+The retention that was effective before the change of this request. `infinite` if there was no retention applicable.
+`new_effective_retention`::
+(string)
+The retention that is or would be effective after this request. `infinite` if there is no retention applicable.
+====
+
+[[data-streams-delete-global-retention-example]]
+==== {api-examples-title}
+
+////
+
+[source,console]
+--------------------------------------------------
+PUT _data_stream/_global_retention
+{
+  "default_retention": "7d",
+  "max_retention": "90d"
+}
+
+PUT /_index_template/template
+{
+  "index_patterns": ["my-data-stream*"],
+  "template": {
+    "lifecycle": {}
+  },
+  "data_stream": { }
+}
+
+PUT /_data_stream/my-data-stream
+----
+// TESTSETUP
+////
+
+////
+[source,console]
+----
+DELETE /_data_stream/my-data-stream*
+DELETE /_index_template/template
+DELETE /_data_stream/_global_retention
+----
+// TEARDOWN
+////
+
+Let's update the global retention:
+[source,console]
+--------------------------------------------------
+DELETE _data_stream/_global_retention
+--------------------------------------------------
+
+The response will look like the following:
+
+[source,console-result]
+--------------------------------------------------
+{
+  "acknowledged": true,
+  "dry_run": false,
+  "affected_data_streams": [
+    {
+      "name": "my-data-stream",
+      "previous_effective_retention": "7d",
+      "new_effective_retention": "infinite"
+    }
+  ]
+}
+--------------------------------------------------

+ 90 - 0
docs/reference/data-streams/lifecycle/apis/get-global-retention.asciidoc

@@ -0,0 +1,90 @@
+[[data-streams-get-global-retention]]
+=== Get the global retention of data streams
+++++
+<titleabbrev>Get Data Stream Global Retention</titleabbrev>
+++++
+
+preview::[]
+
+Gets the global retention that applies on every data stream managed by <<data-stream-lifecycle,data stream lifecycle>>.
+
+[[get-global-retention-api-prereqs]]
+==== {api-prereq-title}
+
+** If the {es} {security-features} are enabled, you must have the `monitor_data_stream_global_retention` or
+`manage_data_stream_global_retention` <<privileges-list-cluster,cluster privilege>> to use this API.
+
+[[data-streams-get-global-retention-request]]
+==== {api-request-title}
+
+`GET _data_stream/_global_retention`
+
+[[data-streams-get-global-retention-desc]]
+==== {api-description-title}
+
+Gets the global retention configuration that is applied on data streams managed by data stream lifecycle.
+
+[role="child_attributes"]
+[[get-global-retention-api-query-parms]]
+==== {api-query-parms-title}
+
+include::{es-repo-dir}/rest-api/common-parms.asciidoc[tag=local]
+
+include::{es-repo-dir}/rest-api/common-parms.asciidoc[tag=master-timeout]
+
+[[get-global-retention-api-response-body]]
+==== {api-response-body-title}
+
+`default_retention`::
+(Optional, string)
+The default retention that will apply to any data stream managed by data stream lifecycle that does not have a retention
+defined on the data stream level.
+
+`max_retention`::
+(Optional, string)
+The max retention that will apply to all data streams managed by data stream lifecycle. The max retention will override the
+retention of a data stream whose retention exceeds the max retention.
+
+
+[[data-streams-get-global-retention-example]]
+==== {api-examples-title}
+
+////
+
+[source,console]
+--------------------------------------------------
+PUT _data_stream/_global_retention
+{
+  "default_retention": "7d",
+  "max_retention": "90d"
+}
+--------------------------------------------------
+// TESTSETUP
+
+[source,console]
+--------------------------------------------------
+DELETE _data_stream/_global_retention
+--------------------------------------------------
+// TEARDOWN
+
+////
+
+Let's retrieve the global retention:
+
+[source,console]
+--------------------------------------------------
+GET _data_stream/_global_retention
+--------------------------------------------------
+
+The response will look like the following:
+
+[source,console-result]
+--------------------------------------------------
+{
+  "default_retention": "7d",    <1>
+  "max_retention": "90d"        <2>
+}
+--------------------------------------------------
+<1> 7 days retention will be applied to any data stream that does not have retention set in its lifecycle.
+<2> 90 days retention will be applied to all data streams that have retention that exceeds the 90 days, this
+applies to data streams that have infinite retention too.

+ 131 - 0
docs/reference/data-streams/lifecycle/apis/put-global-retention.asciidoc

@@ -0,0 +1,131 @@
+[[data-streams-put-global-retention]]
+=== Update the global retention of data streams
+++++
+<titleabbrev>Update Data Stream Global Retention</titleabbrev>
+++++
+
+preview::[]
+
+Updates the global retention configuration that applies on every data stream managed by <<data-stream-lifecycle,data stream lifecycle>>.
+
+[[put-global-retention-api-prereqs]]
+==== {api-prereq-title}
+
+** If the {es} {security-features} are enabled, you must have the `manage_data_stream_global_retention` <<privileges-list-cluster,cluster privilege>> to use this API.
+
+[[data-streams-put-global-retention-request]]
+==== {api-request-title}
+
+`PUT _data_stream/_global_retention`
+
+[[data-streams-put-global-retention-desc]]
+==== {api-description-title}
+
+Updates the global retention configuration that is applied on data streams managed by data stream lifecycle.
+
+[role="child_attributes"]
+[[put-global-retention-api-query-parms]]
+==== {api-query-parms-title}
+
+`dry_run`::
+(Boolean) Signals that the request should determine the effect of the provided configuration without updating the
+global retention settings. The default value is `false`, which means the configuration provided will be applied.
+
+[[put-global-retention-api-request-body]]
+==== {api-request-body-title}
+
+`default_retention`::
+(Optional, string)
+The default retention that will apply to any data stream managed by data stream lifecycle that does not have a retention
+defined on the data stream level.
+
+`max_retention`::
+(Optional, string)
+The max retention that will apply to all data streams managed by data stream lifecycle. The max retention will override the
+retention of a data stream which retention exceeds the max retention.
+
+[[put-global-retention-api-response-body]]
+==== {api-response-body-title}
+
+`acknowledged`::
+(boolean)
+True, if the global retention has been updated to the provided values. False, if it fails or if it was a dry run.
+
+`dry_run`::
+(boolean)
+True, if this was a dry run, false otherwise.
+
+`affected_data_streams`::
+(array of objects)
+Contains information about the data streams affected by the change.
++
+.Properties of objects in `affected_data_streams`
+[%collapsible%open]
+====
+`name`::
+(string)
+Name of the data stream.
+`previous_effective_retention`::
+(string)
+The retention that was effective before the change of this request. `infinite` if there was no retention applicable.
+`new_effective_retention`::
+(string)
+The retention that is or would be effective after this request. `infinite` if there is no retention applicable.
+====
+
+[[data-streams-put-global-retention-example]]
+==== {api-examples-title}
+
+////
+[source,console]
+----
+PUT /_index_template/template
+{
+  "index_patterns": ["my-data-stream*"],
+  "template": {
+    "lifecycle": {}
+  },
+  "data_stream": { }
+}
+
+PUT /_data_stream/my-data-stream
+----
+// TESTSETUP
+////
+
+////
+[source,console]
+----
+DELETE /_data_stream/my-data-stream*
+DELETE /_index_template/template
+DELETE /_data_stream/_global_retention
+----
+// TEARDOWN
+////
+
+Let's update the global retention:
+[source,console]
+--------------------------------------------------
+PUT _data_stream/_global_retention
+{
+  "default_retention": "7d",
+  "max_retention": "90d"
+}
+--------------------------------------------------
+
+The response will look like the following:
+
+[source,console-result]
+--------------------------------------------------
+{
+  "acknowledged": true,
+  "dry_run": false,
+  "affected_data_streams": [
+    {
+      "name": "my-data-stream",
+      "previous_effective_retention": "infinite",
+      "new_effective_retention": "7d"
+    }
+  ]
+}
+--------------------------------------------------

+ 8 - 4
docs/reference/data-streams/lifecycle/index.asciidoc

@@ -16,7 +16,8 @@ To achieve that, it supports:
 * Automatic <<index-rollover,rollover>>, which chunks your incoming data in smaller pieces to facilitate better performance
 and backwards incompatible mapping changes.
 * Configurable retention, which allows you to configure the time period for which your data is guaranteed to be stored.
-{es} is allowed at a later time to delete data older than this time period.
+{es} is allowed at a later time to delete data older than this time period. Retention can be configured on the data stream level
+or on a global level. Read more about the different options in this <<tutorial-manage-data-stream-retention,tutorial>>.
 
 A data stream lifecycle also supports downsampling the data stream backing indices.
 See <<data-streams-put-lifecycle-downsampling-example, the downsampling example>> for 
@@ -42,9 +43,10 @@ data that is most likely to keep being queried.
 4. If <<data-streams-put-lifecycle-downsampling-example, downsampling>> is configured it will execute 
 all the configured downsampling rounds.
 5. Applies retention to the remaining backing indices. This means deleting the backing indices whose
-`generation_time` is longer than the configured retention period. The `generation_time` is only applicable to rolled over backing
-indices and it is either the time since the backing index got rolled over, or the time optionally configured in the
-<<index-data-stream-lifecycle-origination-date,`index.lifecycle.origination_date`>> setting.
+`generation_time` is longer than the effective retention period (read more about the
+<<effective-retention-calculation, effective retention calculation>>). The `generation_time` is only applicable to rolled
+over backing indices and it is either the time since the backing index got rolled over, or the time optionally configured
+in the <<index-data-stream-lifecycle-origination-date,`index.lifecycle.origination_date`>> setting.
 
 IMPORTANT: We use the `generation_time` instead of the creation time because this ensures that all data in the backing
 index have passed the retention period. As a result, the retention period is not the exact time data gets deleted, but
@@ -77,4 +79,6 @@ include::tutorial-manage-new-data-stream.asciidoc[]
 
 include::tutorial-manage-existing-data-stream.asciidoc[]
 
+include::tutorial-manage-data-stream-retention.asciidoc[]
+
 include::tutorial-migrate-data-stream-from-ilm-to-dsl.asciidoc[]

+ 183 - 0
docs/reference/data-streams/lifecycle/tutorial-manage-data-stream-retention.asciidoc

@@ -0,0 +1,183 @@
+[role="xpack"]
+[[tutorial-manage-data-stream-retention]]
+=== Tutorial: Data stream retention
+
+preview::[]
+
+In this tutorial, we are going to go over the data stream lifecycle retention, define it, go over how it can be configured and how
+it can be applied. Keep in mind, the following options apply only to data streams that are managed by the data stream lifecycle.
+
+. <<what-is-retention>>
+. <<retention-configuration>>
+. <<effective-retention-calculation>>
+. <<effective-retention-application>>
+
+You can verify if a data steam is managed by the data stream lifecycle via the <<data-streams-get-lifecycle,get data stream lifecycle API>>:
+
+////
+[source,console]
+----
+PUT /_index_template/template
+{
+  "index_patterns": ["my-data-stream*"],
+  "template": {
+    "lifecycle": {}
+  },
+  "data_stream": { }
+}
+
+PUT /_data_stream/my-data-stream
+----
+// TESTSETUP
+////
+
+////
+[source,console]
+----
+DELETE /_data_stream/my-data-stream*
+DELETE /_index_template/template
+DELETE /_data_stream/_global_retention
+----
+// TEARDOWN
+////
+
+[source,console]
+--------------------------------------------------
+GET _data_stream/my-data-stream/_lifecycle
+--------------------------------------------------
+
+The result should look like this:
+
+[source,console-result]
+--------------------------------------------------
+{
+  "data_streams": [
+    {
+      "name": "my-data-stream",                                   <1>
+      "lifecycle": {
+        "enabled": true                                           <2>
+      }
+    }
+  ]
+}
+--------------------------------------------------
+// TESTRESPONSE[skip:the result is for illustrating purposes only]
+<1> The name of your data stream.
+<2> Ensure that the lifecycle is enabled, meaning this should be `true`.
+
+[discrete]
+[[what-is-retention]]
+==== What is data stream retention?
+
+We define retention as the least amount of time the data of a data stream are going to be kept in {es}. After this time period
+has passed, {es} is allowed to remove these data to free up space and/or manage costs.
+
+NOTE: Retention does not define the period that the data will be removed, but the minimum time period they will be kept.
+
+We define 4 different types of retention:
+
+* The data stream retention, or `data_retention`, which is the retention configured on the data stream level. It can be
+set via an <<index-templates,index template>> for future data streams or via the <<data-streams-put-lifecycle, PUT data
+stream lifecycle API>> for an existing data stream. When the data stream retention is not set, it implies that the data
+need to be kept forever.
+* The global default retention, or `default_retention`, which is a retention configured on a cluster level and will be
+applied to all data streams managed by data stream lifecycle that do not have `data_retention` configured. Effectively,
+it ensures that there will be no data streams keeping their data forever. This can be set via the
+<<data-streams-put-global-retention, PUT global retention API>>.
+* The global max retention, or `max_retention`, which is a retention configured on a cluster level and will be applied to
+all data streams managed by data stream lifecycle. Effectively, it ensures that there will be no data streams whose retention
+will exceed this time period. This can be set via the <<data-streams-put-global-retention, PUT global retention API>>.
+* The effective retention, or `effective_retention`, which is the retention applied at a data stream on a given moment.
+Effective retention cannot be set, it is derived by taking into account all the configured retention listed above and is
+calculated as it is described <<effective-retention-calculation,here>>.
+
+[discrete]
+[[retention-configuration]]
+==== How to configure retention?
+
+- By setting the `data_retention` on the data stream level. This retention can be configured in two ways:
++
+-- For new data streams, it can be defined in the index template that would be applied during the data stream's creation.
+You can use the <<indices-put-template,create index template API>>, for example:
++
+[source,console]
+--------------------------------------------------
+PUT _index_template/template
+{
+  "index_patterns": ["my-data-stream*"],
+  "data_stream": { },
+  "priority": 500,
+  "template": {
+    "lifecycle": {
+      "data_retention": "7d"
+    }
+  },
+  "_meta": {
+    "description": "Template with data stream lifecycle"
+  }
+}
+--------------------------------------------------
+-- For an existing data stream, it can be set via the <<data-streams-put-lifecycle, PUT lifecycle API>>.
++
+[source,console]
+----
+PUT _data_stream/my-data-stream/_lifecycle
+{
+  "data_retention": "30d" <1>
+}
+----
+// TEST[continued]
+<1> The retention period of this data stream is set to 30 days.
+
+- By setting the global retention via the `default_retention` and `max_retention` that are set on a cluster level. You
+can set them via the <<data-streams-put-global-retention, PUT global retention API>>. For example:
++
+[source,console]
+--------------------------------------------------
+PUT _data_stream/_global_retention
+{
+  "default_retention": "7d",
+  "max_retention": "90d"
+}
+--------------------------------------------------
+// TEST[continued]
+
+[discrete]
+[[effective-retention-calculation]]
+==== How is the effective retention calculated?
+The effective is calculated in the following way:
+
+- The `effective_retention` is the `default_retention`, when `default_retention` is defined and the data stream does not
+have `data_retention`.
+- The `effective_retention` is the `data_retention`, when `data_retention` is defined and if `max_retention` is defined,
+it is less than the `max_retention`.
+- The `effective_retention` is the `max_retention`, when `max_retention` is defined, and the data stream has either no
+`data_retention` or its `data_retention` is greater than the `max_retention`.
+
+The above is demonstrated in the examples below:
+
+|===
+|`default_retention`    |`max_retention`    |`data_retention`   |`effective_retention`  |Retention determined by
+
+|Not set                |Not set            |Not set            |Infinite               |N/A
+|Not relevant           |12 months          |**30 days**        |30 days                |`data_retention`
+|Not relevant           |Not set            |**30 days**        |30 days                |`data_retention`
+|**30 days**            |12 months          |Not set            |30 days                |`default_retention`
+|**30 days**            |30 days            |Not set            |30 days                |`default_retention`
+|Not relevant           |**30 days**        |12 months          |30 days                |`max_retention`
+|Not set                |**30 days**        |Not set            |30 days                |`max_retention`
+|===
+
+[discrete]
+[[effective-retention-application]]
+==== How is the effective retention applied?
+
+Retention is applied to the remaining backing indices of a data stream as the last step of
+<<data-streams-lifecycle-how-it-works, a data stream lifecycle run>>. Data stream lifecycle will retrieve the backing indices
+whose `generation_time` is longer than the effective retention period and delete them. The `generation_time` is only
+applicable to rolled over backing indices and it is either the time since the backing index got rolled over, or the time
+optionally configured in the <<index-data-stream-lifecycle-origination-date,`index.lifecycle.origination_date`>> setting.
+
+IMPORTANT: We use the `generation_time` instead of the creation time because this ensures that all data in the backing
+index have passed the retention period. As a result, the retention period is not the exact time data get deleted, but
+the minimum time data will be stored.

+ 2 - 0
docs/reference/rest-api/security/get-builtin-privileges.asciidoc

@@ -75,6 +75,7 @@ A successful call returns an object with "cluster" and "index" fields.
     "manage_behavioral_analytics",
     "manage_ccr",
     "manage_data_frame_transforms",
+    "manage_data_stream_global_retention",
     "manage_enrich",
     "manage_ilm",
     "manage_index_templates",
@@ -99,6 +100,7 @@ A successful call returns an object with "cluster" and "index" fields.
     "manage_watcher",
     "monitor",
     "monitor_data_frame_transforms",
+    "monitor_data_stream_global_retention",
     "monitor_enrich",
     "monitor_inference",
     "monitor_ml",

+ 6 - 0
docs/reference/security/authorization/privileges.asciidoc

@@ -171,6 +171,9 @@ deprecated[7.5] Use `manage_transform` instead.
 `manage_enrich`::
 All operations related to managing and executing enrich policies.
 
+`manage_data_stream_global_retention`::
+All operations related to managing the data stream global retention settings.
+
 `manage_watcher`::
 All watcher operations, such as putting watches, executing, activate or acknowledging.
 +
@@ -206,6 +209,9 @@ All read-only operations related to the <<find-structure,find structure API>>.
 `monitor_transform`::
 All read-only operations related to {transforms}.
 
+`monitor_data_stream_global_retention`::
+Allows the retrieval of the data stream global retention settings.
+
 `monitor_watcher`::
 All read-only watcher operations, such as getting a watch and watcher stats.
 

+ 2 - 2
modules/data-streams/src/javaRestTest/java/org/elasticsearch/datastreams/LazyRolloverDataStreamIT.java

@@ -47,7 +47,7 @@ public class LazyRolloverDataStreamIT extends ESRestTestCase {
         .setting("xpack.security.transport.ssl.enabled", "false")
         .setting("xpack.security.http.ssl.enabled", "false")
         .user("test_admin", PASSWORD, "superuser", false)
-        .user("test_simple_user", PASSWORD, "not_privileged", false)
+        .user("test_simple_user", PASSWORD, "under_privilged", false)
         .rolesFile(Resource.fromClasspath("roles.yml"))
         .build();
 
@@ -69,7 +69,7 @@ public class LazyRolloverDataStreamIT extends ESRestTestCase {
     }
 
     private Settings simpleUserRestClientSettings() {
-        // Note: This user is assigned the role "not_privileged". That role is defined in roles.yml.
+        // Note: This user is assigned the role "under_privilged". That role is defined in roles.yml.
         String token = basicAuthHeaderValue("test_simple_user", new SecureString(PASSWORD.toCharArray()));
         return Settings.builder().put(ThreadContext.PREFIX + ".Authorization", token).build();
     }

+ 147 - 0
modules/data-streams/src/javaRestTest/java/org/elasticsearch/datastreams/lifecycle/DataStreamGlobalRetentionIT.java

@@ -0,0 +1,147 @@
+/*
+ * Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
+ * or more contributor license agreements. Licensed under the Elastic License
+ * 2.0 and the Server Side Public License, v 1; you may not use this file except
+ * in compliance with, at your election, the Elastic License 2.0 or the Server
+ * Side Public License, v 1.
+ */
+package org.elasticsearch.datastreams.lifecycle;
+
+import org.elasticsearch.client.Request;
+import org.elasticsearch.client.Response;
+import org.elasticsearch.common.settings.Settings;
+import org.elasticsearch.datastreams.DisabledSecurityDataStreamTestCase;
+import org.junit.After;
+import org.junit.Before;
+
+import java.io.IOException;
+import java.util.List;
+import java.util.Map;
+import java.util.concurrent.TimeUnit;
+
+import static org.hamcrest.Matchers.is;
+import static org.hamcrest.Matchers.nullValue;
+
+public class DataStreamGlobalRetentionIT extends DisabledSecurityDataStreamTestCase {
+
+    @Before
+    public void setup() throws IOException {
+        updateClusterSettings(
+            Settings.builder()
+                .put("data_streams.lifecycle.poll_interval", "1s")
+                .put("cluster.lifecycle.default.rollover", "min_docs=1,max_docs=1")
+                .build()
+        );
+        // Create a template with the default lifecycle
+        Request putComposableIndexTemplateRequest = new Request("POST", "/_index_template/1");
+        putComposableIndexTemplateRequest.setJsonEntity("""
+            {
+              "index_patterns": ["my-data-stream*"],
+              "data_stream": {},
+              "template": {
+                "lifecycle": {}
+              }
+            }
+            """);
+        assertOK(client().performRequest(putComposableIndexTemplateRequest));
+
+        // Create a data streams with one doc
+        Request createDocRequest = new Request("POST", "/my-data-stream/_doc?refresh=true");
+        createDocRequest.setJsonEntity("{ \"@timestamp\": \"2022-12-12\"}");
+        assertOK(client().performRequest(createDocRequest));
+    }
+
+    @After
+    public void cleanUp() throws IOException {
+        adminClient().performRequest(new Request("DELETE", "_data_stream/*"));
+    }
+
+    @SuppressWarnings("unchecked")
+    public void testDefaultRetention() throws Exception {
+        {
+            // Set global retention
+            Request request = new Request("PUT", "_data_stream/_global_retention");
+            request.setJsonEntity("""
+                {
+                  "default_retention": "10s"
+                }""");
+            assertAcknowledged(client().performRequest(request));
+        }
+
+        // Verify that the effective retention matches the default retention
+        {
+            Request request = new Request("GET", "/_data_stream/my-data-stream");
+            Response response = client().performRequest(request);
+            List<Object> dataStreams = (List<Object>) entityAsMap(response).get("data_streams");
+            assertThat(dataStreams.size(), is(1));
+            Map<String, Object> dataStream = (Map<String, Object>) dataStreams.get(0);
+            assertThat(dataStream.get("name"), is("my-data-stream"));
+            Map<String, Object> lifecycle = (Map<String, Object>) dataStream.get("lifecycle");
+            assertThat(lifecycle.get("effective_retention"), is("10s"));
+            assertThat(lifecycle.get("retention_determined_by"), is("default_global_retention"));
+            assertThat(lifecycle.get("data_retention"), nullValue());
+        }
+
+        // Verify that the first generation index was removed
+        assertBusy(() -> {
+            Response response = client().performRequest(new Request("GET", "/_data_stream/my-data-stream"));
+            Map<String, Object> dataStream = ((List<Map<String, Object>>) entityAsMap(response).get("data_streams")).get(0);
+            assertThat(dataStream.get("name"), is("my-data-stream"));
+            List<Object> backingIndices = (List<Object>) dataStream.get("indices");
+            assertThat(backingIndices.size(), is(1));
+            // 2 backing indices created + 1 for the deleted index
+            assertThat(dataStream.get("generation"), is(3));
+        }, 20, TimeUnit.SECONDS);
+    }
+
+    @SuppressWarnings("unchecked")
+    public void testMaxRetention() throws Exception {
+        {
+            // Set global retention
+            Request request = new Request("PUT", "_data_stream/_global_retention");
+            request.setJsonEntity("""
+                {
+                  "max_retention": "10s"
+                }""");
+            assertAcknowledged(client().performRequest(request));
+        }
+        boolean withDataStreamLevelRetention = randomBoolean();
+        if (withDataStreamLevelRetention) {
+            Request request = new Request("PUT", "_data_stream/my-data-stream/_lifecycle");
+            request.setJsonEntity("""
+                {
+                  "data_retention": "30d"
+                }""");
+            assertAcknowledged(client().performRequest(request));
+        }
+
+        // Verify that the effective retention matches the max retention
+        {
+            Request request = new Request("GET", "/_data_stream/my-data-stream");
+            Response response = client().performRequest(request);
+            List<Object> dataStreams = (List<Object>) entityAsMap(response).get("data_streams");
+            assertThat(dataStreams.size(), is(1));
+            Map<String, Object> dataStream = (Map<String, Object>) dataStreams.get(0);
+            assertThat(dataStream.get("name"), is("my-data-stream"));
+            Map<String, Object> lifecycle = (Map<String, Object>) dataStream.get("lifecycle");
+            assertThat(lifecycle.get("effective_retention"), is("10s"));
+            assertThat(lifecycle.get("retention_determined_by"), is("max_global_retention"));
+            if (withDataStreamLevelRetention) {
+                assertThat(lifecycle.get("data_retention"), is("30d"));
+            } else {
+                assertThat(lifecycle.get("data_retention"), nullValue());
+            }
+        }
+
+        // Verify that the first generation index was removed
+        assertBusy(() -> {
+            Response response = client().performRequest(new Request("GET", "/_data_stream/my-data-stream"));
+            Map<String, Object> dataStream = ((List<Map<String, Object>>) entityAsMap(response).get("data_streams")).get(0);
+            assertThat(dataStream.get("name"), is("my-data-stream"));
+            List<Object> backingIndices = (List<Object>) dataStream.get("indices");
+            assertThat(backingIndices.size(), is(1));
+            // 2 backing indices created + 1 for the deleted index
+            assertThat(dataStream.get("generation"), is(3));
+        }, 20, TimeUnit.SECONDS);
+    }
+}

+ 213 - 0
modules/data-streams/src/javaRestTest/java/org/elasticsearch/datastreams/lifecycle/DataStreamGlobalRetentionPermissionsRestIT.java

@@ -0,0 +1,213 @@
+/*
+ * Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
+ * or more contributor license agreements. Licensed under the Elastic License
+ * 2.0 and the Server Side Public License, v 1; you may not use this file except
+ * in compliance with, at your election, the Elastic License 2.0 or the Server
+ * Side Public License, v 1.
+ */
+
+package org.elasticsearch.datastreams.lifecycle;
+
+import org.apache.http.HttpHost;
+import org.elasticsearch.client.Request;
+import org.elasticsearch.client.Response;
+import org.elasticsearch.client.ResponseException;
+import org.elasticsearch.common.settings.SecureString;
+import org.elasticsearch.common.settings.Settings;
+import org.elasticsearch.common.util.concurrent.ThreadContext;
+import org.elasticsearch.test.cluster.ElasticsearchCluster;
+import org.elasticsearch.test.cluster.FeatureFlag;
+import org.elasticsearch.test.cluster.local.distribution.DistributionType;
+import org.elasticsearch.test.cluster.util.resource.Resource;
+import org.elasticsearch.test.rest.ESRestTestCase;
+import org.junit.ClassRule;
+
+import java.util.Map;
+
+import static org.hamcrest.Matchers.containsString;
+import static org.hamcrest.Matchers.equalTo;
+import static org.hamcrest.Matchers.is;
+
+public class DataStreamGlobalRetentionPermissionsRestIT extends ESRestTestCase {
+
+    private static final String PASSWORD = "secret-test-password";
+
+    @ClassRule
+    public static ElasticsearchCluster cluster = ElasticsearchCluster.local()
+        .distribution(DistributionType.DEFAULT)
+        .feature(FeatureFlag.FAILURE_STORE_ENABLED)
+        .setting("xpack.watcher.enabled", "false")
+        .setting("xpack.ml.enabled", "false")
+        .setting("xpack.security.enabled", "true")
+        .setting("xpack.security.transport.ssl.enabled", "false")
+        .setting("xpack.security.http.ssl.enabled", "false")
+        .user("test_admin", PASSWORD, "superuser", false)
+        .user("test_manage_global_retention", PASSWORD, "manage_data_stream_global_retention", false)
+        .user("test_monitor_global_retention", PASSWORD, "monitor_data_stream_global_retention", false)
+        .user("test_monitor", PASSWORD, "manage_data_stream_lifecycle", false)
+        .user("test_no_privilege", PASSWORD, "no_privilege", false)
+        .rolesFile(Resource.fromClasspath("roles.yml"))
+        .build();
+
+    @Override
+    protected String getTestRestCluster() {
+        return cluster.getHttpAddresses();
+    }
+
+    @Override
+    protected Settings restClientSettings() {
+        // If this test is running in a test framework that handles its own authorization, we don't want to overwrite it.
+        if (super.restClientSettings().keySet().contains(ThreadContext.PREFIX + ".Authorization")) {
+            return super.restClientSettings();
+        } else {
+            // Note: This user is assigned the role "manage_data_stream_lifecycle". That role is defined in roles.yml.
+            String token = basicAuthHeaderValue("test_data_stream_lifecycle", new SecureString(PASSWORD.toCharArray()));
+            return Settings.builder().put(ThreadContext.PREFIX + ".Authorization", token).build();
+        }
+    }
+
+    @Override
+    protected Settings restAdminSettings() {
+        // If this test is running in a test framework that handles its own authorization, we don't want to overwrite it.
+        if (super.restClientSettings().keySet().contains(ThreadContext.PREFIX + ".Authorization")) {
+            return super.restClientSettings();
+        } else {
+            // Note: We use the admin user because the other one is too unprivileged, so it breaks the initialization of the test
+            String token = basicAuthHeaderValue("test_admin", new SecureString(PASSWORD.toCharArray()));
+            return Settings.builder().put(ThreadContext.PREFIX + ".Authorization", token).build();
+        }
+    }
+
+    private Settings restManageGlobalRetentionClientSettings() {
+        String token = basicAuthHeaderValue("test_manage_global_retention", new SecureString(PASSWORD.toCharArray()));
+        return Settings.builder().put(ThreadContext.PREFIX + ".Authorization", token).build();
+    }
+
+    private Settings restMonitorGlobalRetentionClientSettings() {
+        String token = basicAuthHeaderValue("test_monitor_global_retention", new SecureString(PASSWORD.toCharArray()));
+        return Settings.builder().put(ThreadContext.PREFIX + ".Authorization", token).build();
+    }
+
+    private Settings restOnlyManageLifecycleClientSettings() {
+        String token = basicAuthHeaderValue("test_monitor", new SecureString(PASSWORD.toCharArray()));
+        return Settings.builder().put(ThreadContext.PREFIX + ".Authorization", token).build();
+    }
+
+    private Settings restNoPrivilegeClientSettings() {
+        String token = basicAuthHeaderValue("test_no_privilege", new SecureString(PASSWORD.toCharArray()));
+        return Settings.builder().put(ThreadContext.PREFIX + ".Authorization", token).build();
+    }
+
+    public void testManageGlobalRetentionPrivileges() throws Exception {
+        try (var client = buildClient(restManageGlobalRetentionClientSettings(), getClusterHosts().toArray(new HttpHost[0]))) {
+            Request request = new Request("PUT", "_data_stream/_global_retention");
+            request.setJsonEntity("""
+                {
+                  "default_retention": "1d",
+                  "max_retention": "7d"
+                }""");
+            assertAcknowledged(client.performRequest(request));
+            Map<String, Object> response = entityAsMap(client.performRequest(new Request("GET", "/_data_stream/_global_retention")));
+            assertThat(response.get("default_retention"), equalTo("1d"));
+            assertThat(response.get("max_retention"), equalTo("7d"));
+            assertAcknowledged(client.performRequest(new Request("DELETE", "/_data_stream/_global_retention")));
+        }
+    }
+
+    public void testMonitorGlobalRetentionPrivileges() throws Exception {
+        {
+            Request request = new Request("PUT", "_data_stream/_global_retention");
+            request.setJsonEntity("""
+                {
+                  "default_retention": "1d",
+                  "max_retention": "7d"
+                }""");
+            assertAcknowledged(adminClient().performRequest(request));
+        }
+        try (var client = buildClient(restMonitorGlobalRetentionClientSettings(), getClusterHosts().toArray(new HttpHost[0]))) {
+            Request request = new Request("PUT", "_data_stream/_global_retention");
+            request.setJsonEntity("""
+                {
+                  "default_retention": "1d",
+                  "max_retention": "7d"
+                }""");
+            ResponseException responseException = expectThrows(ResponseException.class, () -> client.performRequest(request));
+            assertThat(responseException.getResponse().getStatusLine().getStatusCode(), is(403));
+            assertThat(
+                responseException.getMessage(),
+                containsString(
+                    "action [cluster:admin/data_stream/global_retention/put] is unauthorized for user [test_monitor_global_retention]"
+                )
+            );
+            responseException = expectThrows(
+                ResponseException.class,
+                () -> client.performRequest(new Request("DELETE", "/_data_stream/_global_retention"))
+            );
+            assertThat(responseException.getResponse().getStatusLine().getStatusCode(), is(403));
+            assertThat(
+                responseException.getMessage(),
+                containsString(
+                    "action [cluster:admin/data_stream/global_retention/delete] is unauthorized for user [test_monitor_global_retention]"
+                )
+            );
+            Map<String, Object> response = entityAsMap(client.performRequest(new Request("GET", "/_data_stream/_global_retention")));
+            assertThat(response.get("default_retention"), equalTo("1d"));
+            assertThat(response.get("max_retention"), equalTo("7d"));
+        }
+    }
+
+    public void testManageLifecyclePrivileges() throws Exception {
+        try (var client = buildClient(restOnlyManageLifecycleClientSettings(), getClusterHosts().toArray(new HttpHost[0]))) {
+            Request request = new Request("PUT", "_data_stream/_global_retention");
+            request.setJsonEntity("""
+                {
+                  "default_retention": "1d",
+                  "max_retention": "7d"
+                }""");
+            ResponseException responseException = expectThrows(ResponseException.class, () -> client.performRequest(request));
+            assertThat(responseException.getResponse().getStatusLine().getStatusCode(), is(403));
+            assertThat(
+                responseException.getMessage(),
+                containsString("action [cluster:admin/data_stream/global_retention/put] is unauthorized for user [test_monitor]")
+            );
+            // This use has the monitor privilege which includes the monitor_data_stream_global_retention
+            Response response = client.performRequest(new Request("GET", "/_data_stream/_global_retention"));
+            assertThat(response.getStatusLine().getStatusCode(), equalTo(200));
+        }
+    }
+
+    public void testNoPrivileges() throws Exception {
+        try (var client = buildClient(restNoPrivilegeClientSettings(), getClusterHosts().toArray(new HttpHost[0]))) {
+            Request request = new Request("PUT", "_data_stream/_global_retention");
+            request.setJsonEntity("""
+                {
+                  "default_retention": "1d",
+                  "max_retention": "7d"
+                }""");
+            ResponseException responseException = expectThrows(ResponseException.class, () -> client.performRequest(request));
+            assertThat(responseException.getResponse().getStatusLine().getStatusCode(), is(403));
+            assertThat(
+                responseException.getMessage(),
+                containsString("action [cluster:admin/data_stream/global_retention/put] is unauthorized for user [test_no_privilege]")
+            );
+            responseException = expectThrows(
+                ResponseException.class,
+                () -> client.performRequest(new Request("DELETE", "/_data_stream/_global_retention"))
+            );
+            assertThat(responseException.getResponse().getStatusLine().getStatusCode(), is(403));
+            assertThat(
+                responseException.getMessage(),
+                containsString("action [cluster:admin/data_stream/global_retention/delete] is unauthorized for user [test_no_privilege]")
+            );
+            responseException = expectThrows(
+                ResponseException.class,
+                () -> client.performRequest(new Request("GET", "/_data_stream/_global_retention"))
+            );
+            assertThat(responseException.getResponse().getStatusLine().getStatusCode(), is(403));
+            assertThat(
+                responseException.getMessage(),
+                containsString("action [cluster:monitor/data_stream/global_retention/get] is unauthorized for user [test_no_privilege]")
+            );
+        }
+    }
+}

+ 4 - 4
modules/data-streams/src/javaRestTest/java/org/elasticsearch/datastreams/lifecycle/DataStreamLifecyclePermissionsRestIT.java

@@ -47,7 +47,7 @@ public class DataStreamLifecyclePermissionsRestIT extends ESRestTestCase {
         .setting("xpack.security.http.ssl.enabled", "false")
         .user("test_admin", PASSWORD, "superuser", false)
         .user("test_data_stream_lifecycle", PASSWORD, "manage_data_stream_lifecycle", false)
-        .user("test_non_privileged", PASSWORD, "not_privileged", false)
+        .user("test_non_privileged", PASSWORD, "under_privilged", false)
         .rolesFile(Resource.fromClasspath("roles.yml"))
         .build();
 
@@ -88,13 +88,13 @@ public class DataStreamLifecyclePermissionsRestIT extends ESRestTestCase {
     }
 
     private Settings restPrivilegedClientSettings() {
-        // Note: This user is assigned the role "not_privileged". That role is defined in roles.yml.
+        // Note: This user is assigned the role "under_privilged". That role is defined in roles.yml.
         String token = basicAuthHeaderValue("test_data_stream_lifecycle", new SecureString(PASSWORD.toCharArray()));
         return Settings.builder().put(ThreadContext.PREFIX + ".Authorization", token).build();
     }
 
     private Settings restUnprivilegedClientSettings() {
-        // Note: This user is assigned the role "not_privileged". That role is defined in roles.yml.
+        // Note: This user is assigned the role "under_privilged". That role is defined in roles.yml.
         String token = basicAuthHeaderValue("test_non_privileged", new SecureString(PASSWORD.toCharArray()));
         return Settings.builder().put(ThreadContext.PREFIX + ".Authorization", token).build();
     }
@@ -106,7 +106,7 @@ public class DataStreamLifecyclePermissionsRestIT extends ESRestTestCase {
              * This test checks that a user with the "manage_data_stream_lifecycle" index privilege on "data-stream-lifecycle-*" data
              * streams can delete and put a lifecycle on the "data-stream-lifecycle-test" data stream, while a user with who does not have
              * that privilege (but does have all the other same "data-stream-lifecycle-*" privileges) cannot delete or put a lifecycle on
-             * that datastream.
+             * that data stream.
              */
             String dataStreamName = "data-stream-lifecycle-test"; // Needs to match the pattern of the names in roles.yml
             createDataStreamAsAdmin(dataStreamName);

+ 8 - 1
modules/data-streams/src/javaRestTest/resources/roles.yml

@@ -7,7 +7,7 @@ manage_data_stream_lifecycle:
         - read
         - write
         - manage_data_stream_lifecycle
-not_privileged:
+under_privilged:
   cluster:
     - monitor
   indices:
@@ -16,3 +16,10 @@ not_privileged:
         - read
         - write
         - view_index_metadata
+manage_data_stream_global_retention:
+  cluster:
+    - manage_data_stream_global_retention
+monitor_data_stream_global_retention:
+  cluster:
+    - monitor_data_stream_global_retention
+no_privilege:

+ 4 - 2
modules/data-streams/src/main/java/org/elasticsearch/datastreams/DataStreamFeatures.java

@@ -11,6 +11,7 @@ package org.elasticsearch.datastreams;
 import org.elasticsearch.Version;
 import org.elasticsearch.action.admin.indices.rollover.LazyRolloverAction;
 import org.elasticsearch.action.datastreams.autosharding.DataStreamAutoShardingService;
+import org.elasticsearch.cluster.metadata.DataStreamGlobalRetention;
 import org.elasticsearch.datastreams.lifecycle.health.DataStreamLifecycleHealthInfoPublisher;
 import org.elasticsearch.features.FeatureSpecification;
 import org.elasticsearch.features.NodeFeature;
@@ -33,9 +34,10 @@ public class DataStreamFeatures implements FeatureSpecification {
     @Override
     public Set<NodeFeature> getFeatures() {
         return Set.of(
-            DataStreamLifecycleHealthInfoPublisher.DSL_HEALTH_INFO_FEATURE, // Added in 8.12
+            DataStreamLifecycleHealthInfoPublisher.DSL_HEALTH_INFO_FEATURE,  // Added in 8.12
             LazyRolloverAction.DATA_STREAM_LAZY_ROLLOVER,                    // Added in 8.13
-            DataStreamAutoShardingService.DATA_STREAM_AUTO_SHARDING_FEATURE
+            DataStreamAutoShardingService.DATA_STREAM_AUTO_SHARDING_FEATURE,
+            DataStreamGlobalRetention.GLOBAL_RETENTION                      // Added in 8.14
         );
     }
 }

+ 42 - 0
modules/data-streams/src/main/java/org/elasticsearch/datastreams/DataStreamsPlugin.java

@@ -19,6 +19,9 @@ import org.elasticsearch.action.datastreams.MigrateToDataStreamAction;
 import org.elasticsearch.action.datastreams.ModifyDataStreamsAction;
 import org.elasticsearch.action.datastreams.PromoteDataStreamAction;
 import org.elasticsearch.client.internal.OriginSettingClient;
+import org.elasticsearch.cluster.ClusterState;
+import org.elasticsearch.cluster.NamedDiff;
+import org.elasticsearch.cluster.metadata.DataStreamGlobalRetention;
 import org.elasticsearch.cluster.metadata.IndexNameExpressionResolver;
 import org.elasticsearch.cluster.node.DiscoveryNodes;
 import org.elasticsearch.common.io.stream.NamedWriteableRegistry;
@@ -38,10 +41,14 @@ import org.elasticsearch.datastreams.action.ModifyDataStreamsTransportAction;
 import org.elasticsearch.datastreams.action.PromoteDataStreamTransportAction;
 import org.elasticsearch.datastreams.lifecycle.DataStreamLifecycleErrorStore;
 import org.elasticsearch.datastreams.lifecycle.DataStreamLifecycleService;
+import org.elasticsearch.datastreams.lifecycle.UpdateDataStreamGlobalRetentionService;
+import org.elasticsearch.datastreams.lifecycle.action.DeleteDataStreamGlobalRetentionAction;
 import org.elasticsearch.datastreams.lifecycle.action.DeleteDataStreamLifecycleAction;
 import org.elasticsearch.datastreams.lifecycle.action.ExplainDataStreamLifecycleAction;
+import org.elasticsearch.datastreams.lifecycle.action.GetDataStreamGlobalRetentionAction;
 import org.elasticsearch.datastreams.lifecycle.action.GetDataStreamLifecycleAction;
 import org.elasticsearch.datastreams.lifecycle.action.GetDataStreamLifecycleStatsAction;
+import org.elasticsearch.datastreams.lifecycle.action.PutDataStreamGlobalRetentionAction;
 import org.elasticsearch.datastreams.lifecycle.action.PutDataStreamLifecycleAction;
 import org.elasticsearch.datastreams.lifecycle.action.TransportDeleteDataStreamLifecycleAction;
 import org.elasticsearch.datastreams.lifecycle.action.TransportExplainDataStreamLifecycleAction;
@@ -51,9 +58,12 @@ import org.elasticsearch.datastreams.lifecycle.action.TransportPutDataStreamLife
 import org.elasticsearch.datastreams.lifecycle.health.DataStreamLifecycleHealthIndicatorService;
 import org.elasticsearch.datastreams.lifecycle.health.DataStreamLifecycleHealthInfoPublisher;
 import org.elasticsearch.datastreams.lifecycle.rest.RestDataStreamLifecycleStatsAction;
+import org.elasticsearch.datastreams.lifecycle.rest.RestDeleteDataStreamGlobalRetentionAction;
 import org.elasticsearch.datastreams.lifecycle.rest.RestDeleteDataStreamLifecycleAction;
 import org.elasticsearch.datastreams.lifecycle.rest.RestExplainDataStreamLifecycleAction;
+import org.elasticsearch.datastreams.lifecycle.rest.RestGetDataStreamGlobalRetentionAction;
 import org.elasticsearch.datastreams.lifecycle.rest.RestGetDataStreamLifecycleAction;
+import org.elasticsearch.datastreams.lifecycle.rest.RestPutDataStreamGlobalRetentionAction;
 import org.elasticsearch.datastreams.lifecycle.rest.RestPutDataStreamLifecycleAction;
 import org.elasticsearch.datastreams.rest.RestCreateDataStreamAction;
 import org.elasticsearch.datastreams.rest.RestDataStreamsStatsAction;
@@ -132,6 +142,7 @@ public class DataStreamsPlugin extends Plugin implements ActionPlugin, HealthPlu
     private final SetOnce<DataStreamLifecycleService> dataLifecycleInitialisationService = new SetOnce<>();
     private final SetOnce<DataStreamLifecycleHealthInfoPublisher> dataStreamLifecycleErrorsPublisher = new SetOnce<>();
     private final SetOnce<DataStreamLifecycleHealthIndicatorService> dataStreamLifecycleHealthIndicatorService = new SetOnce<>();
+    private final SetOnce<UpdateDataStreamGlobalRetentionService> dataStreamGlobalRetentionService = new SetOnce<>();
     private final Settings settings;
 
     public DataStreamsPlugin(Settings settings) {
@@ -205,10 +216,12 @@ public class DataStreamsPlugin extends Plugin implements ActionPlugin, HealthPlu
         );
         dataLifecycleInitialisationService.get().init();
         dataStreamLifecycleHealthIndicatorService.set(new DataStreamLifecycleHealthIndicatorService());
+        dataStreamGlobalRetentionService.set(new UpdateDataStreamGlobalRetentionService(services.clusterService()));
 
         components.add(errorStoreInitialisationService.get());
         components.add(dataLifecycleInitialisationService.get());
         components.add(dataStreamLifecycleErrorsPublisher.get());
+        components.add(dataStreamGlobalRetentionService.get());
         return components;
     }
 
@@ -227,6 +240,24 @@ public class DataStreamsPlugin extends Plugin implements ActionPlugin, HealthPlu
         actions.add(new ActionHandler<>(DeleteDataStreamLifecycleAction.INSTANCE, TransportDeleteDataStreamLifecycleAction.class));
         actions.add(new ActionHandler<>(ExplainDataStreamLifecycleAction.INSTANCE, TransportExplainDataStreamLifecycleAction.class));
         actions.add(new ActionHandler<>(GetDataStreamLifecycleStatsAction.INSTANCE, TransportGetDataStreamLifecycleStatsAction.class));
+        actions.add(
+            new ActionHandler<>(
+                PutDataStreamGlobalRetentionAction.INSTANCE,
+                PutDataStreamGlobalRetentionAction.TransportPutDataStreamGlobalRetentionAction.class
+            )
+        );
+        actions.add(
+            new ActionHandler<>(
+                GetDataStreamGlobalRetentionAction.INSTANCE,
+                GetDataStreamGlobalRetentionAction.TransportGetDataStreamGlobalSettingsAction.class
+            )
+        );
+        actions.add(
+            new ActionHandler<>(
+                DeleteDataStreamGlobalRetentionAction.INSTANCE,
+                DeleteDataStreamGlobalRetentionAction.TransportDeleteDataStreamGlobalRetentionAction.class
+            )
+        );
         return actions;
     }
 
@@ -259,9 +290,20 @@ public class DataStreamsPlugin extends Plugin implements ActionPlugin, HealthPlu
         handlers.add(new RestDeleteDataStreamLifecycleAction());
         handlers.add(new RestExplainDataStreamLifecycleAction());
         handlers.add(new RestDataStreamLifecycleStatsAction());
+        handlers.add(new RestPutDataStreamGlobalRetentionAction());
+        handlers.add(new RestGetDataStreamGlobalRetentionAction());
+        handlers.add(new RestDeleteDataStreamGlobalRetentionAction());
         return handlers;
     }
 
+    @Override
+    public List<NamedWriteableRegistry.Entry> getNamedWriteables() {
+        return List.of(
+            new NamedWriteableRegistry.Entry(ClusterState.Custom.class, DataStreamGlobalRetention.TYPE, DataStreamGlobalRetention::read),
+            new NamedWriteableRegistry.Entry(NamedDiff.class, DataStreamGlobalRetention.TYPE, DataStreamGlobalRetention::readDiffFrom)
+        );
+    }
+
     @Override
     public Collection<IndexSettingProvider> getAdditionalIndexSettingProviders(IndexSettingProvider.Parameters parameters) {
         return List.of(new DataStreamIndexSettingsProvider(parameters.mapperServiceFactory()));

+ 175 - 0
modules/data-streams/src/main/java/org/elasticsearch/datastreams/lifecycle/UpdateDataStreamGlobalRetentionService.java

@@ -0,0 +1,175 @@
+/*
+ * Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
+ * or more contributor license agreements. Licensed under the Elastic License
+ * 2.0 and the Server Side Public License, v 1; you may not use this file except
+ * in compliance with, at your election, the Elastic License 2.0 or the Server
+ * Side Public License, v 1.
+ */
+
+package org.elasticsearch.datastreams.lifecycle;
+
+import org.apache.logging.log4j.LogManager;
+import org.apache.logging.log4j.Logger;
+import org.elasticsearch.action.ActionListener;
+import org.elasticsearch.cluster.ClusterState;
+import org.elasticsearch.cluster.ClusterStateAckListener;
+import org.elasticsearch.cluster.ClusterStateTaskExecutor;
+import org.elasticsearch.cluster.ClusterStateTaskListener;
+import org.elasticsearch.cluster.SimpleBatchedAckListenerTaskExecutor;
+import org.elasticsearch.cluster.metadata.DataStream;
+import org.elasticsearch.cluster.metadata.DataStreamGlobalRetention;
+import org.elasticsearch.cluster.node.DiscoveryNode;
+import org.elasticsearch.cluster.service.ClusterService;
+import org.elasticsearch.cluster.service.MasterServiceTaskQueue;
+import org.elasticsearch.common.Priority;
+import org.elasticsearch.core.Nullable;
+import org.elasticsearch.core.TimeValue;
+import org.elasticsearch.core.Tuple;
+import org.elasticsearch.datastreams.lifecycle.action.DeleteDataStreamGlobalRetentionAction;
+import org.elasticsearch.datastreams.lifecycle.action.PutDataStreamGlobalRetentionAction;
+import org.elasticsearch.datastreams.lifecycle.action.UpdateDataStreamGlobalRetentionResponse;
+
+import java.util.ArrayList;
+import java.util.Comparator;
+import java.util.List;
+import java.util.Objects;
+
+/**
+ * This service manages the global retention configuration, it provides an API to set or remove global retention
+ * from the cluster state.
+ */
+public class UpdateDataStreamGlobalRetentionService {
+
+    private static final Logger logger = LogManager.getLogger(UpdateDataStreamGlobalRetentionService.class);
+
+    private final MasterServiceTaskQueue<UpsertGlobalDataStreamMetadataTask> taskQueue;
+
+    public UpdateDataStreamGlobalRetentionService(ClusterService clusterService) {
+        ClusterStateTaskExecutor<UpsertGlobalDataStreamMetadataTask> executor = new SimpleBatchedAckListenerTaskExecutor<>() {
+
+            @Override
+            public Tuple<ClusterState, ClusterStateAckListener> executeTask(
+                UpsertGlobalDataStreamMetadataTask task,
+                ClusterState clusterState
+            ) {
+                return new Tuple<>(updateGlobalRetention(clusterState, task.globalRetention()), task);
+            }
+        };
+        this.taskQueue = clusterService.createTaskQueue("data-stream-global-retention", Priority.HIGH, executor);
+
+    }
+
+    public void updateGlobalRetention(
+        PutDataStreamGlobalRetentionAction.Request request,
+        List<UpdateDataStreamGlobalRetentionResponse.AffectedDataStream> affectedDataStreams,
+        final ActionListener<UpdateDataStreamGlobalRetentionResponse> listener
+    ) {
+        taskQueue.submitTask(
+            "update-data-stream-global-retention",
+            new UpsertGlobalDataStreamMetadataTask(
+                request.getGlobalRetention(),
+                affectedDataStreams,
+                listener,
+                request.masterNodeTimeout()
+            ),
+            request.masterNodeTimeout()
+        );
+    }
+
+    public void removeGlobalRetention(
+        DeleteDataStreamGlobalRetentionAction.Request request,
+        List<UpdateDataStreamGlobalRetentionResponse.AffectedDataStream> affectedDataStreams,
+        final ActionListener<UpdateDataStreamGlobalRetentionResponse> listener
+    ) {
+        taskQueue.submitTask(
+            "remove-data-stream-global-retention",
+            new UpsertGlobalDataStreamMetadataTask(null, affectedDataStreams, listener, request.masterNodeTimeout()),
+            request.masterNodeTimeout()
+        );
+    }
+
+    public List<UpdateDataStreamGlobalRetentionResponse.AffectedDataStream> determineAffectedDataStreams(
+        @Nullable DataStreamGlobalRetention newGlobalRetention,
+        ClusterState clusterState
+    ) {
+        var previousGlobalRetention = DataStreamGlobalRetention.getFromClusterState(clusterState);
+        if (Objects.equals(newGlobalRetention, previousGlobalRetention)) {
+            return List.of();
+        }
+        List<UpdateDataStreamGlobalRetentionResponse.AffectedDataStream> affectedDataStreams = new ArrayList<>();
+        for (DataStream dataStream : clusterState.metadata().dataStreams().values()) {
+            if (dataStream.getLifecycle() != null) {
+                TimeValue previousEffectiveRetention = dataStream.getLifecycle().getEffectiveDataRetention(previousGlobalRetention);
+                TimeValue newEffectiveRetention = dataStream.getLifecycle().getEffectiveDataRetention(newGlobalRetention);
+                if (Objects.equals(previousEffectiveRetention, newEffectiveRetention) == false) {
+                    affectedDataStreams.add(
+                        new UpdateDataStreamGlobalRetentionResponse.AffectedDataStream(
+                            dataStream.getName(),
+                            newEffectiveRetention,
+                            previousEffectiveRetention
+                        )
+                    );
+                }
+            }
+        }
+        affectedDataStreams.sort(Comparator.comparing(UpdateDataStreamGlobalRetentionResponse.AffectedDataStream::dataStreamName));
+        return affectedDataStreams;
+    }
+
+    // Visible for testing
+    ClusterState updateGlobalRetention(ClusterState clusterState, @Nullable DataStreamGlobalRetention retentionFromRequest) {
+        final var initialRetention = DataStreamGlobalRetention.getFromClusterState(clusterState);
+        // Avoid storing empty retention in the cluster state
+        final var newRetention = DataStreamGlobalRetention.EMPTY.equals(retentionFromRequest) ? null : retentionFromRequest;
+        if (Objects.equals(newRetention, initialRetention)) {
+            return clusterState;
+        }
+        if (newRetention == null) {
+            return clusterState.copyAndUpdate(b -> b.removeCustom(DataStreamGlobalRetention.TYPE));
+        }
+        return clusterState.copyAndUpdate(b -> b.putCustom(DataStreamGlobalRetention.TYPE, newRetention));
+    }
+
+    /**
+     * A base class for the task updating the global retention in the cluster state.
+     */
+    record UpsertGlobalDataStreamMetadataTask(
+        @Nullable DataStreamGlobalRetention globalRetention,
+        List<UpdateDataStreamGlobalRetentionResponse.AffectedDataStream> affectedDataStreams,
+        ActionListener<UpdateDataStreamGlobalRetentionResponse> listener,
+        TimeValue masterTimeout
+    ) implements ClusterStateTaskListener, ClusterStateAckListener {
+
+        @Override
+        public void onFailure(Exception e) {
+            listener.onFailure(e);
+        }
+
+        @Override
+        public boolean mustAck(DiscoveryNode discoveryNode) {
+            return true;
+        }
+
+        @Override
+        public void onAllNodesAcked() {
+            listener.onResponse(new UpdateDataStreamGlobalRetentionResponse(true, affectedDataStreams));
+        }
+
+        @Override
+        public void onAckFailure(Exception e) {
+            logger.debug("Failed to update global retention [{}] with error [{}]", globalRetention, e.getMessage());
+            listener.onResponse(UpdateDataStreamGlobalRetentionResponse.FAILED);
+        }
+
+        @Override
+        public void onAckTimeout() {
+            logger.debug("Failed to update global retention [{}] because timeout was reached", globalRetention);
+            listener.onResponse(UpdateDataStreamGlobalRetentionResponse.FAILED);
+        }
+
+        @Override
+        public TimeValue ackTimeout() {
+            return masterTimeout;
+        }
+    }
+}

+ 153 - 0
modules/data-streams/src/main/java/org/elasticsearch/datastreams/lifecycle/action/DeleteDataStreamGlobalRetentionAction.java

@@ -0,0 +1,153 @@
+/*
+ * Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
+ * or more contributor license agreements. Licensed under the Elastic License
+ * 2.0 and the Server Side Public License, v 1; you may not use this file except
+ * in compliance with, at your election, the Elastic License 2.0 or the Server
+ * Side Public License, v 1.
+ */
+
+package org.elasticsearch.datastreams.lifecycle.action;
+
+import org.elasticsearch.ResourceNotFoundException;
+import org.elasticsearch.action.ActionListener;
+import org.elasticsearch.action.ActionRequestValidationException;
+import org.elasticsearch.action.ActionType;
+import org.elasticsearch.action.support.ActionFilters;
+import org.elasticsearch.action.support.master.MasterNodeRequest;
+import org.elasticsearch.action.support.master.TransportMasterNodeAction;
+import org.elasticsearch.cluster.ClusterState;
+import org.elasticsearch.cluster.block.ClusterBlockException;
+import org.elasticsearch.cluster.block.ClusterBlockLevel;
+import org.elasticsearch.cluster.metadata.DataStreamGlobalRetention;
+import org.elasticsearch.cluster.metadata.IndexNameExpressionResolver;
+import org.elasticsearch.cluster.service.ClusterService;
+import org.elasticsearch.common.inject.Inject;
+import org.elasticsearch.common.io.stream.StreamInput;
+import org.elasticsearch.common.io.stream.StreamOutput;
+import org.elasticsearch.datastreams.lifecycle.UpdateDataStreamGlobalRetentionService;
+import org.elasticsearch.features.FeatureService;
+import org.elasticsearch.tasks.Task;
+import org.elasticsearch.threadpool.ThreadPool;
+import org.elasticsearch.transport.TransportService;
+
+import java.io.IOException;
+import java.util.List;
+import java.util.Objects;
+
+/**
+ * Deletes the global retention for data streams (if it's not a dry run) and it returns the affected data streams.
+ */
+public class DeleteDataStreamGlobalRetentionAction {
+
+    public static final ActionType<UpdateDataStreamGlobalRetentionResponse> INSTANCE = new ActionType<>(
+        "cluster:admin/data_stream/global_retention/delete"
+    );
+
+    private DeleteDataStreamGlobalRetentionAction() {/* no instances */}
+
+    public static final class Request extends MasterNodeRequest<Request> {
+        private boolean dryRun = false;
+
+        public Request(StreamInput in) throws IOException {
+            super(in);
+            dryRun = in.readBoolean();
+        }
+
+        @Override
+        public ActionRequestValidationException validate() {
+            return null;
+        }
+
+        @Override
+        public void writeTo(StreamOutput out) throws IOException {
+            super.writeTo(out);
+            out.writeBoolean(dryRun);
+        }
+
+        public Request() {}
+
+        public boolean dryRun() {
+            return dryRun;
+        }
+
+        @Override
+        public boolean equals(Object o) {
+            if (this == o) return true;
+            if (o == null || getClass() != o.getClass()) return false;
+            DeleteDataStreamGlobalRetentionAction.Request request = (DeleteDataStreamGlobalRetentionAction.Request) o;
+            return dryRun == request.dryRun;
+        }
+
+        @Override
+        public int hashCode() {
+            return Objects.hash(dryRun);
+        }
+
+        public void dryRun(boolean dryRun) {
+            this.dryRun = dryRun;
+        }
+    }
+
+    public static class TransportDeleteDataStreamGlobalRetentionAction extends TransportMasterNodeAction<
+        Request,
+        UpdateDataStreamGlobalRetentionResponse> {
+
+        private final UpdateDataStreamGlobalRetentionService globalRetentionService;
+        private final FeatureService featureService;
+
+        @Inject
+        public TransportDeleteDataStreamGlobalRetentionAction(
+            TransportService transportService,
+            ClusterService clusterService,
+            ThreadPool threadPool,
+            ActionFilters actionFilters,
+            IndexNameExpressionResolver indexNameExpressionResolver,
+            UpdateDataStreamGlobalRetentionService globalRetentionService,
+            FeatureService featureService
+        ) {
+            super(
+                INSTANCE.name(),
+                transportService,
+                clusterService,
+                threadPool,
+                actionFilters,
+                Request::new,
+                indexNameExpressionResolver,
+                UpdateDataStreamGlobalRetentionResponse::new,
+                threadPool.executor(ThreadPool.Names.MANAGEMENT)
+            );
+            this.globalRetentionService = globalRetentionService;
+            this.featureService = featureService;
+        }
+
+        @Override
+        protected void masterOperation(
+            Task task,
+            Request request,
+            ClusterState state,
+            ActionListener<UpdateDataStreamGlobalRetentionResponse> listener
+        ) throws Exception {
+            if (featureService.clusterHasFeature(state, DataStreamGlobalRetention.GLOBAL_RETENTION) == false) {
+                listener.onFailure(
+                    new ResourceNotFoundException(
+                        "Data stream global retention feature not found, please ensure all nodes have the feature "
+                            + DataStreamGlobalRetention.GLOBAL_RETENTION.id()
+                    )
+                );
+                return;
+            }
+            List<UpdateDataStreamGlobalRetentionResponse.AffectedDataStream> affectedDataStreams = globalRetentionService
+                .determineAffectedDataStreams(null, state);
+            if (request.dryRun()) {
+                listener.onResponse(new UpdateDataStreamGlobalRetentionResponse(false, true, affectedDataStreams));
+            } else {
+                globalRetentionService.removeGlobalRetention(request, affectedDataStreams, listener);
+            }
+        }
+
+        @Override
+        protected ClusterBlockException checkBlock(Request request, ClusterState state) {
+            return state.blocks().globalBlockedException(ClusterBlockLevel.METADATA_WRITE);
+        }
+    }
+}

+ 168 - 0
modules/data-streams/src/main/java/org/elasticsearch/datastreams/lifecycle/action/GetDataStreamGlobalRetentionAction.java

@@ -0,0 +1,168 @@
+/*
+ * Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
+ * or more contributor license agreements. Licensed under the Elastic License
+ * 2.0 and the Server Side Public License, v 1; you may not use this file except
+ * in compliance with, at your election, the Elastic License 2.0 or the Server
+ * Side Public License, v 1.
+ */
+
+package org.elasticsearch.datastreams.lifecycle.action;
+
+import org.elasticsearch.ResourceNotFoundException;
+import org.elasticsearch.action.ActionListener;
+import org.elasticsearch.action.ActionRequestValidationException;
+import org.elasticsearch.action.ActionResponse;
+import org.elasticsearch.action.ActionType;
+import org.elasticsearch.action.support.ActionFilters;
+import org.elasticsearch.action.support.master.MasterNodeReadRequest;
+import org.elasticsearch.action.support.master.TransportMasterNodeReadAction;
+import org.elasticsearch.cluster.ClusterState;
+import org.elasticsearch.cluster.block.ClusterBlockException;
+import org.elasticsearch.cluster.block.ClusterBlockLevel;
+import org.elasticsearch.cluster.metadata.DataStreamGlobalRetention;
+import org.elasticsearch.cluster.metadata.IndexNameExpressionResolver;
+import org.elasticsearch.cluster.service.ClusterService;
+import org.elasticsearch.common.inject.Inject;
+import org.elasticsearch.common.io.stream.StreamInput;
+import org.elasticsearch.common.io.stream.StreamOutput;
+import org.elasticsearch.features.FeatureService;
+import org.elasticsearch.tasks.Task;
+import org.elasticsearch.threadpool.ThreadPool;
+import org.elasticsearch.transport.TransportService;
+import org.elasticsearch.xcontent.ToXContentObject;
+import org.elasticsearch.xcontent.XContentBuilder;
+
+import java.io.IOException;
+import java.util.Objects;
+
+/**
+ * Retrieves the global retention for data streams.
+ */
+public class GetDataStreamGlobalRetentionAction {
+
+    public static final ActionType<Response> INSTANCE = new ActionType<>("cluster:monitor/data_stream/global_retention/get");
+
+    private GetDataStreamGlobalRetentionAction() {/* no instances */}
+
+    public static final class Request extends MasterNodeReadRequest<Request> {
+
+        public Request() {}
+
+        public Request(StreamInput in) throws IOException {
+            super(in);
+        }
+
+        @Override
+        public void writeTo(StreamOutput out) throws IOException {
+            super.writeTo(out);
+        }
+
+        @Override
+        public ActionRequestValidationException validate() {
+            return null;
+        }
+
+        @Override
+        public boolean equals(Object obj) {
+            return super.equals(obj);
+        }
+
+        @Override
+        public int hashCode() {
+            return super.hashCode();
+        }
+    }
+
+    public static class Response extends ActionResponse implements ToXContentObject {
+
+        private final DataStreamGlobalRetention globalRetention;
+
+        public Response(DataStreamGlobalRetention globalRetention) {
+            this.globalRetention = globalRetention;
+        }
+
+        public Response(StreamInput in) throws IOException {
+            super(in);
+            globalRetention = DataStreamGlobalRetention.read(in);
+        }
+
+        @Override
+        public void writeTo(StreamOutput out) throws IOException {
+            globalRetention.writeTo(out);
+        }
+
+        @Override
+        public String toString() {
+            return "Response{" + "globalRetention=" + globalRetention + '}';
+        }
+
+        @Override
+        public XContentBuilder toXContent(XContentBuilder builder, Params params) throws IOException {
+            builder.startObject();
+            globalRetention.toXContentFragment(builder, params);
+            builder.endObject();
+            return builder;
+        }
+
+        @Override
+        public boolean equals(Object o) {
+            if (this == o) return true;
+            if (o == null || getClass() != o.getClass()) return false;
+            Response that = (Response) o;
+            return Objects.equals(globalRetention, that.globalRetention);
+        }
+
+        @Override
+        public int hashCode() {
+            return Objects.hash(globalRetention);
+        }
+    }
+
+    public static class TransportGetDataStreamGlobalSettingsAction extends TransportMasterNodeReadAction<Request, Response> {
+
+        private final FeatureService featureService;
+
+        @Inject
+        public TransportGetDataStreamGlobalSettingsAction(
+            TransportService transportService,
+            ClusterService clusterService,
+            ThreadPool threadPool,
+            ActionFilters actionFilters,
+            IndexNameExpressionResolver indexNameExpressionResolver,
+            FeatureService featureService
+        ) {
+            super(
+                INSTANCE.name(),
+                transportService,
+                clusterService,
+                threadPool,
+                actionFilters,
+                Request::new,
+                indexNameExpressionResolver,
+                Response::new,
+                threadPool.executor(ThreadPool.Names.MANAGEMENT)
+            );
+            this.featureService = featureService;
+        }
+
+        @Override
+        protected void masterOperation(Task task, Request request, ClusterState state, ActionListener<Response> listener) throws Exception {
+            if (featureService.clusterHasFeature(state, DataStreamGlobalRetention.GLOBAL_RETENTION) == false) {
+                listener.onFailure(
+                    new ResourceNotFoundException(
+                        "Data stream global retention feature not found, please ensure all nodes have the feature "
+                            + DataStreamGlobalRetention.GLOBAL_RETENTION.id()
+                    )
+                );
+                return;
+            }
+            DataStreamGlobalRetention globalRetention = DataStreamGlobalRetention.getFromClusterState(state);
+            listener.onResponse(new Response(globalRetention == null ? DataStreamGlobalRetention.EMPTY : globalRetention));
+        }
+
+        @Override
+        protected ClusterBlockException checkBlock(Request request, ClusterState state) {
+            return state.blocks().globalBlockedException(ClusterBlockLevel.METADATA_READ);
+        }
+    }
+}

+ 202 - 0
modules/data-streams/src/main/java/org/elasticsearch/datastreams/lifecycle/action/PutDataStreamGlobalRetentionAction.java

@@ -0,0 +1,202 @@
+/*
+ * Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
+ * or more contributor license agreements. Licensed under the Elastic License
+ * 2.0 and the Server Side Public License, v 1; you may not use this file except
+ * in compliance with, at your election, the Elastic License 2.0 or the Server
+ * Side Public License, v 1.
+ */
+
+package org.elasticsearch.datastreams.lifecycle.action;
+
+import org.elasticsearch.ResourceNotFoundException;
+import org.elasticsearch.action.ActionListener;
+import org.elasticsearch.action.ActionRequestValidationException;
+import org.elasticsearch.action.ActionType;
+import org.elasticsearch.action.ValidateActions;
+import org.elasticsearch.action.support.ActionFilters;
+import org.elasticsearch.action.support.master.MasterNodeRequest;
+import org.elasticsearch.action.support.master.TransportMasterNodeAction;
+import org.elasticsearch.cluster.ClusterState;
+import org.elasticsearch.cluster.block.ClusterBlockException;
+import org.elasticsearch.cluster.block.ClusterBlockLevel;
+import org.elasticsearch.cluster.metadata.DataStreamGlobalRetention;
+import org.elasticsearch.cluster.metadata.IndexNameExpressionResolver;
+import org.elasticsearch.cluster.service.ClusterService;
+import org.elasticsearch.common.inject.Inject;
+import org.elasticsearch.common.io.stream.StreamInput;
+import org.elasticsearch.common.io.stream.StreamOutput;
+import org.elasticsearch.core.Nullable;
+import org.elasticsearch.core.TimeValue;
+import org.elasticsearch.datastreams.lifecycle.UpdateDataStreamGlobalRetentionService;
+import org.elasticsearch.features.FeatureService;
+import org.elasticsearch.tasks.Task;
+import org.elasticsearch.threadpool.ThreadPool;
+import org.elasticsearch.transport.TransportService;
+import org.elasticsearch.xcontent.ConstructingObjectParser;
+import org.elasticsearch.xcontent.ObjectParser;
+import org.elasticsearch.xcontent.XContentParser;
+
+import java.io.IOException;
+import java.util.List;
+import java.util.Objects;
+
+/**
+ * Sets the global retention for data streams (if it's not a dry run) and it returns the affected data streams.
+ */
+public class PutDataStreamGlobalRetentionAction {
+
+    public static final ActionType<UpdateDataStreamGlobalRetentionResponse> INSTANCE = new ActionType<>(
+        "cluster:admin/data_stream/global_retention/put"
+    );
+
+    private PutDataStreamGlobalRetentionAction() {/* no instances */}
+
+    public static final class Request extends MasterNodeRequest<Request> {
+
+        public static final ConstructingObjectParser<PutDataStreamGlobalRetentionAction.Request, Void> PARSER =
+            new ConstructingObjectParser<>(
+                "put_data_stream_global_retention_request",
+                args -> new PutDataStreamGlobalRetentionAction.Request((TimeValue) args[0], (TimeValue) args[1])
+            );
+
+        static {
+            PARSER.declareField(
+                ConstructingObjectParser.optionalConstructorArg(),
+                (p, c) -> TimeValue.parseTimeValue(p.textOrNull(), DataStreamGlobalRetention.DEFAULT_RETENTION_FIELD.getPreferredName()),
+                DataStreamGlobalRetention.DEFAULT_RETENTION_FIELD,
+                ObjectParser.ValueType.STRING_OR_NULL
+            );
+            PARSER.declareField(
+                ConstructingObjectParser.optionalConstructorArg(),
+                (p, c) -> TimeValue.parseTimeValue(p.textOrNull(), DataStreamGlobalRetention.MAX_RETENTION_FIELD.getPreferredName()),
+                DataStreamGlobalRetention.MAX_RETENTION_FIELD,
+                ObjectParser.ValueType.STRING_OR_NULL
+            );
+        }
+
+        private final DataStreamGlobalRetention globalRetention;
+        private boolean dryRun = false;
+
+        public static PutDataStreamGlobalRetentionAction.Request parseRequest(XContentParser parser) {
+            return PARSER.apply(parser, null);
+        }
+
+        public Request(StreamInput in) throws IOException {
+            super(in);
+            globalRetention = DataStreamGlobalRetention.read(in);
+            dryRun = in.readBoolean();
+        }
+
+        @Override
+        public ActionRequestValidationException validate() {
+            ActionRequestValidationException validationException = null;
+            if (globalRetention.equals(DataStreamGlobalRetention.EMPTY)) {
+                return ValidateActions.addValidationError(
+                    "At least one of 'default_retention' or 'max_retention' should be defined."
+                        + " If you want to remove the configuration please use the DELETE method",
+                    validationException
+                );
+            }
+            return validationException;
+        }
+
+        @Override
+        public void writeTo(StreamOutput out) throws IOException {
+            super.writeTo(out);
+            globalRetention.writeTo(out);
+            out.writeBoolean(dryRun);
+        }
+
+        public Request(@Nullable TimeValue defaultRetention, @Nullable TimeValue maxRetention) {
+            this.globalRetention = new DataStreamGlobalRetention(defaultRetention, maxRetention);
+        }
+
+        public DataStreamGlobalRetention getGlobalRetention() {
+            return globalRetention;
+        }
+
+        public boolean dryRun() {
+            return dryRun;
+        }
+
+        @Override
+        public boolean equals(Object o) {
+            if (this == o) return true;
+            if (o == null || getClass() != o.getClass()) return false;
+            PutDataStreamGlobalRetentionAction.Request request = (PutDataStreamGlobalRetentionAction.Request) o;
+            return Objects.equals(globalRetention, request.globalRetention) && dryRun == request.dryRun;
+        }
+
+        @Override
+        public int hashCode() {
+            return Objects.hash(globalRetention, dryRun);
+        }
+
+        public void dryRun(boolean dryRun) {
+            this.dryRun = dryRun;
+        }
+    }
+
+    public static class TransportPutDataStreamGlobalRetentionAction extends TransportMasterNodeAction<
+        Request,
+        UpdateDataStreamGlobalRetentionResponse> {
+
+        private final UpdateDataStreamGlobalRetentionService globalRetentionService;
+        private final FeatureService featureService;
+
+        @Inject
+        public TransportPutDataStreamGlobalRetentionAction(
+            TransportService transportService,
+            ClusterService clusterService,
+            ThreadPool threadPool,
+            ActionFilters actionFilters,
+            IndexNameExpressionResolver indexNameExpressionResolver,
+            UpdateDataStreamGlobalRetentionService globalRetentionService,
+            FeatureService featureService
+        ) {
+            super(
+                INSTANCE.name(),
+                transportService,
+                clusterService,
+                threadPool,
+                actionFilters,
+                Request::new,
+                indexNameExpressionResolver,
+                UpdateDataStreamGlobalRetentionResponse::new,
+                threadPool.executor(ThreadPool.Names.MANAGEMENT)
+            );
+            this.globalRetentionService = globalRetentionService;
+            this.featureService = featureService;
+        }
+
+        @Override
+        protected void masterOperation(
+            Task task,
+            Request request,
+            ClusterState state,
+            ActionListener<UpdateDataStreamGlobalRetentionResponse> listener
+        ) throws Exception {
+            if (featureService.clusterHasFeature(state, DataStreamGlobalRetention.GLOBAL_RETENTION) == false) {
+                listener.onFailure(
+                    new ResourceNotFoundException(
+                        "Data stream global retention feature not found, please ensure all nodes have the feature "
+                            + DataStreamGlobalRetention.GLOBAL_RETENTION.id()
+                    )
+                );
+                return;
+            }
+            List<UpdateDataStreamGlobalRetentionResponse.AffectedDataStream> affectedDataStreams = globalRetentionService
+                .determineAffectedDataStreams(request.globalRetention, state);
+            if (request.dryRun()) {
+                listener.onResponse(new UpdateDataStreamGlobalRetentionResponse(false, true, affectedDataStreams));
+            } else {
+                globalRetentionService.updateGlobalRetention(request, affectedDataStreams, listener);
+            }
+        }
+
+        @Override
+        protected ClusterBlockException checkBlock(Request request, ClusterState state) {
+            return state.blocks().globalBlockedException(ClusterBlockLevel.METADATA_WRITE);
+        }
+    }
+}

+ 122 - 0
modules/data-streams/src/main/java/org/elasticsearch/datastreams/lifecycle/action/UpdateDataStreamGlobalRetentionResponse.java

@@ -0,0 +1,122 @@
+/*
+ * Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
+ * or more contributor license agreements. Licensed under the Elastic License
+ * 2.0 and the Server Side Public License, v 1; you may not use this file except
+ * in compliance with, at your election, the Elastic License 2.0 or the Server
+ * Side Public License, v 1.
+ */
+
+package org.elasticsearch.datastreams.lifecycle.action;
+
+import org.elasticsearch.action.ActionResponse;
+import org.elasticsearch.common.collect.Iterators;
+import org.elasticsearch.common.io.stream.StreamInput;
+import org.elasticsearch.common.io.stream.StreamOutput;
+import org.elasticsearch.common.io.stream.Writeable;
+import org.elasticsearch.common.xcontent.ChunkedToXContentHelper;
+import org.elasticsearch.common.xcontent.ChunkedToXContentObject;
+import org.elasticsearch.core.TimeValue;
+import org.elasticsearch.xcontent.ToXContent;
+import org.elasticsearch.xcontent.ToXContentObject;
+import org.elasticsearch.xcontent.XContentBuilder;
+
+import java.io.IOException;
+import java.util.Iterator;
+import java.util.List;
+
+/**
+ * This response is used by {@link PutDataStreamGlobalRetentionAction} and {@link DeleteDataStreamGlobalRetentionAction} to
+ * communicate to the user the result of a global retention update and the affected data streams.
+ */
+public final class UpdateDataStreamGlobalRetentionResponse extends ActionResponse implements ChunkedToXContentObject {
+
+    public static final UpdateDataStreamGlobalRetentionResponse FAILED = new UpdateDataStreamGlobalRetentionResponse(
+        false,
+        false,
+        List.of()
+    );
+
+    private final boolean acknowledged;
+    private final boolean dryRun;
+    private final List<AffectedDataStream> affectedDataStreams;
+
+    public UpdateDataStreamGlobalRetentionResponse(StreamInput in) throws IOException {
+        super(in);
+        acknowledged = in.readBoolean();
+        dryRun = in.readBoolean();
+        affectedDataStreams = in.readCollectionAsImmutableList(AffectedDataStream::read);
+    }
+
+    public UpdateDataStreamGlobalRetentionResponse(boolean acknowledged, List<AffectedDataStream> affectedDataStreams) {
+        this(acknowledged, false, affectedDataStreams);
+    }
+
+    public UpdateDataStreamGlobalRetentionResponse(boolean acknowledged, boolean dryRun, List<AffectedDataStream> affectedDataStreams) {
+        this.acknowledged = acknowledged;
+        this.dryRun = dryRun;
+        this.affectedDataStreams = affectedDataStreams;
+    }
+
+    @Override
+    public void writeTo(StreamOutput out) throws IOException {
+        out.writeBoolean(acknowledged);
+        out.writeBoolean(dryRun);
+        out.writeCollection(affectedDataStreams);
+    }
+
+    @Override
+    public Iterator<? extends ToXContent> toXContentChunked(ToXContent.Params params) {
+        return Iterators.concat(ChunkedToXContentHelper.startObject(), Iterators.single(((builder, params1) -> {
+            builder.field("acknowledged", acknowledged);
+            builder.field("dry_run", dryRun);
+            return builder;
+        })),
+            ChunkedToXContentHelper.startArray("affected_data_streams"),
+            Iterators.map(affectedDataStreams.iterator(), affectedDataStream -> affectedDataStream::toXContent),
+            ChunkedToXContentHelper.endArray(),
+            ChunkedToXContentHelper.endObject()
+        );
+    }
+
+    public record AffectedDataStream(String dataStreamName, TimeValue newEffectiveRetention, TimeValue previousEffectiveRetention)
+        implements
+            Writeable,
+            ToXContentObject {
+
+        public static AffectedDataStream read(StreamInput in) throws IOException {
+            return new AffectedDataStream(in.readString(), in.readOptionalTimeValue(), in.readOptionalTimeValue());
+        }
+
+        @Override
+        public void writeTo(StreamOutput out) throws IOException {
+            out.writeString(dataStreamName);
+            out.writeOptionalTimeValue(newEffectiveRetention);
+            out.writeOptionalTimeValue(previousEffectiveRetention);
+        }
+
+        @Override
+        public XContentBuilder toXContent(XContentBuilder builder, Params params) throws IOException {
+            builder.startObject();
+            builder.field("name", dataStreamName);
+            builder.field("new_effective_retention", newEffectiveRetention == null ? "infinite" : newEffectiveRetention.getStringRep());
+            builder.field(
+                "previous_effective_retention",
+                previousEffectiveRetention == null ? "infinite" : previousEffectiveRetention.getStringRep()
+            );
+            builder.endObject();
+            return builder;
+        }
+    }
+
+    public boolean isAcknowledged() {
+        return acknowledged;
+    }
+
+    public boolean isDryRun() {
+        return dryRun;
+    }
+
+    public List<AffectedDataStream> getAffectedDataStreams() {
+        return affectedDataStreams;
+    }
+}

+ 49 - 0
modules/data-streams/src/main/java/org/elasticsearch/datastreams/lifecycle/rest/RestDeleteDataStreamGlobalRetentionAction.java

@@ -0,0 +1,49 @@
+/*
+ * Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
+ * or more contributor license agreements. Licensed under the Elastic License
+ * 2.0 and the Server Side Public License, v 1; you may not use this file except
+ * in compliance with, at your election, the Elastic License 2.0 or the Server
+ * Side Public License, v 1.
+ */
+package org.elasticsearch.datastreams.lifecycle.rest;
+
+import org.elasticsearch.client.internal.node.NodeClient;
+import org.elasticsearch.datastreams.lifecycle.action.DeleteDataStreamGlobalRetentionAction;
+import org.elasticsearch.rest.BaseRestHandler;
+import org.elasticsearch.rest.RestRequest;
+import org.elasticsearch.rest.Scope;
+import org.elasticsearch.rest.ServerlessScope;
+import org.elasticsearch.rest.action.RestChunkedToXContentListener;
+
+import java.io.IOException;
+import java.util.List;
+
+import static org.elasticsearch.rest.RestRequest.Method.DELETE;
+
+/**
+ * Removes the data stream global retention configuration
+ */
+@ServerlessScope(Scope.PUBLIC)
+public class RestDeleteDataStreamGlobalRetentionAction extends BaseRestHandler {
+
+    @Override
+    public String getName() {
+        return "delete_data_stream_global_retention_action";
+    }
+
+    @Override
+    public List<Route> routes() {
+        return List.of(new Route(DELETE, "/_data_stream/_global_retention"));
+    }
+
+    @Override
+    protected RestChannelConsumer prepareRequest(RestRequest restRequest, NodeClient client) throws IOException {
+        DeleteDataStreamGlobalRetentionAction.Request request = new DeleteDataStreamGlobalRetentionAction.Request();
+        request.dryRun(restRequest.paramAsBoolean("dry_run", false));
+        return channel -> client.execute(
+            DeleteDataStreamGlobalRetentionAction.INSTANCE,
+            request,
+            new RestChunkedToXContentListener<>(channel)
+        );
+    }
+}

+ 47 - 0
modules/data-streams/src/main/java/org/elasticsearch/datastreams/lifecycle/rest/RestGetDataStreamGlobalRetentionAction.java

@@ -0,0 +1,47 @@
+/*
+ * Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
+ * or more contributor license agreements. Licensed under the Elastic License
+ * 2.0 and the Server Side Public License, v 1; you may not use this file except
+ * in compliance with, at your election, the Elastic License 2.0 or the Server
+ * Side Public License, v 1.
+ */
+package org.elasticsearch.datastreams.lifecycle.rest;
+
+import org.elasticsearch.client.internal.node.NodeClient;
+import org.elasticsearch.datastreams.lifecycle.action.GetDataStreamGlobalRetentionAction;
+import org.elasticsearch.rest.BaseRestHandler;
+import org.elasticsearch.rest.RestRequest;
+import org.elasticsearch.rest.Scope;
+import org.elasticsearch.rest.ServerlessScope;
+import org.elasticsearch.rest.action.RestToXContentListener;
+
+import java.io.IOException;
+import java.util.List;
+
+import static org.elasticsearch.rest.RestRequest.Method.GET;
+
+/**
+ * Retrieves the data stream global retention configuration.
+ */
+@ServerlessScope(Scope.PUBLIC)
+public class RestGetDataStreamGlobalRetentionAction extends BaseRestHandler {
+
+    @Override
+    public String getName() {
+        return "get_data_stream_global_retention_action";
+    }
+
+    @Override
+    public List<Route> routes() {
+        return List.of(new Route(GET, "/_data_stream/_global_retention"));
+    }
+
+    @Override
+    protected RestChannelConsumer prepareRequest(RestRequest restRequest, NodeClient client) throws IOException {
+        GetDataStreamGlobalRetentionAction.Request request = new GetDataStreamGlobalRetentionAction.Request();
+        request.local(restRequest.paramAsBoolean("local", request.local()));
+        request.masterNodeTimeout(restRequest.paramAsTime("master_timeout", request.masterNodeTimeout()));
+
+        return channel -> client.execute(GetDataStreamGlobalRetentionAction.INSTANCE, request, new RestToXContentListener<>(channel));
+    }
+}

+ 53 - 0
modules/data-streams/src/main/java/org/elasticsearch/datastreams/lifecycle/rest/RestPutDataStreamGlobalRetentionAction.java

@@ -0,0 +1,53 @@
+/*
+ * Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
+ * or more contributor license agreements. Licensed under the Elastic License
+ * 2.0 and the Server Side Public License, v 1; you may not use this file except
+ * in compliance with, at your election, the Elastic License 2.0 or the Server
+ * Side Public License, v 1.
+ */
+package org.elasticsearch.datastreams.lifecycle.rest;
+
+import org.elasticsearch.client.internal.node.NodeClient;
+import org.elasticsearch.datastreams.lifecycle.action.PutDataStreamGlobalRetentionAction;
+import org.elasticsearch.rest.BaseRestHandler;
+import org.elasticsearch.rest.RestRequest;
+import org.elasticsearch.rest.Scope;
+import org.elasticsearch.rest.ServerlessScope;
+import org.elasticsearch.rest.action.RestChunkedToXContentListener;
+import org.elasticsearch.xcontent.XContentParser;
+
+import java.io.IOException;
+import java.util.List;
+
+import static org.elasticsearch.rest.RestRequest.Method.PUT;
+
+/**
+ * Updates the default_retention and the max_retention of the data stream global retention configuration. It
+ * does not accept an empty payload.
+ */
+@ServerlessScope(Scope.PUBLIC)
+public class RestPutDataStreamGlobalRetentionAction extends BaseRestHandler {
+
+    @Override
+    public String getName() {
+        return "put_data_stream_global_retention_action";
+    }
+
+    @Override
+    public List<Route> routes() {
+        return List.of(new Route(PUT, "/_data_stream/_global_retention"));
+    }
+
+    @Override
+    protected RestChannelConsumer prepareRequest(RestRequest restRequest, NodeClient client) throws IOException {
+        try (XContentParser parser = restRequest.contentParser()) {
+            PutDataStreamGlobalRetentionAction.Request request = PutDataStreamGlobalRetentionAction.Request.parseRequest(parser);
+            request.dryRun(restRequest.paramAsBoolean("dry_run", false));
+            return channel -> client.execute(
+                PutDataStreamGlobalRetentionAction.INSTANCE,
+                request,
+                new RestChunkedToXContentListener<>(channel)
+            );
+        }
+    }
+}

+ 200 - 0
modules/data-streams/src/test/java/org/elasticsearch/datastreams/lifecycle/UpdateDataStreamGlobalRetentionServiceTests.java

@@ -0,0 +1,200 @@
+/*
+ * Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
+ * or more contributor license agreements. Licensed under the Elastic License
+ * 2.0 and the Server Side Public License, v 1; you may not use this file except
+ * in compliance with, at your election, the Elastic License 2.0 or the Server
+ * Side Public License, v 1.
+ */
+
+package org.elasticsearch.datastreams.lifecycle;
+
+import org.elasticsearch.cluster.ClusterName;
+import org.elasticsearch.cluster.ClusterState;
+import org.elasticsearch.cluster.metadata.DataStream;
+import org.elasticsearch.cluster.metadata.DataStreamGlobalRetention;
+import org.elasticsearch.cluster.metadata.DataStreamLifecycle;
+import org.elasticsearch.cluster.metadata.DataStreamTestHelper;
+import org.elasticsearch.cluster.metadata.Metadata;
+import org.elasticsearch.cluster.service.ClusterService;
+import org.elasticsearch.core.TimeValue;
+import org.elasticsearch.index.Index;
+import org.elasticsearch.test.ClusterServiceUtils;
+import org.elasticsearch.test.ESTestCase;
+import org.elasticsearch.threadpool.TestThreadPool;
+import org.elasticsearch.threadpool.ThreadPool;
+import org.junit.After;
+import org.junit.AfterClass;
+import org.junit.Before;
+import org.junit.BeforeClass;
+
+import java.util.List;
+import java.util.concurrent.TimeUnit;
+
+import static org.hamcrest.CoreMatchers.equalTo;
+import static org.hamcrest.CoreMatchers.is;
+import static org.hamcrest.CoreMatchers.notNullValue;
+import static org.hamcrest.CoreMatchers.nullValue;
+
+public class UpdateDataStreamGlobalRetentionServiceTests extends ESTestCase {
+    private static TestThreadPool threadPool;
+    private ClusterService clusterService;
+    private UpdateDataStreamGlobalRetentionService service;
+
+    @BeforeClass
+    public static void setupThreadPool() {
+        threadPool = new TestThreadPool(getTestClass().getName());
+    }
+
+    @Before
+    public void setupServices() {
+        clusterService = ClusterServiceUtils.createClusterService(threadPool);
+        service = new UpdateDataStreamGlobalRetentionService(clusterService);
+    }
+
+    @After
+    public void closeClusterService() {
+        clusterService.close();
+    }
+
+    @AfterClass
+    public static void tearDownThreadPool() {
+        ThreadPool.terminate(threadPool, 30, TimeUnit.SECONDS);
+        threadPool = null;
+    }
+
+    public void testUpdateClusterState() {
+        // Removing from a cluster state without global retention
+        {
+            assertThat(service.updateGlobalRetention(ClusterState.EMPTY_STATE, null), equalTo(ClusterState.EMPTY_STATE));
+            assertThat(
+                service.updateGlobalRetention(ClusterState.EMPTY_STATE, DataStreamGlobalRetention.EMPTY),
+                equalTo(ClusterState.EMPTY_STATE)
+            );
+        }
+
+        // Removing from a cluster state with global retention
+        {
+            ClusterState clusterState = ClusterState.builder(ClusterName.DEFAULT)
+                .putCustom(DataStreamGlobalRetention.TYPE, randomNonEmptyGlobalRetention())
+                .build();
+            DataStreamGlobalRetention updatedRetention = DataStreamGlobalRetention.getFromClusterState(
+                service.updateGlobalRetention(clusterState, null)
+            );
+            assertThat(updatedRetention, nullValue());
+            updatedRetention = DataStreamGlobalRetention.getFromClusterState(
+                service.updateGlobalRetention(clusterState, DataStreamGlobalRetention.EMPTY)
+            );
+            assertThat(updatedRetention, nullValue());
+        }
+
+        // Updating retention
+        {
+            var initialRetention = randomNonEmptyGlobalRetention();
+            ClusterState clusterState = ClusterState.builder(ClusterName.DEFAULT)
+                .putCustom(DataStreamGlobalRetention.TYPE, initialRetention)
+                .build();
+            var expectedRetention = randomValueOtherThan(
+                initialRetention,
+                UpdateDataStreamGlobalRetentionServiceTests::randomNonEmptyGlobalRetention
+            );
+            var updatedRetention = DataStreamGlobalRetention.getFromClusterState(
+                service.updateGlobalRetention(clusterState, expectedRetention)
+            );
+            assertThat(updatedRetention, equalTo(expectedRetention));
+        }
+    }
+
+    public void testDetermineAffectedDataStreams() {
+        Metadata.Builder builder = Metadata.builder();
+        DataStream dataStreamWithoutLifecycle = DataStreamTestHelper.newInstance(
+            "ds-no-lifecycle",
+            List.of(new Index(randomAlphaOfLength(10), randomAlphaOfLength(10))),
+            1,
+            null,
+            false,
+            null,
+            List.of()
+        );
+        builder.put(dataStreamWithoutLifecycle);
+        String dataStreamNoRetention = "ds-no-retention";
+        DataStream dataStreamWithLifecycleNoRetention = DataStreamTestHelper.newInstance(
+            dataStreamNoRetention,
+            List.of(new Index(randomAlphaOfLength(10), randomAlphaOfLength(10))),
+            1,
+            null,
+            false,
+            DataStreamLifecycle.DEFAULT,
+            List.of()
+        );
+
+        builder.put(dataStreamWithLifecycleNoRetention);
+        DataStream dataStreamWithLifecycleShortRetention = DataStreamTestHelper.newInstance(
+            "ds-no-short-retention",
+            List.of(new Index(randomAlphaOfLength(10), randomAlphaOfLength(10))),
+            1,
+            null,
+            false,
+            DataStreamLifecycle.newBuilder().dataRetention(TimeValue.timeValueDays(7)).build(),
+            List.of()
+        );
+        builder.put(dataStreamWithLifecycleShortRetention);
+        String dataStreamLongRetention = "ds-long-retention";
+        DataStream dataStreamWithLifecycleLongRetention = DataStreamTestHelper.newInstance(
+            dataStreamLongRetention,
+            List.of(new Index(randomAlphaOfLength(10), randomAlphaOfLength(10))),
+            1,
+            null,
+            false,
+            DataStreamLifecycle.newBuilder().dataRetention(TimeValue.timeValueDays(365)).build(),
+            List.of()
+        );
+        builder.put(dataStreamWithLifecycleLongRetention);
+        ClusterState clusterState = ClusterState.builder(ClusterName.DEFAULT).metadata(builder).build();
+        // No global retention
+        {
+            var affectedDataStreams = service.determineAffectedDataStreams(null, clusterState);
+            assertThat(affectedDataStreams.isEmpty(), is(true));
+        }
+        // No difference in global retention
+        {
+            var globalRetention = randomNonEmptyGlobalRetention();
+            var clusterStateWithRetention = ClusterState.builder(clusterState)
+                .putCustom(DataStreamGlobalRetention.TYPE, globalRetention)
+                .build();
+            var affectedDataStreams = service.determineAffectedDataStreams(globalRetention, clusterStateWithRetention);
+            assertThat(affectedDataStreams.isEmpty(), is(true));
+        }
+        // Default retention in effect
+        {
+            var globalRetention = new DataStreamGlobalRetention(TimeValue.timeValueDays(randomIntBetween(1, 10)), null);
+            var affectedDataStreams = service.determineAffectedDataStreams(globalRetention, clusterState);
+            assertThat(affectedDataStreams.size(), is(1));
+            var dataStream = affectedDataStreams.get(0);
+            assertThat(dataStream.dataStreamName(), equalTo(dataStreamNoRetention));
+            assertThat(dataStream.previousEffectiveRetention(), nullValue());
+            assertThat(dataStream.newEffectiveRetention(), equalTo(globalRetention.getDefaultRetention()));
+        }
+        // Max retention in effect
+        {
+            var globalRetention = new DataStreamGlobalRetention(null, TimeValue.timeValueDays(randomIntBetween(10, 90)));
+            var affectedDataStreams = service.determineAffectedDataStreams(globalRetention, clusterState);
+            assertThat(affectedDataStreams.size(), is(2));
+            var dataStream = affectedDataStreams.get(0);
+            assertThat(dataStream.dataStreamName(), equalTo(dataStreamLongRetention));
+            assertThat(dataStream.previousEffectiveRetention(), notNullValue());
+            assertThat(dataStream.newEffectiveRetention(), equalTo(globalRetention.getMaxRetention()));
+            dataStream = affectedDataStreams.get(1);
+            assertThat(dataStream.dataStreamName(), equalTo(dataStreamNoRetention));
+            assertThat(dataStream.previousEffectiveRetention(), nullValue());
+            assertThat(dataStream.newEffectiveRetention(), equalTo(globalRetention.getMaxRetention()));
+        }
+    }
+
+    private static DataStreamGlobalRetention randomNonEmptyGlobalRetention() {
+        boolean withDefault = randomBoolean();
+        return new DataStreamGlobalRetention(
+            withDefault ? TimeValue.timeValueDays(randomIntBetween(1, 1000)) : null,
+            withDefault == false || randomBoolean() ? TimeValue.timeValueDays(randomIntBetween(1000, 2000)) : null
+        );
+    }
+}

+ 8 - 4
modules/data-streams/src/yamlRestTest/resources/rest-api-spec/test/data_stream/lifecycle/10_explain_lifecycle.yml

@@ -1,8 +1,8 @@
 ---
 "Explain backing index lifecycle":
   - skip:
-      version: " - 8.10.99"
-      reason: "Data stream lifecycle was released as tech preview in 8.11"
+      cluster_features: ["datastream_lifecycle", "data_stream.lifecycle.global_retention"]
+      reason: "Data stream lifecycle with effective retention was released in 8.14"
       features: allowed_warnings
   - do:
       allowed_warnings:
@@ -36,7 +36,9 @@
       indices.explain_data_lifecycle:
         index: $backing_index
   - match: { indices.$backing_index.managed_by_lifecycle: true }
-  - match: { indices.$backing_index.lifecycle.data_retention: '30d' }
+  - match: { indices.$backing_index.lifecycle.data_retention: "30d" }
+  - match: { indices.$backing_index.lifecycle.effective_retention: "30d"}
+  - match: { indices.$backing_index.lifecycle.retention_determined_by: "data_stream_configuration"}
   - match: { indices.$backing_index.lifecycle.enabled: true }
   - is_false: indices.$backing_index.lifecycle.rollover
 
@@ -46,7 +48,9 @@
         index: $backing_index
         include_defaults: true
   - match: { indices.$backing_index.managed_by_lifecycle: true }
-  - match: { indices.$backing_index.lifecycle.data_retention: '30d' }
+  - match: { indices.$backing_index.lifecycle.data_retention: "30d" }
+  - match: { indices.$backing_index.lifecycle.effective_retention: "30d"}
+  - match: { indices.$backing_index.lifecycle.retention_determined_by: "data_stream_configuration"}
   - is_true: indices.$backing_index.lifecycle.rollover
 
 

+ 4 - 2
modules/data-streams/src/yamlRestTest/resources/rest-api-spec/test/data_stream/lifecycle/190_create_data_stream_with_lifecycle.yml

@@ -1,8 +1,8 @@
 ---
 "Create data stream with lifecycle":
   - skip:
-      version: " - 8.10.99"
-      reason: "Data stream lifecycle was GA in 8.11"
+      version: " - 8.13.99"
+      reason: "Data stream lifecycle with effective retention was released in 8.14"
       features: allowed_warnings
   - do:
       allowed_warnings:
@@ -35,5 +35,7 @@
   - match: { data_streams.0.template: 'template-with-lifecycle' }
   - match: { data_streams.0.hidden: false }
   - match: { data_streams.0.lifecycle.data_retention: '30d' }
+  - match: { data_streams.0.lifecycle.effective_retention: '30d'}
+  - match: { data_streams.0.lifecycle.retention_determined_by: 'data_stream_configuration'}
   - match: { data_streams.0.lifecycle.enabled: true }
   - is_true: data_streams.0.lifecycle.rollover

+ 18 - 7
modules/data-streams/src/yamlRestTest/resources/rest-api-spec/test/data_stream/lifecycle/20_basic.yml

@@ -1,8 +1,8 @@
 setup:
   - skip:
       features: allowed_warnings
-      version: " - 8.10.99"
-      reason: "Data stream lifecycles only supported in 8.11+"
+      cluster_features: ["datastream_lifecycle", "data_stream.lifecycle.global_retention"]
+      reason: "Data stream lifecycles with global retention are only supported in 8.14+"
   - do:
       allowed_warnings:
         - "index template [my-lifecycle] has index patterns [data-stream-with-lifecycle] matching patterns from existing older templates [global] with patterns (global => [*]); this template [my-lifecycle] will take precedence during new index creation"
@@ -47,6 +47,8 @@ setup:
   - length: { data_streams: 1}
   - match: { data_streams.0.name: data-stream-with-lifecycle }
   - match: { data_streams.0.lifecycle.data_retention: '10d' }
+  - match: { data_streams.0.lifecycle.effective_retention: '10d' }
+  - match: { data_streams.0.lifecycle.retention_determined_by: 'data_stream_configuration' }
   - match: { data_streams.0.lifecycle.enabled: true}
 
 ---
@@ -61,6 +63,7 @@ setup:
   - length: { data_streams: 1}
   - match: { data_streams.0.name: simple-data-stream1 }
   - match: { data_streams.0.lifecycle.enabled: true}
+  - is_false: data_streams.0.lifecycle.effective_retention
 
 ---
 "Put data stream lifecycle":
@@ -91,14 +94,16 @@ setup:
         name: "*"
   - length: { data_streams: 2 }
   - match: { data_streams.0.name: data-stream-with-lifecycle }
-  - match: { data_streams.0.lifecycle.data_retention: '30d' }
+  - match: { data_streams.0.lifecycle.data_retention: "30d" }
+  - is_false: data_streams.0.lifecycle.effective_retention
   - match: { data_streams.0.lifecycle.enabled: false}
   - match: { data_streams.0.lifecycle.downsampling.0.after: '10d'}
   - match: { data_streams.0.lifecycle.downsampling.0.fixed_interval: '1h'}
   - match: { data_streams.0.lifecycle.downsampling.1.after: '100d'}
   - match: { data_streams.0.lifecycle.downsampling.1.fixed_interval: '10h'}
   - match: { data_streams.1.name: simple-data-stream1 }
-  - match: { data_streams.1.lifecycle.data_retention: '30d' }
+  - match: { data_streams.1.lifecycle.data_retention: "30d" }
+  - is_false: data_streams.0.lifecycle.effective_retention
   - match: { data_streams.1.lifecycle.enabled: false}
   - match: { data_streams.1.lifecycle.downsampling.0.after: '10d'}
   - match: { data_streams.1.lifecycle.downsampling.0.fixed_interval: '1h'}
@@ -123,7 +128,9 @@ setup:
   - match: { data_streams.0.lifecycle.data_retention: '30d' }
   - match: { data_streams.0.lifecycle.enabled: true}
   - match: { data_streams.1.name: simple-data-stream1 }
-  - match: { data_streams.1.lifecycle.data_retention: '30d' }
+  - match: { data_streams.1.lifecycle.data_retention: "30d" }
+  - match: { data_streams.1.lifecycle.effective_retention: "30d"}
+  - match: { data_streams.1.lifecycle.retention_determined_by: "data_stream_configuration"}
   - match: { data_streams.1.lifecycle.enabled: true}
 
 
@@ -136,7 +143,9 @@ setup:
         include_defaults: true
   - length: { data_streams: 1}
   - match: { data_streams.0.name: data-stream-with-lifecycle }
-  - match: { data_streams.0.lifecycle.data_retention: '10d' }
+  - match: { data_streams.0.lifecycle.data_retention: "10d" }
+  - match: { data_streams.0.lifecycle.effective_retention: "10d"}
+  - match: { data_streams.0.lifecycle.retention_determined_by: "data_stream_configuration"}
   - is_true: data_streams.0.lifecycle.rollover
 
 ---
@@ -153,7 +162,9 @@ setup:
         name: "simple-data-stream1"
   - length: { data_streams: 1 }
   - match: { data_streams.0.name: simple-data-stream1 }
-  - match: { data_streams.0.lifecycle.data_retention: '30d' }
+  - match: { data_streams.0.lifecycle.data_retention: "30d" }
+  - match: { data_streams.0.lifecycle.effective_retention: "30d"}
+  - match: { data_streams.0.lifecycle.retention_determined_by: "data_stream_configuration"}
   - match: { data_streams.0.lifecycle.enabled: true }
 
   - do:

+ 19 - 6
modules/data-streams/src/yamlRestTest/resources/rest-api-spec/test/data_stream/lifecycle/30_not_found.yml

@@ -1,7 +1,7 @@
 setup:
   - skip:
       features: allowed_warnings
-      version: " - 8.10.99"
+      cluster_features: ["datastream_lifecycle"]
       reason: "Data stream lifecycle was GA in 8.11"
   - do:
       allowed_warnings:
@@ -23,13 +23,18 @@ setup:
 
 ---
 "Get data stream lifecycle":
+  - skip:
+      cluster_features: ["datastream_lifecycle", "data_stream.lifecycle.global_retention"]
+      reason: "Data stream lifecycle with effective retention was released in 8.14"
 
   - do:
       indices.get_data_lifecycle:
         name: "*"
   - length: { data_streams: 1}
   - match: { data_streams.0.name: my-data-stream-1 }
-  - match: { data_streams.0.lifecycle.data_retention: '10d' }
+  - match: { data_streams.0.lifecycle.data_retention: "10d" }
+  - match: { data_streams.0.lifecycle.effective_retention: "10d"}
+  - match: { data_streams.0.lifecycle.retention_determined_by: "data_stream_configuration"}
   - match: { data_streams.0.lifecycle.enabled: true}
 
 ---
@@ -43,7 +48,9 @@ setup:
 
 ---
 "Put data stream lifecycle does not succeed when at lease one data stream does not exist":
-
+  - skip:
+      cluster_features: ["datastream_lifecycle", "data_stream.lifecycle.global_retention"]
+      reason: "Data stream lifecycle with effective retention was released in 8.14"
   - do:
       catch:  missing
       indices.put_data_lifecycle:
@@ -57,12 +64,16 @@ setup:
         name: "*"
   - length: { data_streams: 1 }
   - match: { data_streams.0.name: my-data-stream-1 }
-  - match: { data_streams.0.lifecycle.data_retention: '10d' }
+  - match: { data_streams.0.lifecycle.data_retention: "10d" }
+  - match: { data_streams.0.lifecycle.effective_retention: "10d"}
+  - match: { data_streams.0.lifecycle.retention_determined_by: "data_stream_configuration"}
   - match: { data_streams.0.lifecycle.enabled: true }
 
 ---
 "Delete data stream lifecycle does not succeed when at lease one data stream does not exist":
-
+  - skip:
+      cluster_features: ["datastream_lifecycle", "data_stream.lifecycle.global_retention"]
+      reason: "Data stream lifecycle with effective retention was released in 8.14"
   - do:
       catch:  missing
       indices.delete_data_lifecycle:
@@ -74,5 +85,7 @@ setup:
         name: "*"
   - length: { data_streams: 1 }
   - match: { data_streams.0.name: my-data-stream-1 }
-  - match: { data_streams.0.lifecycle.data_retention: '10d' }
+  - match: { data_streams.0.lifecycle.data_retention: "10d" }
+  - match: { data_streams.0.lifecycle.effective_retention: "10d"}
+  - match: { data_streams.0.lifecycle.retention_determined_by: "data_stream_configuration"}
   - match: { data_streams.0.lifecycle.enabled: true }

+ 139 - 0
modules/data-streams/src/yamlRestTest/resources/rest-api-spec/test/data_stream/lifecycle/40_global_retention.yml

@@ -0,0 +1,139 @@
+setup:
+  - skip:
+      features: allowed_warnings
+      cluster_features: ["datastream_lifecycle", "data_stream.lifecycle.global_retention"]
+      reason: "Global retention was added in 8.14"
+  - do:
+      allowed_warnings:
+        - "index template [my-lifecycle] has index patterns [my-data-stream-1] matching patterns from existing older templates [global] with patterns (global => [*]); this template [my-lifecycle] will take precedence during new index creation"
+      indices.put_index_template:
+        name: my-lifecycle
+        body:
+          index_patterns: [my-data-stream-*]
+          template:
+            settings:
+              index.number_of_replicas: 0
+            lifecycle: {}
+          data_stream: {}
+
+---
+"CRUD global retention":
+  - do:
+      indices.create_data_stream:
+        name: my-data-stream-1
+  - do:
+      cluster.health:
+        index: my-data-stream-1
+        wait_for_status: green
+  - do:
+      data_streams.put_global_retention:
+        body:
+          default_retention: "7d"
+          max_retention: "90d"
+  - is_true: acknowledged
+  - is_false: dry_run
+  - match: {affected_data_streams.0.name: "my-data-stream-1"}
+  - match: {affected_data_streams.0.previous_effective_retention: "infinite"}
+  - match: {affected_data_streams.0.new_effective_retention: "7d"}
+
+  - do:
+      data_streams.get_global_retention: { }
+  - match: { default_retention: "7d" }
+  - match: { max_retention: "90d" }
+
+  - do:
+      data_streams.delete_global_retention: { }
+  - is_true: acknowledged
+  - is_false: dry_run
+  - match: { affected_data_streams.0.name: "my-data-stream-1" }
+  - match: { affected_data_streams.0.previous_effective_retention: "7d" }
+  - match: { affected_data_streams.0.new_effective_retention: "infinite" }
+
+  - do:
+      data_streams.get_global_retention: { }
+  - is_false: default_retention
+  - is_false: max_retention
+
+  - do:
+      indices.delete_data_stream:
+        name: my-data-stream-1
+---
+"Dry run global retention":
+  - do:
+      indices.create_data_stream:
+        name: my-data-stream-2
+  - do:
+      indices.put_data_lifecycle:
+        name: "my-data-stream-2"
+        body: >
+          {
+            "data_retention": "90d"
+          }
+  - is_true: acknowledged
+
+  - do:
+      data_streams.put_global_retention:
+        dry_run: true
+        body:
+          default_retention: "7d"
+          max_retention: "30d"
+  - is_false: acknowledged
+  - is_true: dry_run
+  - match: {affected_data_streams.0.name: "my-data-stream-2"}
+  - match: {affected_data_streams.0.previous_effective_retention: "90d"}
+  - match: {affected_data_streams.0.new_effective_retention: "30d"}
+
+  - do:
+      indices.get_data_stream:
+        name: "my-data-stream-2"
+        include_defaults: true
+  - match: { data_streams.0.name: my-data-stream-2 }
+  - match: { data_streams.0.lifecycle.effective_retention: '90d' }
+  - match: { data_streams.0.lifecycle.retention_determined_by: 'data_stream_configuration' }
+  - do:
+      indices.delete_data_stream:
+        name: my-data-stream-2
+---
+"Default global retention is retrieved by data stream and index templates":
+  - do:
+      indices.create_data_stream:
+        name: my-data-stream-3
+
+  - do:
+      data_streams.put_global_retention:
+        body:
+          default_retention: "7d"
+          max_retention: "90d"
+  - is_true: acknowledged
+  - is_false: dry_run
+  - match: {affected_data_streams.0.name: "my-data-stream-3"}
+  - match: {affected_data_streams.0.previous_effective_retention: "infinite"}
+  - match: {affected_data_streams.0.new_effective_retention: "7d"}
+
+  - do:
+      data_streams.get_global_retention: { }
+  - match: { default_retention: "7d" }
+  - match: { max_retention: "90d" }
+
+  - do:
+      indices.get_data_stream:
+        name: "my-data-stream-3"
+  - match: { data_streams.0.name: my-data-stream-3 }
+  - match: { data_streams.0.lifecycle.effective_retention: '7d' }
+  - match: { data_streams.0.lifecycle.retention_determined_by: 'default_global_retention' }
+  - match: { data_streams.0.lifecycle.enabled: true }
+
+  - do:
+      indices.get_index_template:
+        name: my-lifecycle
+
+  - match: { index_templates.0.name: my-lifecycle }
+  - match: { index_templates.0.index_template.template.lifecycle.enabled: true }
+  - match: { index_templates.0.index_template.template.lifecycle.effective_retention: "7d" }
+  - match: { index_templates.0.index_template.template.lifecycle.retention_determined_by: "default_global_retention" }
+
+  - do:
+      data_streams.delete_global_retention: { }
+  - do:
+      indices.delete_data_stream:
+        name: my-data-stream-3

+ 35 - 0
rest-api-spec/src/main/resources/rest-api-spec/api/data_streams.delete_global_retention.json

@@ -0,0 +1,35 @@
+{
+  "data_streams.delete_global_retention":{
+    "documentation":{
+      "url":"https://www.elastic.co/guide/en/elasticsearch/reference/master/data-streams-delete-global-retention.html",
+      "description":"Deletes the global retention configuration that applies to all data streams managed by the data stream lifecycle."
+    },
+    "stability":"experimental",
+    "visibility":"public",
+    "headers":{
+      "accept": [ "application/json"],
+      "content_type": ["application/json"]
+    },
+    "url":{
+      "paths":[
+        {
+          "path":"/_data_stream/_global_retention",
+          "methods":[
+            "DELETE"
+          ]
+        }
+      ]
+    },
+    "params":{
+      "dry_run":{
+        "type":"boolean",
+        "description":"Determines whether the global retention provided should be applied or only the impact should be determined.",
+        "default":false
+      },
+      "master_timeout":{
+        "type":"time",
+        "description":"Specify timeout for connection to master."
+      }
+    }
+  }
+}

+ 29 - 0
rest-api-spec/src/main/resources/rest-api-spec/api/data_streams.get_global_retention.json

@@ -0,0 +1,29 @@
+{
+  "data_streams.get_global_retention":{
+    "documentation":{
+      "url":"https://www.elastic.co/guide/en/elasticsearch/reference/master/data-streams-get-global-retention.html",
+      "description":"Returns global retention configuration that applies to all data streams managed by the data stream lifecycle."
+    },
+    "stability":"experimental",
+    "visibility":"public",
+    "headers":{
+      "accept": [ "application/json"]
+    },
+    "url":{
+      "paths":[
+        {
+          "path":"/_data_stream/_global_retention",
+          "methods":[
+            "GET"
+          ]
+        }
+      ]
+    },
+    "params":{
+      "local":{
+        "type":"boolean",
+        "description":"Return the global retention retrieved from the node that received the request."
+      }
+    }
+  }
+}

+ 39 - 0
rest-api-spec/src/main/resources/rest-api-spec/api/data_streams.put_global_retention.json

@@ -0,0 +1,39 @@
+{
+  "data_streams.put_global_retention":{
+    "documentation":{
+      "url":"https://www.elastic.co/guide/en/elasticsearch/reference/master/data-streams-put-global-retention.html",
+      "description":"Updates the global retention configuration that applies to all data streams managed by the data stream lifecycle."
+    },
+    "stability":"experimental",
+    "visibility":"public",
+    "headers":{
+      "accept": [ "application/json"],
+      "content_type": ["application/json"]
+    },
+    "url":{
+      "paths":[
+        {
+          "path":"/_data_stream/_global_retention",
+          "methods":[
+            "PUT"
+          ]
+        }
+      ]
+    },
+    "params":{
+      "dry_run":{
+        "type":"boolean",
+        "description":"Determines whether the global retention provided should be applied or only the impact should be determined.",
+        "default":false
+      },
+      "master_timeout":{
+        "type":"time",
+        "description":"Specify timeout for connection to master"
+      }
+    },
+    "body":{
+      "description":"The global retention configuration including optional values for default and max retention.",
+      "required":true
+    }
+  }
+}

+ 8 - 4
rest-api-spec/src/yamlRestTest/resources/rest-api-spec/test/cluster.component_template/10_basic.yml

@@ -117,8 +117,8 @@
 ---
 "Add data stream lifecycle":
   - requires:
-      cluster_features: ["datastream_lifecycle"]
-      reason: "Data stream lifecycle was available from 8.11"
+      cluster_features: ["datastream_lifecycle", "data_stream.lifecycle.global_retention"]
+      reason: "Data stream lifecycle with global retention was available from 8.14"
 
   - do:
       cluster.put_component_template:
@@ -141,12 +141,14 @@
   - match: {component_templates.0.component_template.version: 1}
   - match: {component_templates.0.component_template.template.lifecycle.enabled: true}
   - match: {component_templates.0.component_template.template.lifecycle.data_retention: "10d"}
+  - match: {component_templates.0.component_template.template.lifecycle.effective_retention: "10d"}
+  - match: {component_templates.0.component_template.template.lifecycle.retention_determined_by: "data_stream_configuration"}
 
 ---
 "Get data stream lifecycle with default rollover":
   - requires:
-      cluster_features: ["datastream_lifecycle"]
-      reason: "Data stream lifecycle was available from 8.11"
+      cluster_features: ["datastream_lifecycle", "data_stream.lifecycle.global_retention"]
+      reason: "Data stream lifecycle with effective retention was available from 8.14"
 
   - do:
       cluster.put_component_template:
@@ -170,4 +172,6 @@
   - match: {component_templates.0.component_template.version: 1}
   - match: {component_templates.0.component_template.template.lifecycle.enabled: true}
   - match: {component_templates.0.component_template.template.lifecycle.data_retention: "10d"}
+  - match: {component_templates.0.component_template.template.lifecycle.effective_retention: "10d"}
+  - match: {component_templates.0.component_template.template.lifecycle.retention_determined_by: "data_stream_configuration"}
   - is_true: component_templates.0.component_template.template.lifecycle.rollover

+ 10 - 20
rest-api-spec/src/yamlRestTest/resources/rest-api-spec/test/indices.get_index_template/10_basic.yml

@@ -1,6 +1,6 @@
 setup:
   - skip:
-      version: " - 7.7.99"
+      cluster_features: [ "templates_v2" ]
       reason: "index template v2 API unavailable before 7.8"
       features: allowed_warnings
 
@@ -22,9 +22,6 @@ setup:
 
 ---
 "Get index template":
-  - skip:
-      version: " - 7.99.99"
-      reason: "index template v2 API has not been backported"
 
   - do:
       indices.get_index_template:
@@ -37,10 +34,6 @@ setup:
 
 ---
 "Get all index templates":
-  - skip:
-      version: " - 7.7.99"
-      reason: "index template v2 API unavailable before 7.8"
-      features: allowed_warnings
 
   - do:
       allowed_warnings:
@@ -61,10 +54,6 @@ setup:
 
 ---
 "Pattern matching in index templates":
-  - skip:
-      version: " - 7.7.99"
-      reason: "index template v2 API unavailable before 7.8"
-      features: allowed_warnings
 
   - do:
       allowed_warnings:
@@ -93,9 +82,6 @@ setup:
 
 ---
 "Get index template with local flag":
-  - skip:
-      version: " - 7.7.99"
-      reason: "index template v2 API unavailable before 7.8"
 
   - do:
       indices.get_index_template:
@@ -107,8 +93,8 @@ setup:
 ---
 "Add data stream lifecycle":
   - skip:
-      version: " - 8.10.99"
-      reason: "Data stream lifecycle in index templates was updated after 8.10"
+      cluster_features: ["datastream_lifecycle", "data_stream.lifecycle.global_retention"]
+      reason: "Data stream lifecycle with effective retention was released in 8.14"
       features: allowed_warnings
 
   - do:
@@ -138,12 +124,14 @@ setup:
   - match: {index_templates.0.index_template.template.mappings: {properties: {field: {type: keyword}}}}
   - match: {index_templates.0.index_template.template.lifecycle.enabled: true}
   - match: {index_templates.0.index_template.template.lifecycle.data_retention: "30d"}
+  - match: {index_templates.0.index_template.template.lifecycle.effective_retention: "30d"}
+  - match: {index_templates.0.index_template.template.lifecycle.retention_determined_by: "data_stream_configuration"}
 
 ---
 "Get data stream lifecycle with default rollover":
   - skip:
-      version: " - 8.10.99"
-      reason: "Data stream lifecycle in index templates was updated after 8.10"
+      cluster_features: ["datastream_lifecycle", "data_stream.lifecycle.global_retention"]
+      reason: "Data stream lifecycle with effective retention was released in 8.14"
       features: allowed_warnings
 
   - do:
@@ -166,12 +154,14 @@ setup:
   - match: {index_templates.0.index_template.index_patterns: ["data-stream-with-lifecycle-*"]}
   - match: {index_templates.0.index_template.template.lifecycle.enabled: true}
   - match: {index_templates.0.index_template.template.lifecycle.data_retention: "30d"}
+  - match: {index_templates.0.index_template.template.lifecycle.effective_retention: "30d"}
+  - match: {index_templates.0.index_template.template.lifecycle.retention_determined_by: "data_stream_configuration"}
   - is_true: index_templates.0.index_template.template.lifecycle.rollover
 
 ---
 "Reject data stream lifecycle without data stream configuration":
   - skip:
-      version: " - 8.10.99"
+      cluster_features: ["datastream_lifecycle"]
       reason: "Data stream lifecycle in index templates was updated after 8.10"
   - do:
       catch:  bad_request

+ 4 - 2
rest-api-spec/src/yamlRestTest/resources/rest-api-spec/test/indices.simulate_index_template/10_basic.yml

@@ -227,8 +227,8 @@
 ---
 "Simulate index template with lifecycle and include defaults":
   - skip:
-      version: " - 8.10.99"
-      reason: "Lifecycle is only available in 8.11+"
+      version: " - 8.13.99"
+      reason: "Data stream lifecycle with effective retention was released in 8.14"
       features: ["default_shards"]
 
   - do:
@@ -248,5 +248,7 @@
 
   - match: {template.lifecycle.enabled: true}
   - match: {template.lifecycle.data_retention: "7d"}
+  - match: {template.lifecycle.effective_retention: "7d"}
+  - match: {template.lifecycle.retention_determined_by: "data_stream_configuration"}
   - is_true: template.lifecycle.rollover
   - match: {overlapping: []}

+ 4 - 2
rest-api-spec/src/yamlRestTest/resources/rest-api-spec/test/indices.simulate_template/10_basic.yml

@@ -202,8 +202,8 @@
 ---
 "Simulate template with lifecycle and include defaults":
   - skip:
-      version: " - 8.10.99"
-      reason: "Lifecycle is only available in 8.11+"
+      version: " - 8.13.99"
+      reason: "Data stream lifecycle with effective retention was released in 8.14"
       features: ["default_shards"]
 
   - do:
@@ -223,4 +223,6 @@
 
   - match: {template.lifecycle.enabled: true}
   - match: {template.lifecycle.data_retention: "7d"}
+  - match: {template.lifecycle.effective_retention: "7d"}
+  - match: {template.lifecycle.retention_determined_by: "data_stream_configuration"}
   - is_true: template.lifecycle.rollover

+ 2 - 4
server/src/main/java/org/elasticsearch/action/admin/indices/template/get/GetComponentTemplateAction.java

@@ -137,10 +137,8 @@ public class GetComponentTemplateAction extends ActionType<GetComponentTemplateA
             }
         }
 
-        public Response(Map<String, ComponentTemplate> componentTemplates) {
-            this.componentTemplates = componentTemplates;
-            this.rolloverConfiguration = null;
-            this.globalRetention = null;
+        public Response(Map<String, ComponentTemplate> componentTemplates, @Nullable DataStreamGlobalRetention globalRetention) {
+            this(componentTemplates, null, globalRetention);
         }
 
         public Response(

+ 2 - 4
server/src/main/java/org/elasticsearch/action/admin/indices/template/get/GetComposableIndexTemplateAction.java

@@ -140,10 +140,8 @@ public class GetComposableIndexTemplateAction extends ActionType<GetComposableIn
             }
         }
 
-        public Response(Map<String, ComposableIndexTemplate> indexTemplates) {
-            this.indexTemplates = indexTemplates;
-            this.rolloverConfiguration = null;
-            this.globalRetention = null;
+        public Response(Map<String, ComposableIndexTemplate> indexTemplates, @Nullable DataStreamGlobalRetention globalRetention) {
+            this(indexTemplates, null, globalRetention);
         }
 
         public Response(

+ 1 - 1
server/src/main/java/org/elasticsearch/action/admin/indices/template/get/TransportGetComponentTemplateAction.java

@@ -102,7 +102,7 @@ public class TransportGetComponentTemplateAction extends TransportMasterNodeRead
                 )
             );
         } else {
-            listener.onResponse(new GetComponentTemplateAction.Response(results));
+            listener.onResponse(new GetComponentTemplateAction.Response(results, DataStreamGlobalRetention.getFromClusterState(state)));
         }
     }
 }

+ 3 - 1
server/src/main/java/org/elasticsearch/action/admin/indices/template/get/TransportGetComposableIndexTemplateAction.java

@@ -100,7 +100,9 @@ public class TransportGetComposableIndexTemplateAction extends TransportMasterNo
                 )
             );
         } else {
-            listener.onResponse(new GetComposableIndexTemplateAction.Response(results));
+            listener.onResponse(
+                new GetComposableIndexTemplateAction.Response(results, DataStreamGlobalRetention.getFromClusterState(state))
+            );
         }
     }
 }

+ 11 - 0
server/src/main/java/org/elasticsearch/cluster/metadata/DataStreamGlobalRetention.java

@@ -18,6 +18,7 @@ import org.elasticsearch.common.io.stream.StreamInput;
 import org.elasticsearch.common.io.stream.StreamOutput;
 import org.elasticsearch.core.Nullable;
 import org.elasticsearch.core.TimeValue;
+import org.elasticsearch.features.NodeFeature;
 import org.elasticsearch.xcontent.ParseField;
 import org.elasticsearch.xcontent.ToXContent;
 import org.elasticsearch.xcontent.XContentBuilder;
@@ -35,10 +36,13 @@ public final class DataStreamGlobalRetention extends AbstractNamedDiffable<Clust
 
     public static final String TYPE = "data-stream-global-retention";
 
+    public static final NodeFeature GLOBAL_RETENTION = new NodeFeature("data_stream.lifecycle.global_retention");
+
     public static final ParseField DEFAULT_RETENTION_FIELD = new ParseField("default_retention");
     public static final ParseField MAX_RETENTION_FIELD = new ParseField("max_retention");
 
     public static final DataStreamGlobalRetention EMPTY = new DataStreamGlobalRetention(null, null);
+    public static final TimeValue MIN_RETENTION_VALUE = TimeValue.timeValueSeconds(10);
 
     @Nullable
     private final TimeValue defaultRetention;
@@ -60,10 +64,17 @@ public final class DataStreamGlobalRetention extends AbstractNamedDiffable<Clust
                     + "]."
             );
         }
+        if (validateRetentionValue(defaultRetention) == false || validateRetentionValue(maxRetention) == false) {
+            throw new IllegalArgumentException("Global retention values should be greater than " + MIN_RETENTION_VALUE.getStringRep());
+        }
         this.defaultRetention = defaultRetention;
         this.maxRetention = maxRetention;
     }
 
+    private boolean validateRetentionValue(@Nullable TimeValue retention) {
+        return retention == null || retention.getMillis() >= MIN_RETENTION_VALUE.getMillis();
+    }
+
     public static DataStreamGlobalRetention read(StreamInput in) throws IOException {
         return new DataStreamGlobalRetention(in.readOptionalTimeValue(), in.readOptionalTimeValue());
     }

+ 1 - 2
server/src/main/java/org/elasticsearch/rest/action/cat/RestTemplatesAction.java

@@ -25,7 +25,6 @@ import org.elasticsearch.rest.Scope;
 import org.elasticsearch.rest.ServerlessScope;
 import org.elasticsearch.rest.action.RestResponseListener;
 
-import java.util.Collections;
 import java.util.List;
 import java.util.Map;
 
@@ -78,7 +77,7 @@ public class RestTemplatesAction extends AbstractCatAction {
                 getComposableTemplatesRequest,
                 getComposableTemplatesStep.delegateResponse((l, e) -> {
                     if (ExceptionsHelper.unwrapCause(e) instanceof ResourceNotFoundException) {
-                        l.onResponse(new GetComposableIndexTemplateAction.Response(Collections.emptyMap()));
+                        l.onResponse(new GetComposableIndexTemplateAction.Response(Map.of(), null));
                     } else {
                         l.onFailure(e);
                     }

+ 5 - 3
server/src/test/java/org/elasticsearch/action/admin/indices/template/get/GetComposableIndexTemplateResponseTests.java

@@ -10,10 +10,11 @@ package org.elasticsearch.action.admin.indices.template.get;
 
 import org.elasticsearch.cluster.metadata.ComposableIndexTemplate;
 import org.elasticsearch.cluster.metadata.ComposableIndexTemplateTests;
+import org.elasticsearch.cluster.metadata.DataStreamGlobalRetention;
+import org.elasticsearch.cluster.metadata.DataStreamGlobalRetentionTests;
 import org.elasticsearch.common.io.stream.Writeable;
 import org.elasticsearch.test.AbstractWireSerializingTestCase;
 
-import java.util.Collections;
 import java.util.HashMap;
 import java.util.Map;
 
@@ -25,14 +26,15 @@ public class GetComposableIndexTemplateResponseTests extends AbstractWireSeriali
 
     @Override
     protected GetComposableIndexTemplateAction.Response createTestInstance() {
+        DataStreamGlobalRetention globalRetention = randomBoolean() ? null : DataStreamGlobalRetentionTests.randomGlobalRetention();
         if (randomBoolean()) {
-            return new GetComposableIndexTemplateAction.Response(Collections.emptyMap());
+            return new GetComposableIndexTemplateAction.Response(Map.of(), globalRetention);
         }
         Map<String, ComposableIndexTemplate> templates = new HashMap<>();
         for (int i = 0; i < randomIntBetween(1, 4); i++) {
             templates.put(randomAlphaOfLength(4), ComposableIndexTemplateTests.randomInstance());
         }
-        return new GetComposableIndexTemplateAction.Response(templates);
+        return new GetComposableIndexTemplateAction.Response(templates, globalRetention);
     }
 
     @Override

+ 2 - 0
server/src/test/java/org/elasticsearch/cluster/metadata/DataStreamGlobalRetentionTests.java

@@ -94,5 +94,7 @@ public class DataStreamGlobalRetentionTests extends SimpleDiffableWireSerializat
                 TimeValue.timeValueDays(randomIntBetween(1, 1000))
             )
         );
+        expectThrows(IllegalArgumentException.class, () -> new DataStreamGlobalRetention(TimeValue.ZERO, null));
+        expectThrows(IllegalArgumentException.class, () -> new DataStreamGlobalRetention(null, TimeValue.ZERO));
     }
 }

+ 9 - 9
server/src/test/java/org/elasticsearch/cluster/metadata/DataStreamTests.java

@@ -1128,10 +1128,10 @@ public class DataStreamTests extends AbstractXContentSerializingTestCase<DataStr
         long now = System.currentTimeMillis();
 
         List<DataStreamMetadata> creationAndRolloverTimes = List.of(
-            DataStreamMetadata.dataStreamMetadata(now - 5000, now - 4000),
-            DataStreamMetadata.dataStreamMetadata(now - 4000, now - 3000),
-            DataStreamMetadata.dataStreamMetadata(now - 3000, now - 2000),
-            DataStreamMetadata.dataStreamMetadata(now - 2000, now - 1000),
+            DataStreamMetadata.dataStreamMetadata(now - 5000_000, now - 4000_000),
+            DataStreamMetadata.dataStreamMetadata(now - 4000_000, now - 3000_000),
+            DataStreamMetadata.dataStreamMetadata(now - 3000_000, now - 2000_000),
+            DataStreamMetadata.dataStreamMetadata(now - 2000_000, now - 1000_000),
             DataStreamMetadata.dataStreamMetadata(now, null)
         );
 
@@ -1153,8 +1153,8 @@ public class DataStreamTests extends AbstractXContentSerializingTestCase<DataStr
         {
             // no retention configured but we have default retention
             DataStreamGlobalRetention globalRetention = new DataStreamGlobalRetention(
-                TimeValue.timeValueMillis(2500),
-                randomBoolean() ? TimeValue.timeValueMillis(randomIntBetween(2500, 5000)) : null
+                TimeValue.timeValueSeconds(2500),
+                randomBoolean() ? TimeValue.timeValueSeconds(randomIntBetween(2500, 5000)) : null
             );
             Metadata.Builder builder = Metadata.builder();
             DataStream dataStream = createDataStream(
@@ -1174,7 +1174,7 @@ public class DataStreamTests extends AbstractXContentSerializingTestCase<DataStr
 
         {
             // no retention configured but we have max retention
-            DataStreamGlobalRetention globalRetention = new DataStreamGlobalRetention(null, TimeValue.timeValueMillis(2500));
+            DataStreamGlobalRetention globalRetention = new DataStreamGlobalRetention(null, TimeValue.timeValueSeconds(2500));
             Metadata.Builder builder = Metadata.builder();
             DataStream dataStream = createDataStream(
                 builder,
@@ -1198,7 +1198,7 @@ public class DataStreamTests extends AbstractXContentSerializingTestCase<DataStr
                 dataStreamName,
                 creationAndRolloverTimes,
                 settings(IndexVersion.current()),
-                DataStreamLifecycle.newBuilder().dataRetention(2500).build()
+                DataStreamLifecycle.newBuilder().dataRetention(TimeValue.timeValueSeconds(2500)).build()
             );
             Metadata metadata = builder.build();
 
@@ -1237,7 +1237,7 @@ public class DataStreamTests extends AbstractXContentSerializingTestCase<DataStr
                 dataStreamName,
                 creationAndRolloverTimes,
                 settings(IndexVersion.current()),
-                DataStreamLifecycle.newBuilder().dataRetention(6000).build()
+                DataStreamLifecycle.newBuilder().dataRetention(TimeValue.timeValueSeconds(6000)).build()
             );
             Metadata metadata = builder.build();
 

+ 11 - 1
x-pack/plugin/core/src/main/java/org/elasticsearch/xpack/core/security/authz/privilege/ClusterPrivilegeResolver.java

@@ -358,6 +358,14 @@ public class ClusterPrivilegeResolver {
             "cluster:admin/xpack/connector/secret/put"
         )
     );
+    public static final NamedClusterPrivilege MONITOR_GLOBAL_RETENTION = new ActionClusterPrivilege(
+        "monitor_data_stream_global_retention",
+        Set.of("cluster:monitor/data_stream/global_retention/*")
+    );
+    public static final NamedClusterPrivilege MANAGE_GLOBAL_RETENTION = new ActionClusterPrivilege(
+        "manage_data_stream_global_retention",
+        Set.of("cluster:admin/data_stream/global_retention/*", "cluster:monitor/data_stream/global_retention/*")
+    );
 
     private static final Map<String, NamedClusterPrivilege> VALUES = sortByAccessLevel(
         Stream.of(
@@ -417,7 +425,9 @@ public class ClusterPrivilegeResolver {
             CROSS_CLUSTER_SEARCH,
             CROSS_CLUSTER_REPLICATION,
             READ_CONNECTOR_SECRETS,
-            WRITE_CONNECTOR_SECRETS
+            WRITE_CONNECTOR_SECRETS,
+            MONITOR_GLOBAL_RETENTION,
+            MANAGE_GLOBAL_RETENTION
         ).filter(Objects::nonNull).toList()
     );
 

+ 3 - 0
x-pack/plugin/security/qa/operator-privileges-tests/src/javaRestTest/java/org/elasticsearch/xpack/security/operator/Constants.java

@@ -22,6 +22,8 @@ public class Constants {
         "cluster:admin/component_template/delete",
         "cluster:admin/component_template/get",
         "cluster:admin/component_template/put",
+        "cluster:admin/data_stream/global_retention/delete",
+        "cluster:admin/data_stream/global_retention/put",
         "cluster:admin/deprecation/cache/reset",
         "cluster:admin/fleet/secrets/delete",
         "cluster:admin/fleet/secrets/get",
@@ -326,6 +328,7 @@ public class Constants {
         "cluster:monitor/ccr/follow_info",
         "cluster:monitor/ccr/follow_stats",
         "cluster:monitor/ccr/stats",
+        "cluster:monitor/data_stream/global_retention/get",
         "cluster:monitor/data_stream/lifecycle/stats",
         "cluster:monitor/eql/async/status",
         "cluster:monitor/fetch/health/info",

+ 1 - 1
x-pack/plugin/src/yamlRestTest/resources/rest-api-spec/test/privileges/11_builtin.yml

@@ -15,5 +15,5 @@ setup:
   # This is fragile - it needs to be updated every time we add a new cluster/index privilege
   # I would much prefer we could just check that specific entries are in the array, but we don't have
   # an assertion for that
-  - length: { "cluster" : 57 }
+  - length: { "cluster" : 59 }
   - length: { "index" : 22 }