Add new endpoints to configure data lifecycle on a data stream level. (#94590)

With this PR we introduce CRUD endpoints that update or delete the data lifecycle at the data stream level. When the lifecycle is updated, the change is applied at the next DLM run to all the backing indices that are managed by DLM.
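For illustration, the REST surface added by this commit boils down to the following three calls (the request body is taken from the put-lifecycle docs added below):

PUT _data_stream/<data-stream>/_lifecycle
{
  "lifecycle": {
    "data_retention": "7d"
  }
}

GET _data_stream/<data-stream>/_lifecycle

DELETE _data_stream/<data-stream>/_lifecycle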
Mary Gouseti, 2 years ago
commit 99145bbe9c
36 changed files with 2,439 additions and 93 deletions
  1. docs/build.gradle (+4 -0)
  2. docs/changelog/94590.yaml (+5 -0)
  3. docs/reference/dlm/apis/delete-lifecycle.asciidoc (+84 -0)
  4. docs/reference/dlm/apis/dlm-api.asciidoc (+7 -0)
  5. docs/reference/dlm/apis/get-lifecycle.asciidoc (+134 -0)
  6. docs/reference/dlm/apis/put-lifecycle.asciidoc (+76 -0)
  7. docs/reference/rest-api/index.asciidoc (+0 -1)
  8. modules/data-streams/src/main/java/module-info.java (+1 -1)
  9. modules/data-streams/src/main/java/org/elasticsearch/datastreams/action/DataStreamsActionUtil.java (+1 -0)
  10. modules/dlm/build.gradle (+1 -0)
  11. modules/dlm/src/internalClusterTest/java/org/elasticsearch/dlm/CrudDataLifecycleIT.java (+193 -0)
  12. modules/dlm/src/internalClusterTest/java/org/elasticsearch/dlm/CrudDataLifecycleSystemDataStreamIT.java (+270 -0)
  13. modules/dlm/src/internalClusterTest/java/org/elasticsearch/dlm/DataLifecycleServiceIT.java (+55 -49)
  14. modules/dlm/src/main/java/module-info.java (+2 -3)
  15. modules/dlm/src/main/java/org/elasticsearch/dlm/DataLifecyclePlugin.java (+15 -0)
  16. modules/dlm/src/main/java/org/elasticsearch/dlm/action/DeleteDataLifecycleAction.java (+108 -0)
  17. modules/dlm/src/main/java/org/elasticsearch/dlm/action/GetDataLifecycleAction.java (+245 -0)
  18. modules/dlm/src/main/java/org/elasticsearch/dlm/action/PutDataLifecycleAction.java (+139 -0)
  19. modules/dlm/src/main/java/org/elasticsearch/dlm/action/TransportDeleteDataLifecycleAction.java (+84 -0)
  20. modules/dlm/src/main/java/org/elasticsearch/dlm/action/TransportGetDataLifecycleAction.java (+101 -0)
  21. modules/dlm/src/main/java/org/elasticsearch/dlm/action/TransportPutDataLifecycleAction.java (+90 -0)
  22. modules/dlm/src/main/java/org/elasticsearch/dlm/rest/RestDeleteDataLifecycleAction.java (+49 -0)
  23. modules/dlm/src/main/java/org/elasticsearch/dlm/rest/RestGetDataLifecycleAction.java (+55 -0)
  24. modules/dlm/src/main/java/org/elasticsearch/dlm/rest/RestPutDataLifecycleAction.java (+55 -0)
  25. modules/dlm/src/test/java/org/elasticsearch/dlm/DLMFixtures.java (+99 -0)
  26. modules/dlm/src/test/java/org/elasticsearch/dlm/DataLifecycleServiceTests.java (+15 -38)
  27. modules/dlm/src/yamlRestTest/resources/rest-api-spec/test/dlm/20_basic.yml (+109 -0)
  28. modules/dlm/src/yamlRestTest/resources/rest-api-spec/test/dlm/30_not_found.yml (+76 -0)
  29. rest-api-spec/src/main/resources/rest-api-spec/api/indices.delete_data_lifecycle.json (+57 -0)
  30. rest-api-spec/src/main/resources/rest-api-spec/api/indices.get_data_lifecycle.json (+53 -0)
  31. rest-api-spec/src/main/resources/rest-api-spec/api/indices.put_data_lifecycle.json (+61 -0)
  32. server/src/main/java/org/elasticsearch/cluster/AckedBatchedClusterStateUpdateTask.java (+59 -0)
  33. server/src/main/java/org/elasticsearch/cluster/metadata/DataLifecycle.java (+1 -1)
  34. server/src/main/java/org/elasticsearch/cluster/metadata/MetadataDataStreamsService.java (+109 -0)
  35. server/src/test/java/org/elasticsearch/cluster/metadata/MetadataDataStreamsServiceTests.java (+23 -0)
  36. x-pack/plugin/security/qa/operator-privileges-tests/src/javaRestTest/java/org/elasticsearch/xpack/security/operator/Constants.java (+3 -0)

+ 4 - 0
docs/build.gradle

@@ -93,6 +93,7 @@ testClusters.matching { it.name == "yamlRestTest"}.configureEach {
   systemProperty 'es.transport.cname_in_publish_address', 'true'
 
   requiresFeature 'es.index_mode_feature_flag_registered', Version.fromString("8.0.0")
+  requiresFeature 'es.dlm_feature_flag_enabled', Version.fromString("8.8.0")
 
   // build the cluster with all plugins
   project.rootProject.subprojects.findAll { it.parent.path == ':plugins' }.each { subproj ->
@@ -114,6 +115,9 @@ tasks.named("yamlRestTest").configure {
   doFirst {
     delete("${buildDir}/cluster/shared/repo")
   }
+  if (BuildParams.isSnapshotBuild() == false) {
+    systemProperty 'es.dlm_feature_flag_enabled', 'true'
+  }
 }
 
 // TODO: Remove the following when RCS feature is released

+ 5 - 0
docs/changelog/94590.yaml

@@ -0,0 +1,5 @@
+pr: 94590
+summary: Add new endpoints to configure data lifecycle on a data stream level
+area: DLM
+type: feature
+issues: []

+ 84 - 0
docs/reference/dlm/apis/delete-lifecycle.asciidoc

@@ -0,0 +1,84 @@
+[[dlm-delete-lifecycle]]
+=== Delete the lifecycle of a data stream
+++++
+<titleabbrev>Delete Data Stream Lifecycle</titleabbrev>
+++++
+
+experimental::[]
+
+Deletes the lifecycle from a set of data streams.
+
+[[dlm-delete-lifecycle-request]]
+==== {api-request-title}
+
+`DELETE _data_stream/<data-stream>/_lifecycle`
+
+[[dlm-delete-lifecycle-desc]]
+==== {api-description-title}
+
+Deletes the lifecycle from the specified data streams. If multiple data streams are provided but at least one of them
+does not exist, then the deletion of the lifecycle will fail for all of them and the API will respond with `404`.
+
+[[dlm-delete-lifecycle-path-params]]
+==== {api-path-parms-title}
+
+`<data-stream>`::
+(Required, string) Comma-separated list of data streams used to limit the request. Supports wildcards (`*`).
+To target all data streams use `*` or `_all`.
+
+
+[role="child_attributes"]
+[[delete-data-lifecycle-api-query-parms]]
+==== {api-query-parms-title}
+
+include::{es-repo-dir}/rest-api/common-parms.asciidoc[tag=ds-expand-wildcards]
++
+Defaults to `open`.
+
+[[dlm-delete-lifecycle-example]]
+==== {api-examples-title}
+
+////
+
+[source,console]
+--------------------------------------------------
+PUT /_index_template/my-template
+{
+  "index_patterns" : ["my-data-stream*"],
+  "priority" : 1,
+  "data_stream": {},
+  "template": {
+    "lifecycle" : {
+      "data_retention" : "7d"
+    }
+  }
+}
+
+PUT /_data_stream/my-data-stream
+--------------------------------------------------
+// TESTSETUP
+
+[source,console]
+--------------------------------------------------
+DELETE _data_stream/my-data-stream
+DELETE _index_template/my-template
+--------------------------------------------------
+// TEARDOWN
+
+////
+
+The following example deletes the lifecycle of `my-data-stream`:
+
+[source,console]
+--------------------------------------------------
+DELETE _data_stream/my-data-stream/_lifecycle
+--------------------------------------------------
+
+When the policy is successfully deleted from all selected data streams, you receive the following result:
+
+[source,console-result]
+--------------------------------------------------
+{
+  "acknowledged": true
+}
+--------------------------------------------------

+ 7 - 0
docs/reference/dlm/apis/dlm-api.asciidoc

@@ -8,6 +8,13 @@ and to retrieve lifecycle information for backing indices.
 [[dlm-api-management-endpoint]]
 === Operation management APIs
 
+* <<dlm-put-lifecycle,Update data stream lifecycle>>
+* <<dlm-get-lifecycle,Get data stream lifecycle>>
+* <<dlm-delete-lifecycle,Delete data stream lifecycle>>
 * <<dlm-explain-lifecycle,Explain Lifecycle API>>
 
+include::put-lifecycle.asciidoc[]
+include::get-lifecycle.asciidoc[]
+include::delete-lifecycle.asciidoc[]
 include::explain-data-lifecycle.asciidoc[]
+

+ 134 - 0
docs/reference/dlm/apis/get-lifecycle.asciidoc

@@ -0,0 +1,134 @@
+[[dlm-get-lifecycle]]
+=== Get the lifecycle of a data stream
+++++
+<titleabbrev>Get Data Stream Lifecycle</titleabbrev>
+++++
+
+experimental::[]
+
+Gets the lifecycle of a set of data streams.
+
+[[dlm-get-lifecycle-request]]
+==== {api-request-title}
+
+`GET _data_stream/<data-stream>/_lifecycle`
+
+[[dlm-get-lifecycle-desc]]
+==== {api-description-title}
+
+Gets the lifecycle of the specified data streams. If multiple data streams are requested but at least one of them
+does not exist, then the API will respond with `404` since at least one of the requested resources could not be retrieved.
+
+[[dlm-get-lifecycle-path-params]]
+==== {api-path-parms-title}
+
+`<data-stream>`::
+(Required, string) Comma-separated list of data streams used to limit the request. Supports wildcards (`*`).
+To target all data streams use `*` or `_all`.
+
+[role="child_attributes"]
+[[get-data-lifecycle-api-query-parms]]
+==== {api-query-parms-title}
+
+include::{es-repo-dir}/rest-api/common-parms.asciidoc[tag=ds-expand-wildcards]
++
+Defaults to `open`.
+
+`include_defaults`::
+(Optional, Boolean) If `true`, return all default settings in the response.
+Defaults to `false`.
+
+[role="child_attributes"]
+[[get-lifecycle-api-response-body]]
+==== {api-response-body-title}
+
+`data_streams`::
+(array of objects)
+Contains information about retrieved data stream lifecycles.
++
+.Properties of objects in `data_streams`
+[%collapsible%open]
+====
+`name`::
+(string)
+Name of the data stream.
+`lifecycle`::
+(object)
++
+.Properties of `lifecycle`
+[%collapsible%open]
+=====
+`data_retention`::
+(string)
+If defined, every document added to this data stream will be stored at least for this time frame. Any time after this
+duration the document could be deleted. When undefined, every document in this data stream will be stored indefinitely.
+
+`rollover`::
+(object)
+The conditions which will trigger the rollover of a backing index as configured by the cluster setting
+`cluster.dlm.default.rollover`. This property is an implementation detail and it will only be retrieved when the query
+param `include_defaults` is set to `true`. The contents of this field are subject to change.
+=====
+====
+
+[[dlm-get-lifecycle-example]]
+==== {api-examples-title}
+
+////
+
+[source,console]
+--------------------------------------------------
+PUT /_index_template/my-template
+{
+  "index_patterns" : ["my-data-stream*"],
+  "priority" : 1,
+  "data_stream": {},
+  "template": {
+    "lifecycle" : {
+      "data_retention" : "7d"
+    }
+  }
+}
+
+PUT /_data_stream/my-data-stream-1
+PUT /_data_stream/my-data-stream-2
+--------------------------------------------------
+// TESTSETUP
+
+[source,console]
+--------------------------------------------------
+DELETE _data_stream/my-data-stream*
+DELETE _index_template/my-template
+--------------------------------------------------
+// TEARDOWN
+
+////
+
+Let's retrieve the lifecycles:
+
+[source,console]
+--------------------------------------------------
+GET _data_stream/my-data-stream*/_lifecycle
+--------------------------------------------------
+
+The response will look like the following:
+
+[source,console-result]
+--------------------------------------------------
+{
+  "data_streams": [
+    {
+      "name": "my-data-stream-1",
+      "lifecycle": {
+        "data_retention": "7d"
+      }
+    },
+    {
+      "name": "my-data-stream-2",
+      "lifecycle": {
+        "data_retention": "7d"
+      }
+    }
+  ]
+}
+--------------------------------------------------

+ 76 - 0
docs/reference/dlm/apis/put-lifecycle.asciidoc

@@ -0,0 +1,76 @@
+[[dlm-put-lifecycle]]
+=== Set the lifecycle of a data stream
+++++
+<titleabbrev>Put Data Stream Lifecycle</titleabbrev>
+++++
+
+experimental::[]
+
+Configures the data lifecycle for the targeted data streams.
+
+[[dlm-put-lifecycle-request]]
+==== {api-request-title}
+
+`PUT _data_stream/<data-stream>/_lifecycle`
+
+[[dlm-put-lifecycle-desc]]
+==== {api-description-title}
+
+Configures the data lifecycle for the targeted data streams. If multiple data streams are provided but at least one of them
+does not exist, then the update of the lifecycle will fail for all of them and the API will respond with `404`.
+
+[[dlm-put-lifecycle-path-params]]
+==== {api-path-parms-title}
+
+`<data-stream>`::
+(Required, string) Comma-separated list of data streams used to limit the request. Supports wildcards (`*`).
+To target all data streams use `*` or `_all`.
+
+[role="child_attributes"]
+[[put-data-lifecycle-api-query-parms]]
+==== {api-query-parms-title}
+
+include::{es-repo-dir}/rest-api/common-parms.asciidoc[tag=ds-expand-wildcards]
++
+Defaults to `open`.
+
+[[put-lifecycle-api-request-body]]
+==== {api-request-body-title}
+
+`lifecycle`::
+(Required, object)
++
+.Properties of `lifecycle`
+[%collapsible%open]
+====
+`data_retention`::
+(Optional, string)
+If defined, every document added to this data stream will be stored at least for this time frame. Any time after this
+duration the document could be deleted. When empty, every document in this data stream will be stored indefinitely.
+====
+
+[[dlm-put-lifecycle-example]]
+==== {api-examples-title}
+
+The following example sets the lifecycle of `my-data-stream`:
+
+[source,console]
+--------------------------------------------------
+PUT _data_stream/my-data-stream/_lifecycle
+{
+  "lifecycle": {
+    "data_retention": "7d"
+  }
+}
+--------------------------------------------------
+// TEST[setup:my_data_stream]
+// TEST[teardown:data_stream_cleanup]
+
+When the lifecycle is successfully updated in all data streams, you receive the following result:
+
+[source,console-result]
+--------------------------------------------------
+{
+  "acknowledged": true
+}
+--------------------------------------------------

+ 0 - 1
docs/reference/rest-api/index.asciidoc

@@ -99,4 +99,3 @@ include::usage.asciidoc[]
 include::{xes-repo-dir}/rest-api/watcher.asciidoc[]
 include::defs.asciidoc[]
 include::{es-repo-dir}/dlm/apis/dlm-api.asciidoc[]
-

+ 1 - 1
modules/data-streams/src/main/java/module-info.java

@@ -13,6 +13,6 @@ module org.elasticsearch.datastreams {
     requires org.apache.logging.log4j;
     requires org.apache.lucene.core;
 
-    exports org.elasticsearch.datastreams.action to org.elasticsearch.server;
+    exports org.elasticsearch.datastreams.action to org.elasticsearch.server, org.elasticsearch.dlm;
     exports org.elasticsearch.datastreams to org.elasticsearch.dlm;
 }

+ 1 - 0
modules/data-streams/src/main/java/org/elasticsearch/datastreams/action/DataStreamsActionUtil.java

@@ -5,6 +5,7 @@
  * in compliance with, at your election, the Elastic License 2.0 or the Server
  * Side Public License, v 1.
  */
+
 package org.elasticsearch.datastreams.action;
 
 import org.elasticsearch.action.support.IndicesOptions;

+ 1 - 0
modules/dlm/build.gradle

@@ -19,6 +19,7 @@ esplugin {
 }
 archivesBaseName = 'dlm'
 dependencies {
+  implementation project(path: ':modules:data-streams')
   testImplementation project(':modules:data-streams')
 }
 

+ 193 - 0
modules/dlm/src/internalClusterTest/java/org/elasticsearch/dlm/CrudDataLifecycleIT.java

@@ -0,0 +1,193 @@
+/*
+ * Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
+ * or more contributor license agreements. Licensed under the Elastic License
+ * 2.0 and the Server Side Public License, v 1; you may not use this file except
+ * in compliance with, at your election, the Elastic License 2.0 or the Server
+ * Side Public License, v 1.
+ */
+
+package org.elasticsearch.dlm;
+
+import org.elasticsearch.action.datastreams.CreateDataStreamAction;
+import org.elasticsearch.cluster.metadata.DataLifecycle;
+import org.elasticsearch.datastreams.DataStreamsPlugin;
+import org.elasticsearch.dlm.action.DeleteDataLifecycleAction;
+import org.elasticsearch.dlm.action.GetDataLifecycleAction;
+import org.elasticsearch.dlm.action.PutDataLifecycleAction;
+import org.elasticsearch.plugins.Plugin;
+import org.elasticsearch.test.ESIntegTestCase;
+import org.elasticsearch.test.transport.MockTransportService;
+
+import java.util.Collection;
+import java.util.List;
+
+import static org.elasticsearch.dlm.DLMFixtures.putComposableIndexTemplate;
+import static org.elasticsearch.dlm.DLMFixtures.randomDataLifecycle;
+import static org.hamcrest.Matchers.equalTo;
+import static org.hamcrest.Matchers.notNullValue;
+import static org.hamcrest.Matchers.nullValue;
+
+public class CrudDataLifecycleIT extends ESIntegTestCase {
+
+    @Override
+    protected Collection<Class<? extends Plugin>> nodePlugins() {
+        return List.of(DataLifecyclePlugin.class, DataStreamsPlugin.class, MockTransportService.TestPlugin.class);
+    }
+
+    protected boolean ignoreExternalCluster() {
+        return true;
+    }
+
+    public void testGetLifecycle() throws Exception {
+        DataLifecycle lifecycle = randomDataLifecycle();
+        putComposableIndexTemplate("id1", null, List.of("with-lifecycle*"), null, null, lifecycle);
+        putComposableIndexTemplate("id2", null, List.of("without-lifecycle*"), null, null, null);
+        {
+            String dataStreamName = "with-lifecycle-1";
+            CreateDataStreamAction.Request createDataStreamRequest = new CreateDataStreamAction.Request(dataStreamName);
+            client().execute(CreateDataStreamAction.INSTANCE, createDataStreamRequest).get();
+        }
+        {
+            String dataStreamName = "with-lifecycle-2";
+            CreateDataStreamAction.Request createDataStreamRequest = new CreateDataStreamAction.Request(dataStreamName);
+            client().execute(CreateDataStreamAction.INSTANCE, createDataStreamRequest).get();
+        }
+        {
+            String dataStreamName = "without-lifecycle";
+            CreateDataStreamAction.Request createDataStreamRequest = new CreateDataStreamAction.Request(dataStreamName);
+            client().execute(CreateDataStreamAction.INSTANCE, createDataStreamRequest).get();
+        }
+
+        // Test retrieving all lifecycles
+        {
+            GetDataLifecycleAction.Request getDataLifecycleRequest = new GetDataLifecycleAction.Request(new String[] { "*" });
+            GetDataLifecycleAction.Response response = client().execute(GetDataLifecycleAction.INSTANCE, getDataLifecycleRequest).get();
+            assertThat(response.getDataStreamLifecycles().size(), equalTo(2));
+            assertThat(response.getDataStreamLifecycles().get(0).dataStreamName(), equalTo("with-lifecycle-1"));
+            assertThat(response.getDataStreamLifecycles().get(0).lifecycle(), equalTo(lifecycle));
+            assertThat(response.getDataStreamLifecycles().get(1).dataStreamName(), equalTo("with-lifecycle-2"));
+            assertThat(response.getDataStreamLifecycles().get(1).lifecycle(), equalTo(lifecycle));
+            assertThat(response.getRolloverConditions(), nullValue());
+        }
+
+        // Test retrieving all lifecycles prefixed wildcard
+        {
+            GetDataLifecycleAction.Request getDataLifecycleRequest = new GetDataLifecycleAction.Request(new String[] { "with-lifecycle*" });
+            GetDataLifecycleAction.Response response = client().execute(GetDataLifecycleAction.INSTANCE, getDataLifecycleRequest).get();
+            assertThat(response.getDataStreamLifecycles().size(), equalTo(2));
+            assertThat(response.getDataStreamLifecycles().get(0).dataStreamName(), equalTo("with-lifecycle-1"));
+            assertThat(response.getDataStreamLifecycles().get(0).lifecycle(), equalTo(lifecycle));
+            assertThat(response.getDataStreamLifecycles().get(1).dataStreamName(), equalTo("with-lifecycle-2"));
+            assertThat(response.getDataStreamLifecycles().get(1).lifecycle(), equalTo(lifecycle));
+            assertThat(response.getRolloverConditions(), nullValue());
+        }
+
+        // Test retrieving concrete data streams
+        {
+            GetDataLifecycleAction.Request getDataLifecycleRequest = new GetDataLifecycleAction.Request(
+                new String[] { "with-lifecycle-1", "with-lifecycle-2" }
+            );
+            GetDataLifecycleAction.Response response = client().execute(GetDataLifecycleAction.INSTANCE, getDataLifecycleRequest).get();
+            assertThat(response.getDataStreamLifecycles().size(), equalTo(2));
+            assertThat(response.getDataStreamLifecycles().get(0).dataStreamName(), equalTo("with-lifecycle-1"));
+            assertThat(response.getDataStreamLifecycles().get(0).lifecycle(), equalTo(lifecycle));
+            assertThat(response.getRolloverConditions(), nullValue());
+        }
+
+        // Test include defaults
+        GetDataLifecycleAction.Request getDataLifecycleRequestWithDefaults = new GetDataLifecycleAction.Request(new String[] { "*" })
+            .includeDefaults(true);
+        GetDataLifecycleAction.Response responseWithRollover = client().execute(
+            GetDataLifecycleAction.INSTANCE,
+            getDataLifecycleRequestWithDefaults
+        ).get();
+        assertThat(responseWithRollover.getDataStreamLifecycles().size(), equalTo(2));
+        assertThat(responseWithRollover.getDataStreamLifecycles().get(0).dataStreamName(), equalTo("with-lifecycle-1"));
+        assertThat(responseWithRollover.getDataStreamLifecycles().get(0).lifecycle(), equalTo(lifecycle));
+        assertThat(responseWithRollover.getDataStreamLifecycles().get(1).dataStreamName(), equalTo("with-lifecycle-2"));
+        assertThat(responseWithRollover.getDataStreamLifecycles().get(1).lifecycle(), equalTo(lifecycle));
+        assertThat(responseWithRollover.getRolloverConditions(), notNullValue());
+    }
+
+    public void testPutLifecycle() throws Exception {
+        putComposableIndexTemplate("id1", null, List.of("my-data-stream*"), null, null, null);
+        // Create index without a lifecycle
+        String dataStreamName = "my-data-stream";
+        CreateDataStreamAction.Request createDataStreamRequest = new CreateDataStreamAction.Request(dataStreamName);
+        client().execute(CreateDataStreamAction.INSTANCE, createDataStreamRequest).get();
+
+        {
+            GetDataLifecycleAction.Request getDataLifecycleRequest = new GetDataLifecycleAction.Request(new String[] { "my-data-stream" });
+            GetDataLifecycleAction.Response response = client().execute(GetDataLifecycleAction.INSTANCE, getDataLifecycleRequest).get();
+            assertThat(response.getDataStreamLifecycles().isEmpty(), equalTo(true));
+        }
+
+        // Set lifecycle
+        {
+            DataLifecycle lifecycle = randomDataLifecycle();
+            PutDataLifecycleAction.Request putDataLifecycleRequest = new PutDataLifecycleAction.Request(new String[] { "*" }, lifecycle);
+            assertThat(client().execute(PutDataLifecycleAction.INSTANCE, putDataLifecycleRequest).get().isAcknowledged(), equalTo(true));
+            GetDataLifecycleAction.Request getDataLifecycleRequest = new GetDataLifecycleAction.Request(new String[] { "my-data-stream" });
+            GetDataLifecycleAction.Response response = client().execute(GetDataLifecycleAction.INSTANCE, getDataLifecycleRequest).get();
+            assertThat(response.getDataStreamLifecycles().size(), equalTo(1));
+            assertThat(response.getDataStreamLifecycles().get(0).dataStreamName(), equalTo("my-data-stream"));
+            assertThat(response.getDataStreamLifecycles().get(0).lifecycle(), equalTo(lifecycle));
+        }
+    }
+
+    public void testDeleteLifecycle() throws Exception {
+        DataLifecycle lifecycle = new DataLifecycle(randomMillisUpToYear9999());
+        putComposableIndexTemplate("id1", null, List.of("with-lifecycle*"), null, null, lifecycle);
+        putComposableIndexTemplate("id2", null, List.of("without-lifecycle*"), null, null, null);
+        {
+            String dataStreamName = "with-lifecycle-1";
+            CreateDataStreamAction.Request createDataStreamRequest = new CreateDataStreamAction.Request(dataStreamName);
+            client().execute(CreateDataStreamAction.INSTANCE, createDataStreamRequest).get();
+        }
+        {
+            String dataStreamName = "with-lifecycle-2";
+            CreateDataStreamAction.Request createDataStreamRequest = new CreateDataStreamAction.Request(dataStreamName);
+            client().execute(CreateDataStreamAction.INSTANCE, createDataStreamRequest).get();
+        }
+        {
+            String dataStreamName = "with-lifecycle-3";
+            CreateDataStreamAction.Request createDataStreamRequest = new CreateDataStreamAction.Request(dataStreamName);
+            client().execute(CreateDataStreamAction.INSTANCE, createDataStreamRequest).get();
+        }
+
+        // Verify that we have 3 data streams with lifecycles
+        {
+            GetDataLifecycleAction.Request getDataLifecycleRequest = new GetDataLifecycleAction.Request(new String[] { "with-lifecycle*" });
+            GetDataLifecycleAction.Response response = client().execute(GetDataLifecycleAction.INSTANCE, getDataLifecycleRequest).get();
+            assertThat(response.getDataStreamLifecycles().size(), equalTo(3));
+        }
+
+        // Remove lifecycle from concrete data stream
+        {
+            DeleteDataLifecycleAction.Request deleteDataLifecycleRequest = new DeleteDataLifecycleAction.Request(
+                new String[] { "with-lifecycle-1" }
+            );
+            assertThat(
+                client().execute(DeleteDataLifecycleAction.INSTANCE, deleteDataLifecycleRequest).get().isAcknowledged(),
+                equalTo(true)
+            );
+            GetDataLifecycleAction.Request getDataLifecycleRequest = new GetDataLifecycleAction.Request(new String[] { "with-lifecycle*" });
+            GetDataLifecycleAction.Response response = client().execute(GetDataLifecycleAction.INSTANCE, getDataLifecycleRequest).get();
+            assertThat(response.getDataStreamLifecycles().size(), equalTo(2));
+            assertThat(response.getDataStreamLifecycles().get(0).dataStreamName(), equalTo("with-lifecycle-2"));
+            assertThat(response.getDataStreamLifecycles().get(1).dataStreamName(), equalTo("with-lifecycle-3"));
+        }
+
+        // Remove lifecycle from all data streams
+        {
+            DeleteDataLifecycleAction.Request deleteDataLifecycleRequest = new DeleteDataLifecycleAction.Request(new String[] { "*" });
+            assertThat(
+                client().execute(DeleteDataLifecycleAction.INSTANCE, deleteDataLifecycleRequest).get().isAcknowledged(),
+                equalTo(true)
+            );
+            GetDataLifecycleAction.Request getDataLifecycleRequest = new GetDataLifecycleAction.Request(new String[] { "with-lifecycle*" });
+            GetDataLifecycleAction.Response response = client().execute(GetDataLifecycleAction.INSTANCE, getDataLifecycleRequest).get();
+            assertThat(response.getDataStreamLifecycles().isEmpty(), equalTo(true));
+        }
+    }
+}

+ 270 - 0
modules/dlm/src/internalClusterTest/java/org/elasticsearch/dlm/CrudDataLifecycleSystemDataStreamIT.java

@@ -0,0 +1,270 @@
+/*
+ * Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
+ * or more contributor license agreements. Licensed under the Elastic License
+ * 2.0 and the Server Side Public License, v 1; you may not use this file except
+ * in compliance with, at your election, the Elastic License 2.0 or the Server
+ * Side Public License, v 1.
+ */
+package org.elasticsearch.dlm;
+
+import org.apache.http.util.EntityUtils;
+import org.elasticsearch.ExceptionsHelper;
+import org.elasticsearch.ResourceNotFoundException;
+import org.elasticsearch.action.ActionListener;
+import org.elasticsearch.action.admin.cluster.snapshots.features.ResetFeatureStateResponse.ResetFeatureStateStatus;
+import org.elasticsearch.action.datastreams.DeleteDataStreamAction;
+import org.elasticsearch.action.support.IndicesOptions;
+import org.elasticsearch.action.support.IndicesOptions.Option;
+import org.elasticsearch.action.support.PlainActionFuture;
+import org.elasticsearch.client.Request;
+import org.elasticsearch.client.RequestOptions;
+import org.elasticsearch.client.Response;
+import org.elasticsearch.client.ResponseException;
+import org.elasticsearch.client.RestClient;
+import org.elasticsearch.client.internal.Client;
+import org.elasticsearch.cluster.metadata.ComposableIndexTemplate;
+import org.elasticsearch.cluster.metadata.ComposableIndexTemplate.DataStreamTemplate;
+import org.elasticsearch.cluster.metadata.DataLifecycle;
+import org.elasticsearch.cluster.metadata.Template;
+import org.elasticsearch.cluster.service.ClusterService;
+import org.elasticsearch.common.Strings;
+import org.elasticsearch.common.compress.CompressedXContent;
+import org.elasticsearch.common.network.NetworkModule;
+import org.elasticsearch.common.settings.Settings;
+import org.elasticsearch.common.xcontent.XContentHelper;
+import org.elasticsearch.datastreams.DataStreamsPlugin;
+import org.elasticsearch.indices.ExecutorNames;
+import org.elasticsearch.indices.SystemDataStreamDescriptor;
+import org.elasticsearch.indices.SystemDataStreamDescriptor.Type;
+import org.elasticsearch.plugins.Plugin;
+import org.elasticsearch.plugins.SystemIndexPlugin;
+import org.elasticsearch.test.ESIntegTestCase;
+import org.elasticsearch.transport.netty4.Netty4Plugin;
+import org.elasticsearch.xcontent.XContentType;
+import org.junit.After;
+
+import java.io.IOException;
+import java.io.UncheckedIOException;
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.EnumSet;
+import java.util.List;
+import java.util.Map;
+
+import static org.hamcrest.Matchers.containsString;
+import static org.hamcrest.Matchers.equalTo;
+import static org.hamcrest.Matchers.is;
+
+public class CrudDataLifecycleSystemDataStreamIT extends ESIntegTestCase {
+
+    @Override
+    protected Collection<Class<? extends Plugin>> nodePlugins() {
+        List<Class<? extends Plugin>> plugins = new ArrayList<>(super.nodePlugins());
+        plugins.add(DataStreamsPlugin.class);
+        plugins.add(DataLifecyclePlugin.class);
+        plugins.add(TestSystemDataStreamPlugin.class);
+        return plugins;
+    }
+
+    @Override
+    protected Settings nodeSettings(int nodeOrdinal, Settings otherSettings) {
+        return Settings.builder()
+            .put(super.nodeSettings(nodeOrdinal, otherSettings))
+            .put(NetworkModule.HTTP_TYPE_KEY, Netty4Plugin.NETTY_HTTP_TRANSPORT_NAME)
+            .build();
+    }
+
+    @Override
+    protected boolean addMockHttpTransport() {
+        return false;
+    }
+
+    @SuppressWarnings("unchecked")
+    public void testDataLifecycleOnSystemDataStream() throws Exception {
+        String systemDataStream = ".test-data-stream";
+        RequestOptions correctProductHeader = RequestOptions.DEFAULT.toBuilder().addHeader("X-elastic-product-origin", "product").build();
+        RequestOptions wrongProductHeader = RequestOptions.DEFAULT.toBuilder().addHeader("X-elastic-product-origin", "wrong").build();
+        try (RestClient restClient = createRestClient()) {
+            // Set-up system data stream
+            {
+                Request putRequest = new Request("PUT", "/_data_stream/" + systemDataStream);
+                putRequest.setOptions(correctProductHeader);
+                Response putResponse = restClient.performRequest(putRequest);
+                assertThat(putResponse.getStatusLine().getStatusCode(), is(200));
+            }
+
+            // Data lifecycle of hidden data streams is not retrieved by default
+            {
+                Request listAllVisibleRequest = new Request("GET", "/_data_stream/*/_lifecycle");
+                Response listAllVisibleResponse = restClient.performRequest(listAllVisibleRequest);
+                assertThat(listAllVisibleResponse.getStatusLine().getStatusCode(), is(200));
+                Map<String, Object> visibleResponseMap = XContentHelper.convertToMap(
+                    XContentType.JSON.xContent(),
+                    EntityUtils.toString(listAllVisibleResponse.getEntity()),
+                    false
+                );
+                List<Object> visibleDataStreams = (List<Object>) visibleResponseMap.get("data_streams");
+                assertThat(visibleDataStreams.size(), is(0));
+            }
+
+            // Data lifecycle of hidden data streams is retrieved when enabled - no header needed
+            {
+                Request listAllRequest = new Request("GET", "/_data_stream/*/_lifecycle");
+                listAllRequest.addParameter("expand_wildcards", "open,hidden");
+                Response listAllResponse = restClient.performRequest(listAllRequest);
+                assertThat(listAllResponse.getStatusLine().getStatusCode(), is(200));
+                Map<String, Object> responseMap = XContentHelper.convertToMap(
+                    XContentType.JSON.xContent(),
+                    EntityUtils.toString(listAllResponse.getEntity()),
+                    false
+                );
+                List<Object> dataStreams = (List<Object>) responseMap.get("data_streams");
+                assertThat(dataStreams.size(), is(1));
+                Map<String, Object> dataStreamLifecycle = (Map<String, Object>) dataStreams.get(0);
+                assertThat(dataStreamLifecycle.get("name"), equalTo(systemDataStream));
+            }
+
+            // Retrieve using the concrete data stream name - header needed
+            {
+                Request listRequest = new Request("GET", "/_data_stream/" + systemDataStream + "/_lifecycle");
+                Response listResponse = restClient.performRequest(listRequest);
+                assertThat(listResponse.getStatusLine().getStatusCode(), is(200));
+                Map<String, Object> responseMap = XContentHelper.convertToMap(
+                    XContentType.JSON.xContent(),
+                    EntityUtils.toString(listResponse.getEntity()),
+                    false
+                );
+                List<Object> dataStreams = (List<Object>) responseMap.get("data_streams");
+                assertThat(dataStreams.size(), is(1));
+            }
+
+            // Update the lifecycle
+            {
+                Request putRequest = new Request("PUT", "/_data_stream/" + systemDataStream + "/_lifecycle");
+                putRequest.setJsonEntity("""
+                    {
+                      "lifecycle": {}
+                    }""");
+                // No header
+                ResponseException re = expectThrows(ResponseException.class, () -> restClient.performRequest(putRequest));
+                assertThat(re.getMessage(), containsString("reserved for system"));
+
+                // wrong header
+                putRequest.setOptions(wrongProductHeader);
+                re = expectThrows(ResponseException.class, () -> restClient.performRequest(putRequest));
+                assertThat(re.getMessage(), containsString("may not be accessed by product [wrong]"));
+
+                // correct
+                putRequest.setOptions(correctProductHeader);
+                Response putResponse = restClient.performRequest(putRequest);
+                assertThat(putResponse.getStatusLine().getStatusCode(), is(200));
+            }
+
+            // delete
+            {
+                Request deleteRequest = new Request("DELETE", "/_data_stream/" + systemDataStream + "/_lifecycle");
+                ResponseException re = expectThrows(ResponseException.class, () -> restClient.performRequest(deleteRequest));
+                assertThat(re.getMessage(), containsString("reserved for system"));
+
+                // wrong header
+                deleteRequest.setOptions(wrongProductHeader);
+                re = expectThrows(ResponseException.class, () -> restClient.performRequest(deleteRequest));
+                assertThat(re.getMessage(), containsString("may not be accessed by product [wrong]"));
+
+                // correct
+                deleteRequest.setOptions(correctProductHeader);
+                Response deleteResponse = restClient.performRequest(deleteRequest);
+                assertThat(deleteResponse.getStatusLine().getStatusCode(), is(200));
+            }
+        }
+    }
+
+    @After
+    public void cleanup() {
+        try {
+            PlainActionFuture<ResetFeatureStateStatus> stateStatusPlainActionFuture = new PlainActionFuture<>();
+            new TestSystemDataStreamPlugin().cleanUpFeature(
+                internalCluster().clusterService(),
+                internalCluster().client(),
+                stateStatusPlainActionFuture
+            );
+            stateStatusPlainActionFuture.actionGet();
+        } catch (ResourceNotFoundException e) {
+            // ignore
+        }
+    }
+
+    public static final class TestSystemDataStreamPlugin extends Plugin implements SystemIndexPlugin {
+
+        @Override
+        public Collection<SystemDataStreamDescriptor> getSystemDataStreamDescriptors() {
+            try {
+                CompressedXContent mappings = new CompressedXContent("{\"properties\":{\"name\":{\"type\":\"keyword\"}}}");
+                return List.of(
+                    new SystemDataStreamDescriptor(
+                        ".test-data-stream",
+                        "system data stream test",
+                        Type.EXTERNAL,
+                        new ComposableIndexTemplate(
+                            List.of(".test-data-stream"),
+                            new Template(Settings.EMPTY, mappings, null, new DataLifecycle(randomMillisUpToYear9999())),
+                            null,
+                            null,
+                            null,
+                            null,
+                            new DataStreamTemplate()
+                        ),
+                        Map.of(),
+                        List.of("product"),
+                        ExecutorNames.DEFAULT_SYSTEM_DATA_STREAM_THREAD_POOLS
+                    )
+                );
+            } catch (IOException e) {
+                throw new UncheckedIOException(e);
+            }
+        }
+
+        @Override
+        public String getFeatureName() {
+            return CrudDataLifecycleSystemDataStreamIT.class.getSimpleName();
+        }
+
+        @Override
+        public String getFeatureDescription() {
+            return "Integration testing of modifying the data lifecycle of system data streams";
+        }
+
+        @Override
+        public void cleanUpFeature(ClusterService clusterService, Client client, ActionListener<ResetFeatureStateStatus> listener) {
+            Collection<SystemDataStreamDescriptor> dataStreamDescriptors = getSystemDataStreamDescriptors();
+            final DeleteDataStreamAction.Request request = new DeleteDataStreamAction.Request(
+                dataStreamDescriptors.stream().map(SystemDataStreamDescriptor::getDataStreamName).toList().toArray(Strings.EMPTY_ARRAY)
+            );
+            EnumSet<Option> options = request.indicesOptions().options();
+            options.add(Option.IGNORE_UNAVAILABLE);
+            options.add(Option.ALLOW_NO_INDICES);
+            request.indicesOptions(new IndicesOptions(options, request.indicesOptions().expandWildcards()));
+            try {
+                client.execute(
+                    DeleteDataStreamAction.INSTANCE,
+                    request,
+                    ActionListener.wrap(response -> SystemIndexPlugin.super.cleanUpFeature(clusterService, client, listener), e -> {
+                        Throwable unwrapped = ExceptionsHelper.unwrapCause(e);
+                        if (unwrapped instanceof ResourceNotFoundException) {
+                            SystemIndexPlugin.super.cleanUpFeature(clusterService, client, listener);
+                        } else {
+                            listener.onFailure(e);
+                        }
+                    })
+                );
+            } catch (Exception e) {
+                Throwable unwrapped = ExceptionsHelper.unwrapCause(e);
+                if (unwrapped instanceof ResourceNotFoundException) {
+                    SystemIndexPlugin.super.cleanUpFeature(clusterService, client, listener);
+                } else {
+                    listener.onFailure(e);
+                }
+            }
+        }
+    }
+}
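In console form, the hidden data stream lookup performed by the test above is equivalent to the following (a sketch mirroring the test's `expand_wildcards` parameter):

GET _data_stream/*/_lifecycle?expand_wildcards=open,hidden

Requests that name the concrete system data stream (`.test-data-stream`) additionally require the `X-elastic-product-origin: product` header, as exercised in the update and delete steps of the test.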

+ 55 - 49
modules/dlm/src/internalClusterTest/java/org/elasticsearch/dlm/DataLifecycleServiceIT.java

@@ -16,20 +16,17 @@ import org.elasticsearch.action.bulk.BulkResponse;
 import org.elasticsearch.action.datastreams.CreateDataStreamAction;
 import org.elasticsearch.action.datastreams.GetDataStreamAction;
 import org.elasticsearch.action.index.IndexRequest;
-import org.elasticsearch.action.support.PlainActionFuture;
-import org.elasticsearch.cluster.ClusterState;
-import org.elasticsearch.cluster.ClusterStateUpdateTask;
+import org.elasticsearch.action.support.master.AcknowledgedResponse;
 import org.elasticsearch.cluster.metadata.ComposableIndexTemplate;
 import org.elasticsearch.cluster.metadata.DataLifecycle;
 import org.elasticsearch.cluster.metadata.DataStream;
-import org.elasticsearch.cluster.metadata.Metadata;
 import org.elasticsearch.cluster.metadata.Template;
-import org.elasticsearch.cluster.service.ClusterService;
 import org.elasticsearch.common.compress.CompressedXContent;
 import org.elasticsearch.common.settings.Settings;
 import org.elasticsearch.core.Nullable;
 import org.elasticsearch.core.TimeValue;
 import org.elasticsearch.datastreams.DataStreamsPlugin;
+import org.elasticsearch.dlm.action.PutDataLifecycleAction;
 import org.elasticsearch.index.Index;
 import org.elasticsearch.index.mapper.DateFieldMapper;
 import org.elasticsearch.plugins.Plugin;
@@ -133,6 +130,49 @@ public class DataLifecycleServiceIT extends ESIntegTestCase {
         });
     }
 
+    public void testUpdatingLifecycleAppliesToAllBackingIndices() throws Exception {
+        DataLifecycle lifecycle = new DataLifecycle();
+
+        putComposableIndexTemplate("id1", null, List.of("metrics-foo*"), null, null, lifecycle);
+
+        String dataStreamName = "metrics-foo";
+        CreateDataStreamAction.Request createDataStreamRequest = new CreateDataStreamAction.Request(dataStreamName);
+        client().execute(CreateDataStreamAction.INSTANCE, createDataStreamRequest).get();
+
+        int finalGeneration = randomIntBetween(2, 20);
+        for (int currentGeneration = 1; currentGeneration < finalGeneration; currentGeneration++) {
+            indexDocs(dataStreamName, 1);
+            int currentBackingIndexCount = currentGeneration;
+            assertBusy(() -> {
+                GetDataStreamAction.Request getDataStreamRequest = new GetDataStreamAction.Request(new String[] { dataStreamName });
+                GetDataStreamAction.Response getDataStreamResponse = client().execute(GetDataStreamAction.INSTANCE, getDataStreamRequest)
+                    .actionGet();
+                assertThat(getDataStreamResponse.getDataStreams().size(), equalTo(1));
+                DataStream dataStream = getDataStreamResponse.getDataStreams().get(0).getDataStream();
+                assertThat(dataStream.getName(), equalTo(dataStreamName));
+                List<Index> backingIndices = dataStream.getIndices();
+                assertThat(backingIndices.size(), equalTo(currentBackingIndexCount + 1));
+                String writeIndex = dataStream.getWriteIndex().getName();
+                assertThat(writeIndex, backingIndexEqualTo(dataStreamName, currentBackingIndexCount + 1));
+            });
+        }
+        // Update the lifecycle of the data stream
+        updateLifecycle(dataStreamName, new DataLifecycle(TimeValue.timeValueMillis(1)));
+        // Verify that the retention has changed for all backing indices
+        assertBusy(() -> {
+            GetDataStreamAction.Request getDataStreamRequest = new GetDataStreamAction.Request(new String[] { dataStreamName });
+            GetDataStreamAction.Response getDataStreamResponse = client().execute(GetDataStreamAction.INSTANCE, getDataStreamRequest)
+                .actionGet();
+            assertThat(getDataStreamResponse.getDataStreams().size(), equalTo(1));
+            DataStream dataStream = getDataStreamResponse.getDataStreams().get(0).getDataStream();
+            assertThat(dataStream.getName(), equalTo(dataStreamName));
+            List<Index> backingIndices = dataStream.getIndices();
+            assertThat(backingIndices.size(), equalTo(1));
+            String writeIndex = dataStream.getWriteIndex().getName();
+            assertThat(writeIndex, backingIndexEqualTo(dataStreamName, finalGeneration));
+        });
+    }
+
     public void testErrorRecordingOnRollover() throws Exception {
         // empty lifecycle contains the default rollover
         DataLifecycle lifecycle = new DataLifecycle();
@@ -241,50 +281,7 @@ public class DataLifecycleServiceIT extends ESIntegTestCase {
         // mark the first generation index as read-only so deletion fails when we enable the retention configuration
         updateIndexSettings(Settings.builder().put(READ_ONLY.settingName(), true), firstGenerationIndex);
         try {
-            // TODO replace this with an API call to update the lifecycle for the data stream once available
-            PlainActionFuture.get(
-                fut -> internalCluster().getCurrentMasterNodeInstance(ClusterService.class)
-                    .submitUnbatchedStateUpdateTask("update the data stream retention", new ClusterStateUpdateTask() {
-
-                        @Override
-                        public ClusterState execute(ClusterState state) {
-                            DataStream dataStream = state.metadata().dataStreams().get(dataStreamName);
-                            assert dataStream != null : "data stream must exist";
-                            Metadata.Builder builder = Metadata.builder(state.metadata());
-                            DataStream updatedDataStream = new DataStream(
-                                dataStreamName,
-                                dataStream.getIndices(),
-                                dataStream.getGeneration(),
-                                dataStream.getMetadata(),
-                                dataStream.isHidden(),
-                                dataStream.isReplicated(),
-                                dataStream.isSystem(),
-                                dataStream.isAllowCustomRouting(),
-                                dataStream.getIndexMode(),
-                                new DataLifecycle(TimeValue.timeValueSeconds(1))
-                            );
-                            builder.put(updatedDataStream);
-                            return ClusterState.builder(state).metadata(builder).build();
-                        }
-
-                        @Override
-                        public void onFailure(Exception e) {
-                            logger.error(e.getMessage(), e);
-                            fail(
-                                "unable to update the retention policy for data stream ["
-                                    + dataStreamName
-                                    + "] due to ["
-                                    + e.getMessage()
-                                    + "]"
-                            );
-                        }
-
-                        @Override
-                        public void clusterStateProcessed(ClusterState initialState, ClusterState newState) {
-                            fut.onResponse(null);
-                        }
-                    })
-            );
+            updateLifecycle(dataStreamName, new DataLifecycle(TimeValue.timeValueSeconds(1)));
 
             assertBusy(() -> {
                 GetDataStreamAction.Request getDataStreamRequest = new GetDataStreamAction.Request(new String[] { dataStreamName });
@@ -385,4 +382,13 @@ public class DataLifecycleServiceIT extends ESIntegTestCase {
         client().execute(PutComposableIndexTemplateAction.INSTANCE, request).actionGet();
     }
 
+    static void updateLifecycle(String dataStreamName, DataLifecycle dataLifecycle) {
+        PutDataLifecycleAction.Request putDataLifecycleRequest = new PutDataLifecycleAction.Request(
+            new String[] { dataStreamName },
+            dataLifecycle
+        );
+        AcknowledgedResponse putDataLifecycleResponse = client().execute(PutDataLifecycleAction.INSTANCE, putDataLifecycleRequest)
+            .actionGet();
+        assertThat(putDataLifecycleResponse.isAcknowledged(), equalTo(true));
+    }
 }

+ 2 - 3
modules/dlm/src/main/java/module-info.java

@@ -12,9 +12,8 @@ module org.elasticsearch.dlm {
     requires org.elasticsearch.xcontent;
     requires org.apache.lucene.core;
     requires org.apache.logging.log4j;
+    requires org.elasticsearch.datastreams;
 
+    exports org.elasticsearch.dlm.action to org.elasticsearch.server;
     exports org.elasticsearch.dlm;
-    exports org.elasticsearch.dlm.action;
-    exports org.elasticsearch.dlm.rest;
-
 }

+ 15 - 0
modules/dlm/src/main/java/org/elasticsearch/dlm/DataLifecyclePlugin.java

@@ -26,9 +26,18 @@ import org.elasticsearch.common.settings.Setting;
 import org.elasticsearch.common.settings.Settings;
 import org.elasticsearch.common.settings.SettingsFilter;
 import org.elasticsearch.core.IOUtils;
+import org.elasticsearch.dlm.action.DeleteDataLifecycleAction;
 import org.elasticsearch.dlm.action.ExplainDataLifecycleAction;
+import org.elasticsearch.dlm.action.GetDataLifecycleAction;
+import org.elasticsearch.dlm.action.PutDataLifecycleAction;
+import org.elasticsearch.dlm.action.TransportDeleteDataLifecycleAction;
 import org.elasticsearch.dlm.action.TransportExplainDataLifecycleAction;
+import org.elasticsearch.dlm.action.TransportGetDataLifecycleAction;
+import org.elasticsearch.dlm.action.TransportPutDataLifecycleAction;
+import org.elasticsearch.dlm.rest.RestDeleteDataLifecycleAction;
 import org.elasticsearch.dlm.rest.RestExplainDataLifecycleAction;
+import org.elasticsearch.dlm.rest.RestGetDataLifecycleAction;
+import org.elasticsearch.dlm.rest.RestPutDataLifecycleAction;
 import org.elasticsearch.env.Environment;
 import org.elasticsearch.env.NodeEnvironment;
 import org.elasticsearch.plugins.ActionPlugin;
@@ -134,6 +143,9 @@ public class DataLifecyclePlugin extends Plugin implements ActionPlugin {
         }
 
         List<RestHandler> handlers = new ArrayList<>();
+        handlers.add(new RestPutDataLifecycleAction());
+        handlers.add(new RestGetDataLifecycleAction());
+        handlers.add(new RestDeleteDataLifecycleAction());
         handlers.add(new RestExplainDataLifecycleAction());
         return handlers;
     }
@@ -145,6 +157,9 @@ public class DataLifecyclePlugin extends Plugin implements ActionPlugin {
         }
 
         List<ActionHandler<? extends ActionRequest, ? extends ActionResponse>> actions = new ArrayList<>();
+        actions.add(new ActionHandler<>(PutDataLifecycleAction.INSTANCE, TransportPutDataLifecycleAction.class));
+        actions.add(new ActionHandler<>(GetDataLifecycleAction.INSTANCE, TransportGetDataLifecycleAction.class));
+        actions.add(new ActionHandler<>(DeleteDataLifecycleAction.INSTANCE, TransportDeleteDataLifecycleAction.class));
         actions.add(new ActionHandler<>(ExplainDataLifecycleAction.INSTANCE, TransportExplainDataLifecycleAction.class));
         return actions;
     }

+ 108 - 0
modules/dlm/src/main/java/org/elasticsearch/dlm/action/DeleteDataLifecycleAction.java

@@ -0,0 +1,108 @@
+/*
+ * Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
+ * or more contributor license agreements. Licensed under the Elastic License
+ * 2.0 and the Server Side Public License, v 1; you may not use this file except
+ * in compliance with, at your election, the Elastic License 2.0 or the Server
+ * Side Public License, v 1.
+ */
+
+package org.elasticsearch.dlm.action;
+
+import org.elasticsearch.action.ActionRequestValidationException;
+import org.elasticsearch.action.ActionType;
+import org.elasticsearch.action.IndicesRequest;
+import org.elasticsearch.action.support.IndicesOptions;
+import org.elasticsearch.action.support.master.AcknowledgedRequest;
+import org.elasticsearch.action.support.master.AcknowledgedResponse;
+import org.elasticsearch.common.io.stream.StreamInput;
+import org.elasticsearch.common.io.stream.StreamOutput;
+
+import java.io.IOException;
+import java.util.Arrays;
+import java.util.Objects;
+
+/**
+ * Removes the data lifecycle configuration from the requested data streams.
+ */
+public class DeleteDataLifecycleAction extends ActionType<AcknowledgedResponse> {
+
+    public static final DeleteDataLifecycleAction INSTANCE = new DeleteDataLifecycleAction();
+    public static final String NAME = "indices:admin/data_lifecycle/delete";
+
+    private DeleteDataLifecycleAction() {
+        super(NAME, AcknowledgedResponse::readFrom);
+    }
+
+    public static final class Request extends AcknowledgedRequest<Request> implements IndicesRequest.Replaceable {
+
+        private String[] names;
+        private IndicesOptions indicesOptions = IndicesOptions.fromOptions(false, true, true, true, false, false, true, false);
+
+        public Request(StreamInput in) throws IOException {
+            super(in);
+            this.names = in.readOptionalStringArray();
+            this.indicesOptions = IndicesOptions.readIndicesOptions(in);
+        }
+
+        @Override
+        public void writeTo(StreamOutput out) throws IOException {
+            super.writeTo(out);
+            out.writeOptionalStringArray(names);
+            indicesOptions.writeIndicesOptions(out);
+        }
+
+        public Request(String[] names) {
+            this.names = names;
+        }
+
+        public String[] getNames() {
+            return names;
+        }
+
+        @Override
+        public ActionRequestValidationException validate() {
+            return null;
+        }
+
+        @Override
+        public String[] indices() {
+            return names;
+        }
+
+        @Override
+        public IndicesOptions indicesOptions() {
+            return indicesOptions;
+        }
+
+        public Request indicesOptions(IndicesOptions indicesOptions) {
+            this.indicesOptions = indicesOptions;
+            return this;
+        }
+
+        @Override
+        public boolean includeDataStreams() {
+            return true;
+        }
+
+        @Override
+        public boolean equals(Object o) {
+            if (this == o) return true;
+            if (o == null || getClass() != o.getClass()) return false;
+            Request request = (Request) o;
+            return Arrays.equals(names, request.names) && Objects.equals(indicesOptions, request.indicesOptions);
+        }
+
+        @Override
+        public int hashCode() {
+            int result = Objects.hash(indicesOptions);
+            result = 31 * result + Arrays.hashCode(names);
+            return result;
+        }
+
+        @Override
+        public IndicesRequest indices(String... indices) {
+            this.names = indices;
+            return this;
+        }
+    }
+}

+ 245 - 0
modules/dlm/src/main/java/org/elasticsearch/dlm/action/GetDataLifecycleAction.java

@@ -0,0 +1,245 @@
+/*
+ * Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
+ * or more contributor license agreements. Licensed under the Elastic License
+ * 2.0 and the Server Side Public License, v 1; you may not use this file except
+ * in compliance with, at your election, the Elastic License 2.0 or the Server
+ * Side Public License, v 1.
+ */
+package org.elasticsearch.dlm.action;
+
+import org.elasticsearch.action.ActionRequestValidationException;
+import org.elasticsearch.action.ActionResponse;
+import org.elasticsearch.action.ActionType;
+import org.elasticsearch.action.IndicesRequest;
+import org.elasticsearch.action.admin.indices.rollover.RolloverConditions;
+import org.elasticsearch.action.support.IndicesOptions;
+import org.elasticsearch.action.support.master.MasterNodeReadRequest;
+import org.elasticsearch.cluster.metadata.DataLifecycle;
+import org.elasticsearch.common.collect.Iterators;
+import org.elasticsearch.common.io.stream.StreamInput;
+import org.elasticsearch.common.io.stream.StreamOutput;
+import org.elasticsearch.common.io.stream.Writeable;
+import org.elasticsearch.common.xcontent.ChunkedToXContentObject;
+import org.elasticsearch.core.Nullable;
+import org.elasticsearch.xcontent.ParseField;
+import org.elasticsearch.xcontent.ToXContent;
+import org.elasticsearch.xcontent.ToXContentObject;
+import org.elasticsearch.xcontent.XContentBuilder;
+
+import java.io.IOException;
+import java.util.Arrays;
+import java.util.Iterator;
+import java.util.List;
+import java.util.Objects;
+
+/**
+ * This action retrieves the data lifecycle from every data stream that has a data lifecycle configured.
+ */
+public class GetDataLifecycleAction extends ActionType<GetDataLifecycleAction.Response> {
+
+    public static final GetDataLifecycleAction INSTANCE = new GetDataLifecycleAction();
+    public static final String NAME = "indices:admin/data_lifecycle/get";
+
+    private GetDataLifecycleAction() {
+        super(NAME, Response::new);
+    }
+
+    public static class Request extends MasterNodeReadRequest<Request> implements IndicesRequest.Replaceable {
+
+        private String[] names;
+        private IndicesOptions indicesOptions = IndicesOptions.fromOptions(false, true, true, true, false, false, true, false);
+        private boolean includeDefaults = false;
+
+        public Request(String[] names) {
+            this.names = names;
+        }
+
+        public Request(String[] names, boolean includeDefaults) {
+            this.names = names;
+            this.includeDefaults = includeDefaults;
+        }
+
+        public String[] getNames() {
+            return names;
+        }
+
+        @Override
+        public ActionRequestValidationException validate() {
+            return null;
+        }
+
+        public Request(StreamInput in) throws IOException {
+            super(in);
+            this.names = in.readOptionalStringArray();
+            this.indicesOptions = IndicesOptions.readIndicesOptions(in);
+            this.includeDefaults = in.readBoolean();
+        }
+
+        @Override
+        public void writeTo(StreamOutput out) throws IOException {
+            super.writeTo(out);
+            out.writeOptionalStringArray(names);
+            indicesOptions.writeIndicesOptions(out);
+            out.writeBoolean(includeDefaults);
+        }
+
+        @Override
+        public boolean equals(Object o) {
+            if (this == o) return true;
+            if (o == null || getClass() != o.getClass()) return false;
+            Request request = (Request) o;
+            return Arrays.equals(names, request.names)
+                && indicesOptions.equals(request.indicesOptions)
+                && includeDefaults == request.includeDefaults;
+        }
+
+        @Override
+        public int hashCode() {
+            int result = Objects.hash(indicesOptions, includeDefaults);
+            result = 31 * result + Arrays.hashCode(names);
+            return result;
+        }
+
+        @Override
+        public String[] indices() {
+            return names;
+        }
+
+        @Override
+        public IndicesOptions indicesOptions() {
+            return indicesOptions;
+        }
+
+        public boolean includeDefaults() {
+            return includeDefaults;
+        }
+
+        public Request indicesOptions(IndicesOptions indicesOptions) {
+            this.indicesOptions = indicesOptions;
+            return this;
+        }
+
+        @Override
+        public boolean includeDataStreams() {
+            return true;
+        }
+
+        @Override
+        public IndicesRequest indices(String... indices) {
+            this.names = indices;
+            return this;
+        }
+
+        public Request includeDefaults(boolean includeDefaults) {
+            this.includeDefaults = includeDefaults;
+            return this;
+        }
+    }
+
+    public static class Response extends ActionResponse implements ChunkedToXContentObject {
+        public static final ParseField DATA_STREAMS_FIELD = new ParseField("data_streams");
+
+        public record DataStreamLifecycle(String dataStreamName, DataLifecycle lifecycle) implements Writeable, ToXContentObject {
+
+            public static final ParseField NAME_FIELD = new ParseField("name");
+            public static final ParseField LIFECYCLE_FIELD = new ParseField("lifecycle");
+
+            DataStreamLifecycle(StreamInput in) throws IOException {
+                this(in.readString(), in.readOptionalWriteable(DataLifecycle::new));
+            }
+
+            @Override
+            public void writeTo(StreamOutput out) throws IOException {
+                out.writeString(dataStreamName);
+                out.writeOptionalWriteable(lifecycle);
+            }
+
+            @Override
+            public XContentBuilder toXContent(XContentBuilder builder, Params params) throws IOException {
+                return toXContent(builder, params, null);
+            }
+
+            /**
+             * Converts the response to XContent and passes the RolloverConditions, when provided, to the data lifecycle.
+             */
+            public XContentBuilder toXContent(XContentBuilder builder, Params params, @Nullable RolloverConditions rolloverConditions)
+                throws IOException {
+                builder.startObject();
+                builder.field(NAME_FIELD.getPreferredName(), dataStreamName);
+                builder.field(LIFECYCLE_FIELD.getPreferredName());
+                lifecycle.toXContent(builder, params, rolloverConditions);
+                builder.endObject();
+                return builder;
+            }
+        }
+
+        private final List<DataStreamLifecycle> dataStreamLifecycles;
+        @Nullable
+        private final RolloverConditions rolloverConditions;
+
+        public Response(List<DataStreamLifecycle> dataStreamLifecycles) {
+            this(dataStreamLifecycles, null);
+        }
+
+        public Response(List<DataStreamLifecycle> dataStreamLifecycles, @Nullable RolloverConditions rolloverConditions) {
+            this.dataStreamLifecycles = dataStreamLifecycles;
+            this.rolloverConditions = rolloverConditions;
+        }
+
+        public Response(StreamInput in) throws IOException {
+            this(in.readList(DataStreamLifecycle::new), in.readOptionalWriteable(RolloverConditions::new));
+        }
+
+        public List<DataStreamLifecycle> getDataStreamLifecycles() {
+            return dataStreamLifecycles;
+        }
+
+        @Nullable
+        public RolloverConditions getRolloverConditions() {
+            return rolloverConditions;
+        }
+
+        @Override
+        public void writeTo(StreamOutput out) throws IOException {
+            out.writeList(dataStreamLifecycles);
+            out.writeOptionalWriteable(rolloverConditions);
+        }
+
+        @Override
+        public Iterator<? extends ToXContent> toXContentChunked(ToXContent.Params outerParams) {
+            final Iterator<? extends ToXContent> lifecyclesIterator = dataStreamLifecycles.stream()
+                .map(
+                    dataStreamLifecycle -> (ToXContent) (builder, params) -> dataStreamLifecycle.toXContent(
+                        builder,
+                        params,
+                        rolloverConditions
+                    )
+                )
+                .iterator();
+
+            return Iterators.concat(Iterators.single((builder, params) -> {
+                builder.startObject();
+                builder.startArray(DATA_STREAMS_FIELD.getPreferredName());
+                return builder;
+            }), lifecyclesIterator, Iterators.single((ToXContent) (builder, params) -> {
+                builder.endArray();
+                builder.endObject();
+                return builder;
+            }));
+        }
+
+        @Override
+        public boolean equals(Object o) {
+            if (this == o) return true;
+            if (o == null || getClass() != o.getClass()) return false;
+            Response response = (Response) o;
+            return dataStreamLifecycles.equals(response.dataStreamLifecycles)
+                && Objects.equals(rolloverConditions, response.rolloverConditions);
+        }
+
+        @Override
+        public int hashCode() {
+            return Objects.hash(dataStreamLifecycles, rolloverConditions);
+        }
+    }
+}
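A sketch of fetching lifecycles through this action, again with an illustrative `client` and `logger`; `includeDefaults(true)` additionally makes the response carry the cluster-level default rollover conditions:

    // assumes: Client client, Logger logger (illustrative)
    GetDataLifecycleAction.Request request = new GetDataLifecycleAction.Request(new String[] { "*" }).includeDefaults(true);
    client.execute(GetDataLifecycleAction.INSTANCE, request, ActionListener.wrap(response -> {
        for (GetDataLifecycleAction.Response.DataStreamLifecycle dsl : response.getDataStreamLifecycles()) {
            logger.info("data stream [{}] has lifecycle [{}]", dsl.dataStreamName(), dsl.lifecycle());
        }
    }, e -> logger.error("failed to get lifecycles", e)));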

+ 139 - 0
modules/dlm/src/main/java/org/elasticsearch/dlm/action/PutDataLifecycleAction.java

@@ -0,0 +1,139 @@
+/*
+ * Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
+ * or more contributor license agreements. Licensed under the Elastic License
+ * 2.0 and the Server Side Public License, v 1; you may not use this file except
+ * in compliance with, at your election, the Elastic License 2.0 or the Server
+ * Side Public License, v 1.
+ */
+
+package org.elasticsearch.dlm.action;
+
+import org.elasticsearch.action.ActionRequestValidationException;
+import org.elasticsearch.action.ActionType;
+import org.elasticsearch.action.IndicesRequest;
+import org.elasticsearch.action.support.IndicesOptions;
+import org.elasticsearch.action.support.master.AcknowledgedRequest;
+import org.elasticsearch.action.support.master.AcknowledgedResponse;
+import org.elasticsearch.cluster.metadata.DataLifecycle;
+import org.elasticsearch.common.io.stream.StreamInput;
+import org.elasticsearch.common.io.stream.StreamOutput;
+import org.elasticsearch.xcontent.ConstructingObjectParser;
+import org.elasticsearch.xcontent.ParseField;
+import org.elasticsearch.xcontent.ToXContentObject;
+import org.elasticsearch.xcontent.XContentBuilder;
+
+import java.io.IOException;
+import java.util.Arrays;
+import java.util.Objects;
+
+/**
+ * Sets the data lifecycle provided in the request on the requested data streams.
+ */
+public class PutDataLifecycleAction extends ActionType<AcknowledgedResponse> {
+
+    public static final PutDataLifecycleAction INSTANCE = new PutDataLifecycleAction();
+    public static final String NAME = "indices:admin/data_lifecycle/put";
+
+    private PutDataLifecycleAction() {
+        super(NAME, AcknowledgedResponse::readFrom);
+    }
+
+    public static final class Request extends AcknowledgedRequest<Request> implements IndicesRequest.Replaceable, ToXContentObject {
+
+        private String[] names;
+        private IndicesOptions indicesOptions = IndicesOptions.fromOptions(false, true, true, true, false, false, true, false);
+        private final DataLifecycle lifecycle;
+
+        public Request(StreamInput in) throws IOException {
+            super(in);
+            this.names = in.readStringArray();
+            this.indicesOptions = IndicesOptions.readIndicesOptions(in);
+            lifecycle = new DataLifecycle(in);
+        }
+
+        @Override
+        public void writeTo(StreamOutput out) throws IOException {
+            super.writeTo(out);
+            out.writeStringArray(names);
+            indicesOptions.writeIndicesOptions(out);
+            out.writeWriteable(lifecycle);
+        }
+
+        public Request(String[] names, DataLifecycle lifecycle) {
+            this.names = names;
+            this.lifecycle = lifecycle;
+        }
+
+        public String[] getNames() {
+            return names;
+        }
+
+        public DataLifecycle getLifecycle() {
+            return lifecycle;
+        }
+
+        @Override
+        public XContentBuilder toXContent(XContentBuilder builder, Params params) throws IOException {
+            builder.startObject();
+            builder.field("lifecycle", lifecycle);
+            builder.endObject();
+            return builder;
+        }
+
+        @Override
+        public ActionRequestValidationException validate() {
+            return null;
+        }
+
+        public static final ConstructingObjectParser<Request, Void> PARSER = new ConstructingObjectParser<>(
+            "data_stream_actions",
+            args -> new Request(null, ((DataLifecycle) args[0]))
+        );
+        static {
+            PARSER.declareObject(ConstructingObjectParser.constructorArg(), DataLifecycle.PARSER, new ParseField("lifecycle"));
+        }
+
+        @Override
+        public String[] indices() {
+            return names;
+        }
+
+        @Override
+        public IndicesOptions indicesOptions() {
+            return indicesOptions;
+        }
+
+        public Request indicesOptions(IndicesOptions indicesOptions) {
+            this.indicesOptions = indicesOptions;
+            return this;
+        }
+
+        @Override
+        public boolean includeDataStreams() {
+            return true;
+        }
+
+        @Override
+        public boolean equals(Object o) {
+            if (this == o) return true;
+            if (o == null || getClass() != o.getClass()) return false;
+            Request request = (Request) o;
+            return Arrays.equals(names, request.names)
+                && Objects.equals(indicesOptions, request.indicesOptions)
+                && lifecycle.equals(request.lifecycle);
+        }
+
+        @Override
+        public int hashCode() {
+            int result = Objects.hash(indicesOptions, lifecycle);
+            result = 31 * result + Arrays.hashCode(names);
+            return result;
+        }
+
+        @Override
+        public IndicesRequest indices(String... names) {
+            this.names = names;
+            return this;
+        }
+    }
+}
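Building the put request programmatically looks like the following sketch; the data stream name and 30-day retention are illustrative values:

    // assumes: Client client, Logger logger (illustrative)
    PutDataLifecycleAction.Request request = new PutDataLifecycleAction.Request(
        new String[] { "my-data-stream" },
        new DataLifecycle(TimeValue.timeValueDays(30))
    );
    client.execute(PutDataLifecycleAction.INSTANCE, request, ActionListener.wrap(
        response -> logger.info("lifecycle updated, acknowledged: {}", response.isAcknowledged()),
        e -> logger.error("failed to update lifecycle", e)
    ));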

+ 84 - 0
modules/dlm/src/main/java/org/elasticsearch/dlm/action/TransportDeleteDataLifecycleAction.java

@@ -0,0 +1,84 @@
+/*
+ * Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
+ * or more contributor license agreements. Licensed under the Elastic License
+ * 2.0 and the Server Side Public License, v 1; you may not use this file except
+ * in compliance with, at your election, the Elastic License 2.0 or the Server
+ * Side Public License, v 1.
+ */
+package org.elasticsearch.dlm.action;
+
+import org.elasticsearch.action.ActionListener;
+import org.elasticsearch.action.support.ActionFilters;
+import org.elasticsearch.action.support.master.AcknowledgedResponse;
+import org.elasticsearch.action.support.master.AcknowledgedTransportMasterNodeAction;
+import org.elasticsearch.cluster.ClusterState;
+import org.elasticsearch.cluster.block.ClusterBlockException;
+import org.elasticsearch.cluster.block.ClusterBlockLevel;
+import org.elasticsearch.cluster.metadata.IndexNameExpressionResolver;
+import org.elasticsearch.cluster.metadata.MetadataDataStreamsService;
+import org.elasticsearch.cluster.service.ClusterService;
+import org.elasticsearch.common.inject.Inject;
+import org.elasticsearch.datastreams.action.DataStreamsActionUtil;
+import org.elasticsearch.indices.SystemIndices;
+import org.elasticsearch.tasks.Task;
+import org.elasticsearch.threadpool.ThreadPool;
+import org.elasticsearch.transport.TransportService;
+
+import java.util.List;
+
+/**
+ * Transport action that resolves the data stream names from the request and removes any configured lifecycle from them.
+ */
+public class TransportDeleteDataLifecycleAction extends AcknowledgedTransportMasterNodeAction<DeleteDataLifecycleAction.Request> {
+
+    private final MetadataDataStreamsService metadataDataStreamsService;
+    private final SystemIndices systemIndices;
+
+    @Inject
+    public TransportDeleteDataLifecycleAction(
+        TransportService transportService,
+        ClusterService clusterService,
+        ThreadPool threadPool,
+        ActionFilters actionFilters,
+        IndexNameExpressionResolver indexNameExpressionResolver,
+        MetadataDataStreamsService metadataDataStreamsService,
+        SystemIndices systemIndices
+    ) {
+        super(
+            DeleteDataLifecycleAction.NAME,
+            transportService,
+            clusterService,
+            threadPool,
+            actionFilters,
+            DeleteDataLifecycleAction.Request::new,
+            indexNameExpressionResolver,
+            ThreadPool.Names.SAME
+        );
+        this.metadataDataStreamsService = metadataDataStreamsService;
+        this.systemIndices = systemIndices;
+    }
+
+    @Override
+    protected void masterOperation(
+        Task task,
+        DeleteDataLifecycleAction.Request request,
+        ClusterState state,
+        ActionListener<AcknowledgedResponse> listener
+    ) {
+        List<String> dataStreamNames = DataStreamsActionUtil.getDataStreamNames(
+            indexNameExpressionResolver,
+            state,
+            request.getNames(),
+            request.indicesOptions()
+        );
+        for (String name : dataStreamNames) {
+            systemIndices.validateDataStreamAccess(name, threadPool.getThreadContext());
+        }
+        metadataDataStreamsService.removeLifecycle(dataStreamNames, request.ackTimeout(), request.masterNodeTimeout(), listener);
+    }
+
+    @Override
+    protected ClusterBlockException checkBlock(DeleteDataLifecycleAction.Request request, ClusterState state) {
+        return state.blocks().globalBlockedException(ClusterBlockLevel.METADATA_WRITE);
+    }
+}

+ 101 - 0
modules/dlm/src/main/java/org/elasticsearch/dlm/action/TransportGetDataLifecycleAction.java

@@ -0,0 +1,101 @@
+/*
+ * Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
+ * or more contributor license agreements. Licensed under the Elastic License
+ * 2.0 and the Server Side Public License, v 1; you may not use this file except
+ * in compliance with, at your election, the Elastic License 2.0 or the Server
+ * Side Public License, v 1.
+ */
+package org.elasticsearch.dlm.action;
+
+import org.elasticsearch.action.ActionListener;
+import org.elasticsearch.action.support.ActionFilters;
+import org.elasticsearch.action.support.master.TransportMasterNodeReadAction;
+import org.elasticsearch.cluster.ClusterState;
+import org.elasticsearch.cluster.block.ClusterBlockException;
+import org.elasticsearch.cluster.block.ClusterBlockLevel;
+import org.elasticsearch.cluster.metadata.DataLifecycle;
+import org.elasticsearch.cluster.metadata.DataStream;
+import org.elasticsearch.cluster.metadata.IndexNameExpressionResolver;
+import org.elasticsearch.cluster.service.ClusterService;
+import org.elasticsearch.common.inject.Inject;
+import org.elasticsearch.common.settings.ClusterSettings;
+import org.elasticsearch.datastreams.action.DataStreamsActionUtil;
+import org.elasticsearch.tasks.Task;
+import org.elasticsearch.threadpool.ThreadPool;
+import org.elasticsearch.transport.TransportService;
+
+import java.util.Comparator;
+import java.util.List;
+import java.util.Map;
+
+/**
+ * Collects the data streams from the cluster state, filters out the ones that do not have a lifecycle configured, and returns
+ * a list of data stream names with their respective lifecycle configurations.
+ */
+public class TransportGetDataLifecycleAction extends TransportMasterNodeReadAction<
+    GetDataLifecycleAction.Request,
+    GetDataLifecycleAction.Response> {
+    private final ClusterSettings clusterSettings;
+
+    @Inject
+    public TransportGetDataLifecycleAction(
+        TransportService transportService,
+        ClusterService clusterService,
+        ThreadPool threadPool,
+        ActionFilters actionFilters,
+        IndexNameExpressionResolver indexNameExpressionResolver
+    ) {
+        super(
+            GetDataLifecycleAction.NAME,
+            transportService,
+            clusterService,
+            threadPool,
+            actionFilters,
+            GetDataLifecycleAction.Request::new,
+            indexNameExpressionResolver,
+            GetDataLifecycleAction.Response::new,
+            ThreadPool.Names.SAME
+        );
+        clusterSettings = clusterService.getClusterSettings();
+    }
+
+    @Override
+    protected void masterOperation(
+        Task task,
+        GetDataLifecycleAction.Request request,
+        ClusterState state,
+        ActionListener<GetDataLifecycleAction.Response> listener
+    ) {
+        List<String> results = DataStreamsActionUtil.getDataStreamNames(
+            indexNameExpressionResolver,
+            state,
+            request.getNames(),
+            request.indicesOptions()
+        );
+        Map<String, DataStream> dataStreams = state.metadata().dataStreams();
+
+        listener.onResponse(
+            new GetDataLifecycleAction.Response(
+                results.stream()
+                    .map(dataStreams::get)
+                    .filter(dataStream -> dataStream != null && dataStream.getLifecycle() != null)
+                    .map(
+                        dataStream -> new GetDataLifecycleAction.Response.DataStreamLifecycle(
+                            dataStream.getName(),
+                            dataStream.getLifecycle()
+                        )
+                    )
+                    .sorted(Comparator.comparing(GetDataLifecycleAction.Response.DataStreamLifecycle::dataStreamName))
+                    .toList(),
+                request.includeDefaults() && DataLifecycle.isEnabled()
+                    ? clusterSettings.get(DataLifecycle.CLUSTER_DLM_DEFAULT_ROLLOVER_SETTING)
+                    : null
+            )
+        );
+    }
+
+    @Override
+    protected ClusterBlockException checkBlock(GetDataLifecycleAction.Request request, ClusterState state) {
+        return state.blocks().globalBlockedException(ClusterBlockLevel.METADATA_READ);
+    }
+}

+ 90 - 0
modules/dlm/src/main/java/org/elasticsearch/dlm/action/TransportPutDataLifecycleAction.java

@@ -0,0 +1,90 @@
+/*
+ * Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
+ * or more contributor license agreements. Licensed under the Elastic License
+ * 2.0 and the Server Side Public License, v 1; you may not use this file except
+ * in compliance with, at your election, the Elastic License 2.0 or the Server
+ * Side Public License, v 1.
+ */
+package org.elasticsearch.dlm.action;
+
+import org.elasticsearch.action.ActionListener;
+import org.elasticsearch.action.support.ActionFilters;
+import org.elasticsearch.action.support.master.AcknowledgedResponse;
+import org.elasticsearch.action.support.master.AcknowledgedTransportMasterNodeAction;
+import org.elasticsearch.cluster.ClusterState;
+import org.elasticsearch.cluster.block.ClusterBlockException;
+import org.elasticsearch.cluster.block.ClusterBlockLevel;
+import org.elasticsearch.cluster.metadata.IndexNameExpressionResolver;
+import org.elasticsearch.cluster.metadata.MetadataDataStreamsService;
+import org.elasticsearch.cluster.service.ClusterService;
+import org.elasticsearch.common.inject.Inject;
+import org.elasticsearch.datastreams.action.DataStreamsActionUtil;
+import org.elasticsearch.indices.SystemIndices;
+import org.elasticsearch.tasks.Task;
+import org.elasticsearch.threadpool.ThreadPool;
+import org.elasticsearch.transport.TransportService;
+
+import java.util.List;
+
+/**
+ * Transport action that resolves the data stream names from the request and sets the data lifecycle provided in the request.
+ */
+public class TransportPutDataLifecycleAction extends AcknowledgedTransportMasterNodeAction<PutDataLifecycleAction.Request> {
+
+    private final MetadataDataStreamsService metadataDataStreamsService;
+    private final SystemIndices systemIndices;
+
+    @Inject
+    public TransportPutDataLifecycleAction(
+        TransportService transportService,
+        ClusterService clusterService,
+        ThreadPool threadPool,
+        ActionFilters actionFilters,
+        IndexNameExpressionResolver indexNameExpressionResolver,
+        MetadataDataStreamsService metadataDataStreamsService,
+        SystemIndices systemIndices
+    ) {
+        super(
+            PutDataLifecycleAction.NAME,
+            transportService,
+            clusterService,
+            threadPool,
+            actionFilters,
+            PutDataLifecycleAction.Request::new,
+            indexNameExpressionResolver,
+            ThreadPool.Names.SAME
+        );
+        this.metadataDataStreamsService = metadataDataStreamsService;
+        this.systemIndices = systemIndices;
+    }
+
+    @Override
+    protected void masterOperation(
+        Task task,
+        PutDataLifecycleAction.Request request,
+        ClusterState state,
+        ActionListener<AcknowledgedResponse> listener
+    ) {
+        List<String> dataStreamNames = DataStreamsActionUtil.getDataStreamNames(
+            indexNameExpressionResolver,
+            state,
+            request.getNames(),
+            request.indicesOptions()
+        );
+        for (String name : dataStreamNames) {
+            systemIndices.validateDataStreamAccess(name, threadPool.getThreadContext());
+        }
+        metadataDataStreamsService.setLifecycle(
+            dataStreamNames,
+            request.getLifecycle(),
+            request.ackTimeout(),
+            request.masterNodeTimeout(),
+            listener
+        );
+    }
+
+    @Override
+    protected ClusterBlockException checkBlock(PutDataLifecycleAction.Request request, ClusterState state) {
+        return state.blocks().globalBlockedException(ClusterBlockLevel.METADATA_WRITE);
+    }
+}

+ 49 - 0
modules/dlm/src/main/java/org/elasticsearch/dlm/rest/RestDeleteDataLifecycleAction.java

@@ -0,0 +1,49 @@
+/*
+ * Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
+ * or more contributor license agreements. Licensed under the Elastic License
+ * 2.0 and the Server Side Public License, v 1; you may not use this file except
+ * in compliance with, at your election, the Elastic License 2.0 or the Server
+ * Side Public License, v 1.
+ */
+package org.elasticsearch.dlm.rest;
+
+import org.elasticsearch.action.support.IndicesOptions;
+import org.elasticsearch.client.internal.node.NodeClient;
+import org.elasticsearch.common.Strings;
+import org.elasticsearch.dlm.action.DeleteDataLifecycleAction;
+import org.elasticsearch.rest.BaseRestHandler;
+import org.elasticsearch.rest.RestRequest;
+import org.elasticsearch.rest.Scope;
+import org.elasticsearch.rest.ServerlessScope;
+import org.elasticsearch.rest.action.RestToXContentListener;
+
+import java.util.List;
+
+import static org.elasticsearch.rest.RestRequest.Method.DELETE;
+
+@ServerlessScope(Scope.PUBLIC)
+public class RestDeleteDataLifecycleAction extends BaseRestHandler {
+
+    @Override
+    public String getName() {
+        return "delete_data_lifecycles_action";
+    }
+
+    @Override
+    public List<Route> routes() {
+        return List.of(new Route(DELETE, "/_data_stream/{name}/_lifecycle"));
+    }
+
+    @Override
+    protected RestChannelConsumer prepareRequest(RestRequest request, NodeClient client) {
+        DeleteDataLifecycleAction.Request deleteDataLifecycleRequest = new DeleteDataLifecycleAction.Request(
+            Strings.splitStringByCommaToArray(request.param("name"))
+        );
+        deleteDataLifecycleRequest.indicesOptions(IndicesOptions.fromRequest(request, deleteDataLifecycleRequest.indicesOptions()));
+        return channel -> client.execute(
+            DeleteDataLifecycleAction.INSTANCE,
+            deleteDataLifecycleRequest,
+            new RestToXContentListener<>(channel)
+        );
+    }
+}
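The handler above exposes the delete action over HTTP on the route it registers; an illustrative request (the data stream name is an example):

    DELETE /_data_stream/my-data-stream/_lifecycle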

+ 55 - 0
modules/dlm/src/main/java/org/elasticsearch/dlm/rest/RestGetDataLifecycleAction.java

@@ -0,0 +1,55 @@
+/*
+ * Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
+ * or more contributor license agreements. Licensed under the Elastic License
+ * 2.0 and the Server Side Public License, v 1; you may not use this file except
+ * in compliance with, at your election, the Elastic License 2.0 or the Server
+ * Side Public License, v 1.
+ */
+package org.elasticsearch.dlm.rest;
+
+import org.elasticsearch.action.support.IndicesOptions;
+import org.elasticsearch.client.internal.node.NodeClient;
+import org.elasticsearch.common.Strings;
+import org.elasticsearch.dlm.action.GetDataLifecycleAction;
+import org.elasticsearch.rest.BaseRestHandler;
+import org.elasticsearch.rest.RestRequest;
+import org.elasticsearch.rest.Scope;
+import org.elasticsearch.rest.ServerlessScope;
+import org.elasticsearch.rest.action.RestChunkedToXContentListener;
+
+import java.util.List;
+
+import static org.elasticsearch.rest.RestRequest.Method.GET;
+
+@ServerlessScope(Scope.PUBLIC)
+public class RestGetDataLifecycleAction extends BaseRestHandler {
+
+    @Override
+    public String getName() {
+        return "get_data_lifecycles_action";
+    }
+
+    @Override
+    public List<Route> routes() {
+        return List.of(new Route(GET, "/_data_stream/{name}/_lifecycle"));
+    }
+
+    @Override
+    protected RestChannelConsumer prepareRequest(RestRequest request, NodeClient client) {
+        GetDataLifecycleAction.Request getDataLifecycleRequest = new GetDataLifecycleAction.Request(
+            Strings.splitStringByCommaToArray(request.param("name"))
+        );
+        getDataLifecycleRequest.includeDefaults(request.paramAsBoolean("include_defaults", false));
+        getDataLifecycleRequest.indicesOptions(IndicesOptions.fromRequest(request, getDataLifecycleRequest.indicesOptions()));
+        return channel -> client.execute(
+            GetDataLifecycleAction.INSTANCE,
+            getDataLifecycleRequest,
+            new RestChunkedToXContentListener<>(channel)
+        );
+    }
+
+    @Override
+    public boolean allowSystemIndexAccessByDefault() {
+        return true;
+    }
+}
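An illustrative HTTP request for this handler, including the optional `include_defaults` flag it reads:

    GET /_data_stream/my-data-stream/_lifecycle?include_defaults=true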

+ 55 - 0
modules/dlm/src/main/java/org/elasticsearch/dlm/rest/RestPutDataLifecycleAction.java

@@ -0,0 +1,55 @@
+/*
+ * Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
+ * or more contributor license agreements. Licensed under the Elastic License
+ * 2.0 and the Server Side Public License, v 1; you may not use this file except
+ * in compliance with, at your election, the Elastic License 2.0 or the Server
+ * Side Public License, v 1.
+ */
+package org.elasticsearch.dlm.rest;
+
+import org.elasticsearch.action.support.IndicesOptions;
+import org.elasticsearch.client.internal.node.NodeClient;
+import org.elasticsearch.cluster.metadata.DataLifecycle;
+import org.elasticsearch.common.Strings;
+import org.elasticsearch.dlm.action.PutDataLifecycleAction;
+import org.elasticsearch.rest.BaseRestHandler;
+import org.elasticsearch.rest.RestRequest;
+import org.elasticsearch.rest.Scope;
+import org.elasticsearch.rest.ServerlessScope;
+import org.elasticsearch.rest.action.RestToXContentListener;
+import org.elasticsearch.xcontent.XContentParser;
+
+import java.io.IOException;
+import java.util.List;
+
+import static org.elasticsearch.rest.RestRequest.Method.PUT;
+
+@ServerlessScope(Scope.PUBLIC)
+public class RestPutDataLifecycleAction extends BaseRestHandler {
+
+    @Override
+    public String getName() {
+        return "put_data_lifecycles_action";
+    }
+
+    @Override
+    public List<Route> routes() {
+        return List.of(new Route(PUT, "/_data_stream/{name}/_lifecycle"));
+    }
+
+    @Override
+    protected RestChannelConsumer prepareRequest(RestRequest request, NodeClient client) throws IOException {
+        DataLifecycle lifecycle;
+        try (XContentParser parser = request.contentParser()) {
+            lifecycle = PutDataLifecycleAction.Request.PARSER.parse(parser, null).getLifecycle();
+        }
+        PutDataLifecycleAction.Request putLifecycleRequest = new PutDataLifecycleAction.Request(
+            Strings.splitStringByCommaToArray(request.param("name")),
+            lifecycle
+        );
+        putLifecycleRequest.masterNodeTimeout(request.paramAsTime("master_timeout", putLifecycleRequest.masterNodeTimeout()));
+        putLifecycleRequest.timeout(request.paramAsTime("timeout", putLifecycleRequest.timeout()));
+        putLifecycleRequest.indicesOptions(IndicesOptions.fromRequest(request, putLifecycleRequest.indicesOptions()));
+        return channel -> client.execute(PutDataLifecycleAction.INSTANCE, putLifecycleRequest, new RestToXContentListener<>(channel));
+    }
+}
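The body this handler parses contains a single top-level `lifecycle` object, matching the YAML tests below; the retention value is illustrative:

    PUT /_data_stream/my-data-stream/_lifecycle
    {
      "lifecycle": {
        "data_retention": "30d"
      }
    }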

+ 99 - 0
modules/dlm/src/test/java/org/elasticsearch/dlm/DLMFixtures.java

@@ -0,0 +1,99 @@
+/*
+ * Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
+ * or more contributor license agreements. Licensed under the Elastic License
+ * 2.0 and the Server Side Public License, v 1; you may not use this file except
+ * in compliance with, at your election, the Elastic License 2.0 or the Server
+ * Side Public License, v 1.
+ */
+
+package org.elasticsearch.dlm;
+
+import org.elasticsearch.action.admin.indices.rollover.MaxAgeCondition;
+import org.elasticsearch.action.admin.indices.rollover.RolloverInfo;
+import org.elasticsearch.action.admin.indices.template.put.PutComposableIndexTemplateAction;
+import org.elasticsearch.cluster.metadata.ComposableIndexTemplate;
+import org.elasticsearch.cluster.metadata.DataLifecycle;
+import org.elasticsearch.cluster.metadata.DataStream;
+import org.elasticsearch.cluster.metadata.IndexMetadata;
+import org.elasticsearch.cluster.metadata.Metadata;
+import org.elasticsearch.cluster.metadata.Template;
+import org.elasticsearch.common.compress.CompressedXContent;
+import org.elasticsearch.common.settings.Settings;
+import org.elasticsearch.core.Nullable;
+import org.elasticsearch.core.TimeValue;
+import org.elasticsearch.index.Index;
+
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.List;
+import java.util.Map;
+
+import static org.apache.lucene.tests.util.LuceneTestCase.rarely;
+import static org.elasticsearch.cluster.metadata.DataStreamTestHelper.newInstance;
+import static org.elasticsearch.test.ESIntegTestCase.client;
+import static org.elasticsearch.test.ESTestCase.randomIntBetween;
+import static org.junit.Assert.assertTrue;
+
+/**
+ * Provides helper methods that can be used in tests. Examples of the functionality it provides:
+ * - random lifecycle generation
+ * - putting a composable template
+ * - creating a data stream model
+ */
+public class DLMFixtures {
+
+    static DataStream createDataStream(
+        Metadata.Builder builder,
+        String dataStreamName,
+        int backingIndicesCount,
+        Settings.Builder backingIndicesSettings,
+        @Nullable DataLifecycle lifecycle,
+        Long now
+    ) {
+        final List<Index> backingIndices = new ArrayList<>();
+        for (int k = 1; k <= backingIndicesCount; k++) {
+            IndexMetadata.Builder indexMetaBuilder = IndexMetadata.builder(DataStream.getDefaultBackingIndexName(dataStreamName, k))
+                .settings(backingIndicesSettings)
+                .numberOfShards(1)
+                .numberOfReplicas(1)
+                .creationDate(now - 3000L);
+            if (k < backingIndicesCount) {
+                // add rollover info only for non-write indices
+                MaxAgeCondition rolloverCondition = new MaxAgeCondition(TimeValue.timeValueMillis(now - 2000L));
+                indexMetaBuilder.putRolloverInfo(new RolloverInfo(dataStreamName, List.of(rolloverCondition), now - 2000L));
+            }
+            IndexMetadata indexMetadata = indexMetaBuilder.build();
+            builder.put(indexMetadata, false);
+            backingIndices.add(indexMetadata.getIndex());
+        }
+        return newInstance(dataStreamName, backingIndices, backingIndicesCount, null, false, lifecycle);
+    }
+
+    static void putComposableIndexTemplate(
+        String id,
+        @Nullable String mappings,
+        List<String> patterns,
+        @Nullable Settings settings,
+        @Nullable Map<String, Object> metadata,
+        @Nullable DataLifecycle lifecycle
+    ) throws IOException {
+        PutComposableIndexTemplateAction.Request request = new PutComposableIndexTemplateAction.Request(id);
+        request.indexTemplate(
+            new ComposableIndexTemplate(
+                patterns,
+                new Template(settings, mappings == null ? null : CompressedXContent.fromJSON(mappings), null, lifecycle),
+                null,
+                null,
+                null,
+                metadata,
+                new ComposableIndexTemplate.DataStreamTemplate(),
+                null
+            )
+        );
+        assertTrue(client().execute(PutComposableIndexTemplateAction.INSTANCE, request).actionGet().isAcknowledged());
+    }
+
+    static DataLifecycle randomDataLifecycle() {
+        return rarely() ? new DataLifecycle() : new DataLifecycle(TimeValue.timeValueDays(randomIntBetween(1, 365)));
+    }
+}
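For example, a test can register a data stream template with a lifecycle in one call; the template id, index pattern, and retention below are illustrative:

    // illustrative values; null mappings, settings, and metadata are allowed
    DLMFixtures.putComposableIndexTemplate(
        "my-template",
        null,
        List.of("managed-ds*"),
        null,
        null,
        new DataLifecycle(TimeValue.timeValueDays(30))
    );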

+ 15 - 38
modules/dlm/src/test/java/org/elasticsearch/dlm/DataLifecycleServiceTests.java

@@ -14,8 +14,6 @@ import org.elasticsearch.action.ActionRequest;
 import org.elasticsearch.action.ActionResponse;
 import org.elasticsearch.action.ActionType;
 import org.elasticsearch.action.admin.indices.delete.DeleteIndexRequest;
-import org.elasticsearch.action.admin.indices.rollover.MaxAgeCondition;
-import org.elasticsearch.action.admin.indices.rollover.RolloverInfo;
 import org.elasticsearch.action.admin.indices.rollover.RolloverRequest;
 import org.elasticsearch.cluster.ClusterName;
 import org.elasticsearch.cluster.ClusterState;
@@ -32,7 +30,6 @@ import org.elasticsearch.common.UUIDs;
 import org.elasticsearch.common.settings.ClusterSettings;
 import org.elasticsearch.common.settings.Setting;
 import org.elasticsearch.common.settings.Settings;
-import org.elasticsearch.core.Nullable;
 import org.elasticsearch.core.TimeValue;
 import org.elasticsearch.index.Index;
 import org.elasticsearch.test.ESTestCase;
@@ -47,7 +44,6 @@ import java.io.IOException;
 import java.time.Clock;
 import java.time.Instant;
 import java.time.ZoneId;
-import java.util.ArrayList;
 import java.util.Collections;
 import java.util.HashSet;
 import java.util.List;
@@ -55,7 +51,7 @@ import java.util.Locale;
 import java.util.Set;
 import java.util.concurrent.CopyOnWriteArrayList;
 
-import static org.elasticsearch.cluster.metadata.DataStreamTestHelper.newInstance;
+import static org.elasticsearch.dlm.DLMFixtures.createDataStream;
 import static org.elasticsearch.test.ClusterServiceUtils.createClusterService;
 import static org.elasticsearch.test.ClusterServiceUtils.setState;
 import static org.hamcrest.Matchers.instanceOf;
@@ -115,7 +111,8 @@ public class DataLifecycleServiceTests extends ESTestCase {
             dataStreamName,
             numBackingIndices,
             settings(Version.CURRENT),
-            new DataLifecycle(TimeValue.timeValueMillis(0))
+            new DataLifecycle(TimeValue.timeValueMillis(0)),
+            now
         );
         builder.put(dataStream);
 
@@ -147,7 +144,8 @@ public class DataLifecycleServiceTests extends ESTestCase {
             dataStreamName,
             numBackingIndices,
             settings(Version.CURRENT),
-            new DataLifecycle((TimeValue) null)
+            new DataLifecycle((TimeValue) null),
+            now
         );
         builder.put(dataStream);
 
@@ -166,7 +164,8 @@ public class DataLifecycleServiceTests extends ESTestCase {
             dataStreamName,
             numBackingIndices,
             settings(Version.CURRENT),
-            new DataLifecycle(TimeValue.timeValueDays(700))
+            new DataLifecycle(TimeValue.timeValueDays(700)),
+            now
         );
         builder.put(dataStream);
 
@@ -185,7 +184,8 @@ public class DataLifecycleServiceTests extends ESTestCase {
             dataStreamName,
             numBackingIndices,
             Settings.builder().put(IndexMetadata.LIFECYCLE_NAME, "ILM_policy").put(IndexMetadata.SETTING_VERSION_CREATED, Version.CURRENT),
-            new DataLifecycle(TimeValue.timeValueMillis(0))
+            new DataLifecycle(TimeValue.timeValueMillis(0)),
+            now
         );
         builder.put(dataStream);
 
@@ -203,7 +203,8 @@ public class DataLifecycleServiceTests extends ESTestCase {
             dataStreamName,
             numBackingIndices,
             Settings.builder().put(IndexMetadata.LIFECYCLE_NAME, "ILM_policy").put(IndexMetadata.SETTING_VERSION_CREATED, Version.CURRENT),
-            null
+            null,
+            now
         );
         builder.put(dataStream);
 
@@ -221,7 +222,8 @@ public class DataLifecycleServiceTests extends ESTestCase {
             dataStreamName,
             numBackingIndices,
             Settings.builder().put(IndexMetadata.SETTING_VERSION_CREATED, Version.CURRENT),
-            new DataLifecycle()
+            new DataLifecycle(),
+            now
         );
         builder.put(dataStream);
         String nodeId = "localNode";
@@ -269,7 +271,8 @@ public class DataLifecycleServiceTests extends ESTestCase {
             dataStreamName,
             numBackingIndices,
             settings(Version.CURRENT),
-            new DataLifecycle(TimeValue.timeValueDays(700))
+            new DataLifecycle(TimeValue.timeValueDays(700)),
+            now
         );
         // all backing indices are in the error store
         for (Index index : dataStream.getIndices()) {
@@ -318,32 +321,6 @@ public class DataLifecycleServiceTests extends ESTestCase {
         );
     }
 
-    private DataStream createDataStream(
-        Metadata.Builder builder,
-        String dataStreamName,
-        int backingIndicesCount,
-        Settings.Builder backingIndicesSettings,
-        @Nullable DataLifecycle lifecycle
-    ) {
-        final List<Index> backingIndices = new ArrayList<>();
-        for (int k = 1; k <= backingIndicesCount; k++) {
-            IndexMetadata.Builder indexMetaBuilder = IndexMetadata.builder(DataStream.getDefaultBackingIndexName(dataStreamName, k))
-                .settings(backingIndicesSettings)
-                .numberOfShards(1)
-                .numberOfReplicas(1)
-                .creationDate(now - 3000L);
-            if (k < backingIndicesCount) {
-                // add rollover info only for non-write indices
-                MaxAgeCondition rolloverCondition = new MaxAgeCondition(TimeValue.timeValueMillis(now - 2000L));
-                indexMetaBuilder.putRolloverInfo(new RolloverInfo(dataStreamName, List.of(rolloverCondition), now - 2000L));
-            }
-            IndexMetadata indexMetadata = indexMetaBuilder.build();
-            builder.put(indexMetadata, false);
-            backingIndices.add(indexMetadata.getIndex());
-        }
-        return newInstance(dataStreamName, backingIndices, backingIndicesCount, null, false, lifecycle);
-    }
-
     private NoOpClient getTransportRequestsRecordingClient() {
         return new NoOpClient(getTestName()) {
             @Override

+ 109 - 0
modules/dlm/src/yamlRestTest/resources/rest-api-spec/test/dlm/20_basic.yml

@@ -0,0 +1,109 @@
+setup:
+  - skip:
+      features: allowed_warnings
+      version: " - 8.7.99"
+      reason: "data lifecycles only supported in 8.8+"
+  - do:
+      allowed_warnings:
+        - "index template [my-lifecycle] has index patterns [data-stream-with-lifecycle] matching patterns from existing older templates [global] with patterns (global => [*]); this template [my-lifecycle] will take precedence during new index creation"
+      indices.put_index_template:
+        name: my-lifecycle
+        body:
+          index_patterns: [data-stream-with-lifecycle]
+          template:
+            settings:
+              index.number_of_replicas: 0
+            lifecycle:
+              data_retention: "10d"
+          data_stream: {}
+
+  - do:
+      allowed_warnings:
+        - "index template [my-template1] has index patterns [simple-data-stream1] matching patterns from existing older templates [global] with patterns (global => [*]); this template [my-template1] will take precedence during new index creation"
+      indices.put_index_template:
+        name: my-template1
+        body:
+          index_patterns: [simple-data-stream1]
+          template:
+            mappings:
+              properties:
+                '@timestamp':
+                  type: date_nanos
+          data_stream: {}
+  - do:
+      indices.create_data_stream:
+        name: data-stream-with-lifecycle
+
+  - do:
+      indices.create_data_stream:
+        name: simple-data-stream1
+
+---
+"Get data lifecycle":
+
+  - do:
+      indices.get_data_lifecycle:
+        name: "*"
+  - length: { data_streams: 1}
+  - match: { data_streams.0.name: data-stream-with-lifecycle }
+  - match: { data_streams.0.lifecycle.data_retention: '10d' }
+
+---
+"Put data lifecycle":
+
+  - do:
+      indices.put_data_lifecycle:
+        name: "*"
+        body:
+          lifecycle:
+            data_retention: '30d'
+  - is_true: acknowledged
+
+  - do:
+      indices.get_data_lifecycle:
+        name: "*"
+  - length: { data_streams: 2 }
+  - match: { data_streams.0.name: data-stream-with-lifecycle }
+  - match: { data_streams.0.lifecycle.data_retention: '30d' }
+  - match: { data_streams.1.name: simple-data-stream1 }
+  - match: { data_streams.1.lifecycle.data_retention: '30d' }
+
+
+---
+"Get data lifecycle with defaults":
+
+  - do:
+      indices.get_data_lifecycle:
+        name: "data-stream-with-lifecycle"
+        include_defaults: true
+  - length: { data_streams: 1}
+  - match: { data_streams.0.name: data-stream-with-lifecycle }
+  - match: { data_streams.0.lifecycle.data_retention: '10d' }
+  - is_true: data_streams.0.lifecycle.rollover
+
+---
+"Delete data lifecycle from the data streams":
+  - do:
+      indices.put_data_lifecycle:
+        name: "simple-data-stream1"
+        body:
+          lifecycle:
+            data_retention: '30d'
+  - is_true: acknowledged
+
+  - do:
+      indices.get_data_lifecycle:
+        name: "*"
+  - length: { data_streams: 2 }
+
+  - do:
+      indices.delete_data_lifecycle:
+        name: "simple-data-stream1"
+  - is_true: acknowledged
+
+  - do:
+      indices.get_data_lifecycle:
+        name: "*"
+  - length: { data_streams: 1 }
+  - match: { data_streams.0.name: data-stream-with-lifecycle }
+  - match: { data_streams.0.lifecycle.data_retention: '10d' }

+ 76 - 0
modules/dlm/src/yamlRestTest/resources/rest-api-spec/test/dlm/30_not_found.yml

@@ -0,0 +1,76 @@
+setup:
+  - skip:
+      features: allowed_warnings
+      version: " - 8.7.99"
+      reason: "data lifecycles only supported in 8.8+"
+  - do:
+      allowed_warnings:
+        - "index template [my-lifecycle] has index patterns [my-data-stream-1] matching patterns from existing older templates [global] with patterns (global => [*]); this template [my-lifecycle] will take precedence during new index creation"
+      indices.put_index_template:
+        name: my-lifecycle
+        body:
+          index_patterns: [my-data-stream-1]
+          template:
+            settings:
+              index.number_of_replicas: 0
+            lifecycle:
+              data_retention: "10d"
+          data_stream: {}
+
+  - do:
+      indices.create_data_stream:
+        name: my-data-stream-1
+
+---
+"Get data lifecycle":
+
+  - do:
+      indices.get_data_lifecycle:
+        name: "*"
+  - length: { data_streams: 1}
+  - match: { data_streams.0.name: my-data-stream-1 }
+  - match: { data_streams.0.lifecycle.data_retention: '10d' }
+
+---
+"Get data lifecycle when at least one data stream does not exist":
+
+  - do:
+      catch:  missing
+      indices.get_data_lifecycle:
+        name: "my-data-stream-1,does-not-exist"
+  - match: { error.reason: "no such index [does-not-exist]" }
+
+---
+"Put data lifecycle does not succeed when at lease one data stream does not exist":
+
+  - do:
+      catch:  missing
+      indices.put_data_lifecycle:
+        name: "my-data-stream-1,does-not-exist"
+        body:
+          lifecycle:
+            data_retention: '30d'
+  - match: { error.reason: "no such index [does-not-exist]" }
+
+  - do:
+      indices.get_data_lifecycle:
+        name: "*"
+  - length: { data_streams: 1 }
+  - match: { data_streams.0.name: my-data-stream-1 }
+  - match: { data_streams.0.lifecycle.data_retention: '10d' }
+
+---
+"Delete data lifecycle does not succeed when at lease one data stream does not exist":
+
+  - do:
+      catch:  missing
+      indices.delete_data_lifecycle:
+        name: "my-data-stream-1,does-not-exist"
+  - match: { error.reason: "no such index [does-not-exist]" }
+
+  - do:
+      indices.get_data_lifecycle:
+        name: "*"
+  - length: { data_streams: 1 }
+  - match: { data_streams.0.name: my-data-stream-1 }
+  - match: { data_streams.0.lifecycle.data_retention: '10d' }

+ 57 - 0
rest-api-spec/src/main/resources/rest-api-spec/api/indices.delete_data_lifecycle.json

@@ -0,0 +1,57 @@
+{
+  "indices.delete_data_lifecycle":{
+    "documentation":{
+      "url":"https://www.elastic.co/guide/en/elasticsearch/reference/master/dlm-delete-lifecycle.html",
+      "description":"Deletes the data lifecycle of the selected data streams."
+    },
+    "stability":"stable",
+    "visibility":"public",
+    "headers":{
+      "accept": [ "application/json"]
+    },
+    "url":{
+      "paths":[
+        {
+          "path":"/_data_stream/_lifecycle",
+          "methods":[
+            "DELETE"
+          ]
+        },
+        {
+          "path":"/_data_stream/{name}/_lifecycle",
+          "methods":[
+            "DELETE"
+          ],
+          "parts":{
+            "name":{
+              "type":"list",
+              "description":"A comma-separated list of data streams of which the data lifecycle will be deleted; use `*` to get all data streams"
+            }
+          }
+        }
+      ]
+    },
+    "params": {
+      "expand_wildcards": {
+        "type": "enum",
+        "options": [
+          "open",
+          "closed",
+          "hidden",
+          "none",
+          "all"
+        ],
+        "default": "open",
+        "description": "Whether wildcard expressions should get expanded to open or closed indices (default: open)"
+      },
+      "timeout": {
+        "type": "time",
+        "description": "Explicit timestamp for the document"
+      },
+      "master_timeout": {
+        "type": "time",
+        "description": "Specify timeout for connection to master"
+      }
+    }
+  }
+}

+ 53 - 0
rest-api-spec/src/main/resources/rest-api-spec/api/indices.get_data_lifecycle.json

@@ -0,0 +1,53 @@
+{
+  "indices.get_data_lifecycle":{
+    "documentation":{
+      "url":"https://www.elastic.co/guide/en/elasticsearch/reference/master/dlm-get-lifecycle.html",
+      "description":"Returns the data lifecycle of the selected data streams."
+    },
+    "stability":"stable",
+    "visibility":"public",
+    "headers":{
+      "accept": [ "application/json"]
+    },
+    "url":{
+      "paths":[
+        {
+          "path":"/_data_stream/_lifecycle",
+          "methods":[
+            "GET"
+          ]
+        },
+        {
+          "path":"/_data_stream/{name}/_lifecycle",
+          "methods":[
+            "GET"
+          ],
+          "parts":{
+            "name":{
+              "type":"list",
+              "description":"A comma-separated list of data streams to get; use `*` to get all data streams"
+            }
+          }
+        }
+      ]
+    },
+    "params":{
+      "expand_wildcards":{
+        "type":"enum",
+        "options":[
+          "open",
+          "closed",
+          "hidden",
+          "none",
+          "all"
+        ],
+        "default":"open",
+        "description":"Whether wildcard expressions should get expanded to open or closed indices (default: open)"
+      },
+      "include_defaults":{
+        "type":"boolean",
+        "description":"Return all relevant default configurations for the data stream (default: false)"
+      }
+    }
+  }
+}

+ 61 - 0
rest-api-spec/src/main/resources/rest-api-spec/api/indices.put_data_lifecycle.json

@@ -0,0 +1,61 @@
+{
+  "indices.put_data_lifecycle":{
+    "documentation":{
+      "url":"https://www.elastic.co/guide/en/elasticsearch/reference/master/dlm-put-lifecycle.html",
+      "description":"Updates the data lifecycle of the selected data streams."
+    },
+    "stability":"stable",
+    "visibility":"public",
+    "headers":{
+      "accept": [ "application/json"]
+    },
+    "url":{
+      "paths":[
+        {
+          "path":"/_data_stream/_lifecycle",
+          "methods":[
+            "PUT"
+          ]
+        },
+        {
+          "path":"/_data_stream/{name}/_lifecycle",
+          "methods":[
+            "PUT"
+          ],
+          "parts":{
+            "name":{
+              "type":"list",
+              "description":"A comma-separated list of data streams whose lifecycle will be updated; use `*` to set the lifecycle to all data streams"
+            }
+          }
+        }
+      ]
+    },
+    "params":{
+      "expand_wildcards":{
+        "type":"enum",
+        "options":[
+          "open",
+          "closed",
+          "hidden",
+          "none",
+          "all"
+        ],
+        "default":"open",
+        "description":"Whether wildcard expressions should get expanded to open or closed indices (default: open)"
+      },
+      "timeout":{
+        "type":"time",
+        "description":"Explicit timestamp for the document"
+      },
+      "master_timeout":{
+        "type":"time",
+        "description":"Specify timeout for connection to master"
+      }
+    },
+    "body":{
+      "description":"The data lifecycle configuration that consist of the data retention",
+      "required":false
+    }
+  }
+}

+ 59 - 0
server/src/main/java/org/elasticsearch/cluster/AckedBatchedClusterStateUpdateTask.java

@@ -0,0 +1,59 @@
+/*
+ * Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
+ * or more contributor license agreements. Licensed under the Elastic License
+ * 2.0 and the Server Side Public License, v 1; you may not use this file except
+ * in compliance with, at your election, the Elastic License 2.0 or the Server
+ * Side Public License, v 1.
+ */
+
+package org.elasticsearch.cluster;
+
+import org.elasticsearch.action.ActionListener;
+import org.elasticsearch.action.support.master.AcknowledgedResponse;
+import org.elasticsearch.cluster.node.DiscoveryNode;
+import org.elasticsearch.core.TimeValue;
+
+/**
+ * This class models a cluster state update task that notifies an AcknowledgedResponse listener when
+ * all the nodes have acknowledged the cluster state update request. It works with batched cluster state updates.
+ */
+public class AckedBatchedClusterStateUpdateTask implements ClusterStateTaskListener, ClusterStateAckListener {
+
+    private final ActionListener<AcknowledgedResponse> listener;
+    private final TimeValue ackTimeout;
+
+    public AckedBatchedClusterStateUpdateTask(TimeValue ackTimeout, ActionListener<AcknowledgedResponse> listener) {
+        this.ackTimeout = ackTimeout;
+        this.listener = listener;
+    }
+
+    @Override
+    public void onFailure(Exception e) {
+        listener.onFailure(e);
+    }
+
+    @Override
+    public boolean mustAck(DiscoveryNode discoveryNode) {
+        return true;
+    }
+
+    @Override
+    public void onAllNodesAcked() {
+        listener.onResponse(AcknowledgedResponse.TRUE);
+    }
+
+    @Override
+    public void onAckFailure(Exception e) {
+        listener.onResponse(AcknowledgedResponse.FALSE);
+    }
+
+    @Override
+    public void onAckTimeout() {
+        listener.onResponse(AcknowledgedResponse.FALSE);
+    }
+
+    @Override
+    public TimeValue ackTimeout() {
+        return ackTimeout;
+    }
+}
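A sketch of wiring such a task into a batched queue, mirroring how MetadataDataStreamsService uses it below; the queue name, priority, no-op state update, and the `clusterService` and `ActionListener<AcknowledgedResponse> listener` in scope are all illustrative assumptions:

    // assumes: ClusterService clusterService, ActionListener<AcknowledgedResponse> listener (illustrative)
    var executor = new SimpleBatchedAckListenerTaskExecutor<AckedBatchedClusterStateUpdateTask>() {
        @Override
        public Tuple<ClusterState, ClusterStateAckListener> executeTask(AckedBatchedClusterStateUpdateTask task, ClusterState state) {
            return new Tuple<>(state, task); // a real executor would compute and return an updated state
        }
    };
    var queue = clusterService.createTaskQueue("example-queue", Priority.NORMAL, executor);
    queue.submitTask("example-update", new AckedBatchedClusterStateUpdateTask(TimeValue.timeValueSeconds(30), listener), null);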

+ 1 - 1
server/src/main/java/org/elasticsearch/cluster/metadata/DataLifecycle.java

@@ -49,7 +49,7 @@ public class DataLifecycle implements SimpleDiffable<DataLifecycle>, ToXContentO
     private static final ParseField DATA_RETENTION_FIELD = new ParseField("data_retention");
     private static final ParseField ROLLOVER_FIELD = new ParseField("rollover");
 
-    private static final ConstructingObjectParser<DataLifecycle, Void> PARSER = new ConstructingObjectParser<>(
+    public static final ConstructingObjectParser<DataLifecycle, Void> PARSER = new ConstructingObjectParser<>(
         "lifecycle",
         false,
         (args, unused) -> new DataLifecycle((TimeValue) args[0])

+ 109 - 0
server/src/main/java/org/elasticsearch/cluster/metadata/MetadataDataStreamsService.java

@@ -11,18 +11,27 @@ package org.elasticsearch.cluster.metadata;
 import org.elasticsearch.action.ActionListener;
 import org.elasticsearch.action.datastreams.ModifyDataStreamsAction;
 import org.elasticsearch.action.support.master.AcknowledgedResponse;
+import org.elasticsearch.cluster.AckedBatchedClusterStateUpdateTask;
 import org.elasticsearch.cluster.AckedClusterStateUpdateTask;
 import org.elasticsearch.cluster.ClusterState;
+import org.elasticsearch.cluster.ClusterStateAckListener;
+import org.elasticsearch.cluster.ClusterStateTaskExecutor;
 import org.elasticsearch.cluster.ClusterStateUpdateTask;
+import org.elasticsearch.cluster.SimpleBatchedAckListenerTaskExecutor;
 import org.elasticsearch.cluster.service.ClusterService;
+import org.elasticsearch.cluster.service.MasterServiceTaskQueue;
 import org.elasticsearch.common.Priority;
 import org.elasticsearch.common.settings.Settings;
+import org.elasticsearch.core.Nullable;
 import org.elasticsearch.core.SuppressForbidden;
+import org.elasticsearch.core.TimeValue;
+import org.elasticsearch.core.Tuple;
 import org.elasticsearch.index.Index;
 import org.elasticsearch.index.mapper.MapperService;
 import org.elasticsearch.indices.IndicesService;
 
 import java.io.IOException;
+import java.util.List;
 import java.util.function.Function;
 
 /**
@@ -32,10 +41,27 @@ public class MetadataDataStreamsService {
 
     private final ClusterService clusterService;
     private final IndicesService indicesService;
+    private final MasterServiceTaskQueue<UpdateLifecycleTask> taskQueue;
 
     public MetadataDataStreamsService(ClusterService clusterService, IndicesService indicesService) {
         this.clusterService = clusterService;
         this.indicesService = indicesService;
+        ClusterStateTaskExecutor<UpdateLifecycleTask> executor = new SimpleBatchedAckListenerTaskExecutor<>() {
+
+            @Override
+            public Tuple<ClusterState, ClusterStateAckListener> executeTask(
+                UpdateLifecycleTask modifyLifecycleTask,
+                ClusterState clusterState
+            ) {
+                return new Tuple<>(
+                    updateDataLifecycle(clusterState, modifyLifecycleTask.getDataStreamNames(), modifyLifecycleTask.getDataLifecycle()),
+                    modifyLifecycleTask
+                );
+            }
+        };
+        // We use the HIGH priority because a lifecycle change alters the retention of the backing indices, so processing it quickly
+        // can either free up space sooner when the retention is shortened, or prevent an index from being deleted when it is extended.
+        this.taskQueue = clusterService.createTaskQueue("modify-lifecycle", Priority.HIGH, executor);
     }
 
     public void modifyDataStream(final ModifyDataStreamsAction.Request request, final ActionListener<AcknowledgedResponse> listener) {
@@ -57,6 +83,31 @@ public class MetadataDataStreamsService {
         }
     }
 
+    /**
+     * Submits a cluster state update task that sets the provided lifecycle on the requested data streams.
+     */
+    public void setLifecycle(
+        final List<String> dataStreamNames,
+        DataLifecycle lifecycle,
+        TimeValue ackTimeout,
+        TimeValue masterTimeout,
+        final ActionListener<AcknowledgedResponse> listener
+    ) {
+        taskQueue.submitTask("set-lifecycle", new UpdateLifecycleTask(dataStreamNames, lifecycle, ackTimeout, listener), masterTimeout);
+    }
+
+    /**
+     * Submits a cluster state update task that removes the lifecycle from the requested data streams.
+     */
+    public void removeLifecycle(
+        List<String> dataStreamNames,
+        TimeValue ackTimeout,
+        TimeValue masterTimeout,
+        ActionListener<AcknowledgedResponse> listener
+    ) {
+        taskQueue.submitTask("delete-lifecycle", new UpdateLifecycleTask(dataStreamNames, null, ackTimeout, listener), masterTimeout);
+    }
+
     @SuppressForbidden(reason = "legacy usage of unbatched task") // TODO add support for batching here
     private void submitUnbatchedTask(@SuppressWarnings("SameParameterValue") String source, ClusterStateUpdateTask task) {
         clusterService.submitUnbatchedStateUpdateTask(source, task);
@@ -91,6 +142,37 @@ public class MetadataDataStreamsService {
         return ClusterState.builder(currentState).metadata(updatedMetadata).build();
     }
 
+    /**
+     * Creates an updated cluster state in which the requested data streams carry the provided data lifecycle;
+     * passing a null lifecycle removes any existing one.
+     * Visible for testing.
+     */
+    static ClusterState updateDataLifecycle(
+        ClusterState currentState,
+        List<String> dataStreamNames,
+        @Nullable DataLifecycle dataLifecycle
+    ) {
+        Metadata metadata = currentState.metadata();
+        Metadata.Builder builder = Metadata.builder(metadata);
+        for (var dataStreamName : dataStreamNames) {
+            var dataStream = validateDataStream(metadata, dataStreamName);
+            builder.put(
+                new DataStream(
+                    dataStream.getName(),
+                    dataStream.getIndices(),
+                    dataStream.getGeneration(),
+                    dataStream.getMetadata(),
+                    dataStream.isHidden(),
+                    dataStream.isReplicated(),
+                    dataStream.isSystem(),
+                    dataStream.isAllowCustomRouting(),
+                    dataStream.getIndexMode(),
+                    dataLifecycle
+                )
+            );
+        }
+        return ClusterState.builder(currentState).metadata(builder.build()).build();
+    }
+
     private static void addBackingIndex(
         Metadata metadata,
         Metadata.Builder builder,
@@ -159,4 +241,31 @@ public class MetadataDataStreamsService {
         return index;
     }
 
+    /**
+     * A cluster state update task that bundles the lifecycle update request with the listener to be notified upon completion.
+     */
+    static class UpdateLifecycleTask extends AckedBatchedClusterStateUpdateTask {
+
+        private final List<String> dataStreamNames;
+    @Nullable
+    private final DataLifecycle dataLifecycle;
+
+        UpdateLifecycleTask(
+            List<String> dataStreamNames,
+            @Nullable DataLifecycle dataLifecycle,
+            TimeValue ackTimeout,
+            ActionListener<AcknowledgedResponse> listener
+        ) {
+            super(ackTimeout, listener);
+            this.dataStreamNames = dataStreamNames;
+            this.dataLifecycle = dataLifecycle;
+        }
+
+        public List<String> getDataStreamNames() {
+            return dataStreamNames;
+        }
+
+        public DataLifecycle getDataLifecycle() {
+            return dataLifecycle;
+        }
+    }
 }
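
The new transport actions can then delegate to these methods directly. A sketch of a caller, assuming metadataDataStreamsService and logger are available and using placeholder timeouts and names:

    // Set a 30-day retention lifecycle on one data stream and react to the acknowledgement.
    metadataDataStreamsService.setLifecycle(
        List.of("my-data-stream"),
        new DataLifecycle(TimeValue.timeValueDays(30)),
        TimeValue.timeValueSeconds(30),  // ack timeout
        TimeValue.timeValueSeconds(30),  // master node timeout
        ActionListener.wrap(
            response -> logger.info("acknowledged: {}", response.isAcknowledged()),
            e -> logger.error("failed to set lifecycle", e)
        )
    );

    // Removing a lifecycle submits the same UpdateLifecycleTask with a null lifecycle.
    metadataDataStreamsService.removeLifecycle(
        List.of("my-data-stream"),
        TimeValue.timeValueSeconds(30),
        TimeValue.timeValueSeconds(30),
        ActionListener.noop()
    );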

+ 23 - 0
server/src/test/java/org/elasticsearch/cluster/metadata/MetadataDataStreamsServiceTests.java

@@ -29,6 +29,7 @@ import static org.hamcrest.Matchers.containsString;
 import static org.hamcrest.Matchers.equalTo;
 import static org.hamcrest.Matchers.hasSize;
 import static org.hamcrest.Matchers.notNullValue;
+import static org.hamcrest.Matchers.nullValue;
 
 public class MetadataDataStreamsServiceTests extends MapperServiceTestCase {
 
@@ -382,6 +383,28 @@ public class MetadataDataStreamsServiceTests extends MapperServiceTestCase {
         assertThat(e.getMessage(), equalTo("index [" + indexToRemove + "] not found"));
     }
 
+    public void testUpdateLifecycle() {
+        String dataStream = randomAlphaOfLength(5);
+        DataLifecycle dataLifecycle = new DataLifecycle(randomMillisUpToYear9999());
+        ClusterState before = DataStreamTestHelper.getClusterStateWithDataStreams(List.of(new Tuple<>(dataStream, 2)), List.of());
+        {
+            // Remove lifecycle
+            ClusterState after = MetadataDataStreamsService.updateDataLifecycle(before, List.of(dataStream), null);
+            DataStream updatedDataStream = after.metadata().dataStreams().get(dataStream);
+            assertNotNull(updatedDataStream);
+            assertThat(updatedDataStream.getLifecycle(), nullValue());
+            before = after;
+        }
+
+        {
+            // Set lifecycle
+            ClusterState after = MetadataDataStreamsService.updateDataLifecycle(before, List.of(dataStream), dataLifecycle);
+            DataStream updatedDataStream = after.metadata().dataStreams().get(dataStream);
+            assertNotNull(updatedDataStream);
+            assertThat(updatedDataStream.getLifecycle(), equalTo(dataLifecycle));
+        }
+    }
+
     private MapperService getMapperService(IndexMetadata im) {
         try {
             String mapping = im.mapping().source().toString();

+ 3 - 0
x-pack/plugin/security/qa/operator-privileges-tests/src/javaRestTest/java/org/elasticsearch/xpack/security/operator/Constants.java

@@ -390,6 +390,9 @@ public class Constants {
         "indices:admin/data_stream/migrate",
         "indices:admin/data_stream/modify",
         "indices:admin/data_stream/promote",
+        "indices:admin/data_lifecycle/delete",
+        "indices:admin/data_lifecycle/get",
+        "indices:admin/data_lifecycle/put",
         "indices:admin/delete",
         "indices:admin/dlm/explain",
         "indices:admin/flush",