Browse Source

Introduce CRUD APIs for data stream options (#113945) (#114718)

In this PR we introduce two endpoint PUT and GET to manage the data
stream options and consequently the failure store configuration on the
data stream level. This means that we can manage the failure store of
existing data streams.

The APIs look like:

```
# Enable/disable 
PUT _data_stream/my-data-stream/_options
{
  "failure_store": {
    "enabled": true
  }
}

# Remove existing configuration
DELETE _data_stream/my-data-stream/_options

# Retrieve 
GET _data_stream/my-data-stream/_options
{
  "failure_store": {
    "enabled": true
  }
}
```

Future work:

- Document the new APIs
- Convert `DataStreamOptionsIT.java` to a yaml test.
Mary Gouseti 1 year ago
parent
commit
04bc70c84c
16 changed files with 1253 additions and 1 deletions
  1. 144 0
      modules/data-streams/src/javaRestTest/java/org/elasticsearch/datastreams/DataStreamOptionsIT.java
  2. 1 0
      modules/data-streams/src/main/java/module-info.java
  3. 20 0
      modules/data-streams/src/main/java/org/elasticsearch/datastreams/DataStreamsPlugin.java
  4. 108 0
      modules/data-streams/src/main/java/org/elasticsearch/datastreams/options/action/DeleteDataStreamOptionsAction.java
  5. 223 0
      modules/data-streams/src/main/java/org/elasticsearch/datastreams/options/action/GetDataStreamOptionsAction.java
  6. 165 0
      modules/data-streams/src/main/java/org/elasticsearch/datastreams/options/action/PutDataStreamOptionsAction.java
  7. 86 0
      modules/data-streams/src/main/java/org/elasticsearch/datastreams/options/action/TransportDeleteDataStreamOptionsAction.java
  8. 104 0
      modules/data-streams/src/main/java/org/elasticsearch/datastreams/options/action/TransportGetDataStreamOptionsAction.java
  9. 92 0
      modules/data-streams/src/main/java/org/elasticsearch/datastreams/options/action/TransportPutDataStreamOptionsAction.java
  10. 54 0
      modules/data-streams/src/main/java/org/elasticsearch/datastreams/options/rest/RestDeleteDataStreamOptionsAction.java
  11. 58 0
      modules/data-streams/src/main/java/org/elasticsearch/datastreams/options/rest/RestGetDataStreamOptionsAction.java
  12. 58 0
      modules/data-streams/src/main/java/org/elasticsearch/datastreams/options/rest/RestPutDataStreamOptionsAction.java
  13. 94 0
      server/src/main/java/org/elasticsearch/cluster/metadata/MetadataDataStreamsService.java
  14. 10 1
      server/src/test/java/org/elasticsearch/cluster/metadata/DataStreamOptionsTests.java
  15. 33 0
      server/src/test/java/org/elasticsearch/cluster/metadata/MetadataDataStreamsServiceTests.java
  16. 3 0
      x-pack/plugin/security/qa/operator-privileges-tests/src/javaRestTest/java/org/elasticsearch/xpack/security/operator/Constants.java

+ 144 - 0
modules/data-streams/src/javaRestTest/java/org/elasticsearch/datastreams/DataStreamOptionsIT.java

@@ -0,0 +1,144 @@
+/*
+ * Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
+ * or more contributor license agreements. Licensed under the "Elastic License
+ * 2.0", the "GNU Affero General Public License v3.0 only", and the "Server Side
+ * Public License v 1"; you may not use this file except in compliance with, at
+ * your election, the "Elastic License 2.0", the "GNU Affero General Public
+ * License v3.0 only", or the "Server Side Public License, v 1".
+ */
+
+package org.elasticsearch.datastreams;
+
+import org.elasticsearch.client.Request;
+import org.elasticsearch.client.Response;
+import org.junit.Before;
+
+import java.io.IOException;
+import java.util.List;
+import java.util.Map;
+
+import static org.hamcrest.Matchers.equalTo;
+import static org.hamcrest.Matchers.is;
+import static org.hamcrest.Matchers.nullValue;
+
+/**
+ * This should be a yaml test, but in order to write one we would need to expose the new APIs in the rest-api-spec.
+ * We do not want to do that until the feature flag is removed. For this reason, we temporarily, test the new APIs here.
+ * Please convert this to a yaml test when the feature flag is removed.
+ */
+public class DataStreamOptionsIT extends DisabledSecurityDataStreamTestCase {
+
+    private static final String DATA_STREAM_NAME = "failure-data-stream";
+
+    @SuppressWarnings("unchecked")
+    @Before
+    public void setup() throws IOException {
+        Request putComposableIndexTemplateRequest = new Request("POST", "/_index_template/ds-template");
+        putComposableIndexTemplateRequest.setJsonEntity("""
+            {
+              "index_patterns": ["failure-data-stream"],
+              "template": {
+                "settings": {
+                  "number_of_replicas": 0
+                }
+              },
+              "data_stream": {
+                "failure_store": true
+              }
+            }
+            """);
+        assertOK(client().performRequest(putComposableIndexTemplateRequest));
+
+        assertOK(client().performRequest(new Request("PUT", "/_data_stream/" + DATA_STREAM_NAME)));
+        // Initialize the failure store.
+        assertOK(client().performRequest(new Request("POST", DATA_STREAM_NAME + "/_rollover?target_failure_store")));
+        ensureGreen(DATA_STREAM_NAME);
+
+        final Response dataStreamResponse = client().performRequest(new Request("GET", "/_data_stream/" + DATA_STREAM_NAME));
+        List<Object> dataStreams = (List<Object>) entityAsMap(dataStreamResponse).get("data_streams");
+        assertThat(dataStreams.size(), is(1));
+        Map<String, Object> dataStream = (Map<String, Object>) dataStreams.get(0);
+        assertThat(dataStream.get("name"), equalTo(DATA_STREAM_NAME));
+        List<String> backingIndices = getIndices(dataStream);
+        assertThat(backingIndices.size(), is(1));
+        List<String> failureStore = getFailureStore(dataStream);
+        assertThat(failureStore.size(), is(1));
+    }
+
+    public void testEnableDisableFailureStore() throws IOException {
+        {
+            assertAcknowledged(client().performRequest(new Request("DELETE", "/_data_stream/" + DATA_STREAM_NAME + "/_options")));
+            assertFailureStore(false, 1);
+            assertDataStreamOptions(null);
+        }
+        {
+            Request enableRequest = new Request("PUT", "/_data_stream/" + DATA_STREAM_NAME + "/_options");
+            enableRequest.setJsonEntity("""
+                {
+                  "failure_store": {
+                    "enabled": true
+                  }
+                }""");
+            assertAcknowledged(client().performRequest(enableRequest));
+            assertFailureStore(true, 1);
+            assertDataStreamOptions(true);
+        }
+
+        {
+            Request disableRequest = new Request("PUT", "/_data_stream/" + DATA_STREAM_NAME + "/_options");
+            disableRequest.setJsonEntity("""
+                {
+                  "failure_store": {
+                    "enabled": false
+                  }
+                }""");
+            assertAcknowledged(client().performRequest(disableRequest));
+            assertFailureStore(false, 1);
+            assertDataStreamOptions(false);
+        }
+    }
+
+    @SuppressWarnings("unchecked")
+    private void assertFailureStore(boolean failureStoreEnabled, int failureStoreSize) throws IOException {
+        final Response dataStreamResponse = client().performRequest(new Request("GET", "/_data_stream/" + DATA_STREAM_NAME));
+        List<Object> dataStreams = (List<Object>) entityAsMap(dataStreamResponse).get("data_streams");
+        assertThat(dataStreams.size(), is(1));
+        Map<String, Object> dataStream = (Map<String, Object>) dataStreams.get(0);
+        assertThat(dataStream.get("name"), equalTo(DATA_STREAM_NAME));
+        assertThat(dataStream.containsKey("failure_store"), is(true));
+        // Ensure the failure store is set to the provided value
+        assertThat(((Map<String, Object>) dataStream.get("failure_store")).get("enabled"), equalTo(failureStoreEnabled));
+        // And the failure indices preserved
+        List<String> failureStore = getFailureStore(dataStream);
+        assertThat(failureStore.size(), is(failureStoreSize));
+    }
+
+    @SuppressWarnings("unchecked")
+    private void assertDataStreamOptions(Boolean failureStoreEnabled) throws IOException {
+        final Response dataStreamResponse = client().performRequest(new Request("GET", "/_data_stream/" + DATA_STREAM_NAME + "/_options"));
+        List<Object> dataStreams = (List<Object>) entityAsMap(dataStreamResponse).get("data_streams");
+        assertThat(dataStreams.size(), is(1));
+        Map<String, Object> dataStream = (Map<String, Object>) dataStreams.get(0);
+        assertThat(dataStream.get("name"), equalTo(DATA_STREAM_NAME));
+        Map<String, Map<String, Object>> options = (Map<String, Map<String, Object>>) dataStream.get("options");
+        if (failureStoreEnabled == null) {
+            assertThat(options, nullValue());
+        } else {
+            assertThat(options.containsKey("failure_store"), is(true));
+            assertThat(options.get("failure_store").get("enabled"), equalTo(failureStoreEnabled));
+        }
+    }
+
+    @SuppressWarnings("unchecked")
+    private List<String> getFailureStore(Map<String, Object> response) {
+        var failureStore = (Map<String, Object>) response.get("failure_store");
+        return getIndices(failureStore);
+
+    }
+
+    @SuppressWarnings("unchecked")
+    private List<String> getIndices(Map<String, Object> response) {
+        List<Map<String, String>> indices = (List<Map<String, String>>) response.get("indices");
+        return indices.stream().map(index -> index.get("index_name")).toList();
+    }
+}

+ 1 - 0
modules/data-streams/src/main/java/module-info.java

@@ -17,6 +17,7 @@ module org.elasticsearch.datastreams {
     exports org.elasticsearch.datastreams.action to org.elasticsearch.server;
     exports org.elasticsearch.datastreams.lifecycle.action to org.elasticsearch.server;
     exports org.elasticsearch.datastreams.lifecycle;
+    exports org.elasticsearch.datastreams.options.action to org.elasticsearch.server;
 
     provides org.elasticsearch.features.FeatureSpecification with org.elasticsearch.datastreams.DataStreamFeatures;
 }

+ 20 - 0
modules/data-streams/src/main/java/org/elasticsearch/datastreams/DataStreamsPlugin.java

@@ -23,6 +23,7 @@ import org.elasticsearch.action.datastreams.lifecycle.ExplainDataStreamLifecycle
 import org.elasticsearch.action.datastreams.lifecycle.GetDataStreamLifecycleAction;
 import org.elasticsearch.action.datastreams.lifecycle.PutDataStreamLifecycleAction;
 import org.elasticsearch.client.internal.OriginSettingClient;
+import org.elasticsearch.cluster.metadata.DataStream;
 import org.elasticsearch.cluster.metadata.IndexNameExpressionResolver;
 import org.elasticsearch.cluster.node.DiscoveryNodes;
 import org.elasticsearch.common.io.stream.NamedWriteableRegistry;
@@ -56,6 +57,15 @@ import org.elasticsearch.datastreams.lifecycle.rest.RestDeleteDataStreamLifecycl
 import org.elasticsearch.datastreams.lifecycle.rest.RestExplainDataStreamLifecycleAction;
 import org.elasticsearch.datastreams.lifecycle.rest.RestGetDataStreamLifecycleAction;
 import org.elasticsearch.datastreams.lifecycle.rest.RestPutDataStreamLifecycleAction;
+import org.elasticsearch.datastreams.options.action.DeleteDataStreamOptionsAction;
+import org.elasticsearch.datastreams.options.action.GetDataStreamOptionsAction;
+import org.elasticsearch.datastreams.options.action.PutDataStreamOptionsAction;
+import org.elasticsearch.datastreams.options.action.TransportDeleteDataStreamOptionsAction;
+import org.elasticsearch.datastreams.options.action.TransportGetDataStreamOptionsAction;
+import org.elasticsearch.datastreams.options.action.TransportPutDataStreamOptionsAction;
+import org.elasticsearch.datastreams.options.rest.RestDeleteDataStreamOptionsAction;
+import org.elasticsearch.datastreams.options.rest.RestGetDataStreamOptionsAction;
+import org.elasticsearch.datastreams.options.rest.RestPutDataStreamOptionsAction;
 import org.elasticsearch.datastreams.rest.RestCreateDataStreamAction;
 import org.elasticsearch.datastreams.rest.RestDataStreamsStatsAction;
 import org.elasticsearch.datastreams.rest.RestDeleteDataStreamAction;
@@ -229,6 +239,11 @@ public class DataStreamsPlugin extends Plugin implements ActionPlugin, HealthPlu
         actions.add(new ActionHandler<>(DeleteDataStreamLifecycleAction.INSTANCE, TransportDeleteDataStreamLifecycleAction.class));
         actions.add(new ActionHandler<>(ExplainDataStreamLifecycleAction.INSTANCE, TransportExplainDataStreamLifecycleAction.class));
         actions.add(new ActionHandler<>(GetDataStreamLifecycleStatsAction.INSTANCE, TransportGetDataStreamLifecycleStatsAction.class));
+        if (DataStream.isFailureStoreFeatureFlagEnabled()) {
+            actions.add(new ActionHandler<>(GetDataStreamOptionsAction.INSTANCE, TransportGetDataStreamOptionsAction.class));
+            actions.add(new ActionHandler<>(PutDataStreamOptionsAction.INSTANCE, TransportPutDataStreamOptionsAction.class));
+            actions.add(new ActionHandler<>(DeleteDataStreamOptionsAction.INSTANCE, TransportDeleteDataStreamOptionsAction.class));
+        }
         return actions;
     }
 
@@ -261,6 +276,11 @@ public class DataStreamsPlugin extends Plugin implements ActionPlugin, HealthPlu
         handlers.add(new RestDeleteDataStreamLifecycleAction());
         handlers.add(new RestExplainDataStreamLifecycleAction());
         handlers.add(new RestDataStreamLifecycleStatsAction());
+        if (DataStream.isFailureStoreFeatureFlagEnabled()) {
+            handlers.add(new RestGetDataStreamOptionsAction());
+            handlers.add(new RestPutDataStreamOptionsAction());
+            handlers.add(new RestDeleteDataStreamOptionsAction());
+        }
         return handlers;
     }
 

+ 108 - 0
modules/data-streams/src/main/java/org/elasticsearch/datastreams/options/action/DeleteDataStreamOptionsAction.java

@@ -0,0 +1,108 @@
+/*
+ * Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
+ * or more contributor license agreements. Licensed under the "Elastic License
+ * 2.0", the "GNU Affero General Public License v3.0 only", and the "Server Side
+ * Public License v 1"; you may not use this file except in compliance with, at
+ * your election, the "Elastic License 2.0", the "GNU Affero General Public
+ * License v3.0 only", or the "Server Side Public License, v 1".
+ */
+
+package org.elasticsearch.datastreams.options.action;
+
+import org.elasticsearch.action.ActionType;
+import org.elasticsearch.action.IndicesRequest;
+import org.elasticsearch.action.support.IndicesOptions;
+import org.elasticsearch.action.support.master.AcknowledgedRequest;
+import org.elasticsearch.action.support.master.AcknowledgedResponse;
+import org.elasticsearch.common.io.stream.StreamInput;
+import org.elasticsearch.common.io.stream.StreamOutput;
+import org.elasticsearch.core.TimeValue;
+
+import java.io.IOException;
+import java.util.Arrays;
+import java.util.Objects;
+
+/**
+ * Removes the data stream options configuration from the requested data streams.
+ */
+public class DeleteDataStreamOptionsAction {
+
+    public static final ActionType<AcknowledgedResponse> INSTANCE = new ActionType<>("indices:admin/data_stream/options/delete");
+
+    private DeleteDataStreamOptionsAction() {/* no instances */}
+
+    public static final class Request extends AcknowledgedRequest<Request> implements IndicesRequest.Replaceable {
+
+        private String[] names;
+        private IndicesOptions indicesOptions = IndicesOptions.builder()
+            .concreteTargetOptions(IndicesOptions.ConcreteTargetOptions.ERROR_WHEN_UNAVAILABLE_TARGETS)
+            .wildcardOptions(
+                IndicesOptions.WildcardOptions.builder().matchOpen(true).matchClosed(true).allowEmptyExpressions(true).resolveAliases(false)
+            )
+            .gatekeeperOptions(IndicesOptions.GatekeeperOptions.builder().allowAliasToMultipleIndices(false).allowClosedIndices(true))
+            .build();
+
+        public Request(StreamInput in) throws IOException {
+            super(in);
+            this.names = in.readOptionalStringArray();
+            this.indicesOptions = IndicesOptions.readIndicesOptions(in);
+        }
+
+        @Override
+        public void writeTo(StreamOutput out) throws IOException {
+            super.writeTo(out);
+            out.writeOptionalStringArray(names);
+            indicesOptions.writeIndicesOptions(out);
+        }
+
+        public Request(TimeValue masterNodeTimeout, TimeValue ackTimeout, String[] names) {
+            super(masterNodeTimeout, ackTimeout);
+            this.names = names;
+        }
+
+        public String[] getNames() {
+            return names;
+        }
+
+        @Override
+        public String[] indices() {
+            return names;
+        }
+
+        @Override
+        public IndicesOptions indicesOptions() {
+            return indicesOptions;
+        }
+
+        public Request indicesOptions(IndicesOptions indicesOptions) {
+            this.indicesOptions = indicesOptions;
+            return this;
+        }
+
+        @Override
+        public boolean includeDataStreams() {
+            return true;
+        }
+
+        @Override
+        public boolean equals(Object o) {
+            if (this == o) return true;
+            if (o == null || getClass() != o.getClass()) return false;
+            Request request = (Request) o;
+            return Arrays.equals(names, request.names) && Objects.equals(indicesOptions, request.indicesOptions);
+        }
+
+        @Override
+        public int hashCode() {
+            int result = Objects.hash(indicesOptions);
+            result = 31 * result + Arrays.hashCode(names);
+            return result;
+        }
+
+        @Override
+        public IndicesRequest indices(String... indices) {
+            this.names = indices;
+            return this;
+        }
+    }
+}

+ 223 - 0
modules/data-streams/src/main/java/org/elasticsearch/datastreams/options/action/GetDataStreamOptionsAction.java

@@ -0,0 +1,223 @@
+/*
+ * Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
+ * or more contributor license agreements. Licensed under the "Elastic License
+ * 2.0", the "GNU Affero General Public License v3.0 only", and the "Server Side
+ * Public License v 1"; you may not use this file except in compliance with, at
+ * your election, the "Elastic License 2.0", the "GNU Affero General Public
+ * License v3.0 only", or the "Server Side Public License, v 1".
+ */
+package org.elasticsearch.datastreams.options.action;
+
+import org.elasticsearch.action.ActionRequestValidationException;
+import org.elasticsearch.action.ActionResponse;
+import org.elasticsearch.action.ActionType;
+import org.elasticsearch.action.IndicesRequest;
+import org.elasticsearch.action.support.IndicesOptions;
+import org.elasticsearch.action.support.master.MasterNodeReadRequest;
+import org.elasticsearch.cluster.metadata.DataStreamOptions;
+import org.elasticsearch.common.collect.Iterators;
+import org.elasticsearch.common.io.stream.StreamInput;
+import org.elasticsearch.common.io.stream.StreamOutput;
+import org.elasticsearch.common.io.stream.Writeable;
+import org.elasticsearch.common.xcontent.ChunkedToXContentObject;
+import org.elasticsearch.core.TimeValue;
+import org.elasticsearch.xcontent.ParseField;
+import org.elasticsearch.xcontent.ToXContent;
+import org.elasticsearch.xcontent.ToXContentObject;
+import org.elasticsearch.xcontent.XContentBuilder;
+
+import java.io.IOException;
+import java.util.Arrays;
+import java.util.Iterator;
+import java.util.List;
+import java.util.Objects;
+
+/**
+ * This action retrieves the data stream options from every data stream. Currently, data stream options only support
+ * failure store.
+ */
+public class GetDataStreamOptionsAction {
+
+    public static final ActionType<Response> INSTANCE = new ActionType<>("indices:admin/data_stream/options/get");
+
+    private GetDataStreamOptionsAction() {/* no instances */}
+
+    public static class Request extends MasterNodeReadRequest<Request> implements IndicesRequest.Replaceable {
+
+        private String[] names;
+        private IndicesOptions indicesOptions = IndicesOptions.builder()
+            .concreteTargetOptions(IndicesOptions.ConcreteTargetOptions.ERROR_WHEN_UNAVAILABLE_TARGETS)
+            .wildcardOptions(
+                IndicesOptions.WildcardOptions.builder().matchOpen(true).matchClosed(true).allowEmptyExpressions(true).resolveAliases(false)
+            )
+            .gatekeeperOptions(IndicesOptions.GatekeeperOptions.builder().allowAliasToMultipleIndices(false).allowClosedIndices(true))
+            .build();
+        private boolean includeDefaults = false;
+
+        public Request(TimeValue masterNodeTimeout, String[] names) {
+            super(masterNodeTimeout);
+            this.names = names;
+        }
+
+        public Request(TimeValue masterNodeTimeout, String[] names, boolean includeDefaults) {
+            super(masterNodeTimeout);
+            this.names = names;
+            this.includeDefaults = includeDefaults;
+        }
+
+        public String[] getNames() {
+            return names;
+        }
+
+        @Override
+        public ActionRequestValidationException validate() {
+            return null;
+        }
+
+        public Request(StreamInput in) throws IOException {
+            super(in);
+            this.names = in.readOptionalStringArray();
+            this.indicesOptions = IndicesOptions.readIndicesOptions(in);
+            this.includeDefaults = in.readBoolean();
+        }
+
+        @Override
+        public void writeTo(StreamOutput out) throws IOException {
+            super.writeTo(out);
+            out.writeOptionalStringArray(names);
+            indicesOptions.writeIndicesOptions(out);
+            out.writeBoolean(includeDefaults);
+        }
+
+        @Override
+        public boolean equals(Object o) {
+            if (this == o) return true;
+            if (o == null || getClass() != o.getClass()) return false;
+            Request request = (Request) o;
+            return Arrays.equals(names, request.names)
+                && indicesOptions.equals(request.indicesOptions)
+                && includeDefaults == request.includeDefaults;
+        }
+
+        @Override
+        public int hashCode() {
+            int result = Objects.hash(indicesOptions, includeDefaults);
+            result = 31 * result + Arrays.hashCode(names);
+            return result;
+        }
+
+        @Override
+        public String[] indices() {
+            return names;
+        }
+
+        @Override
+        public IndicesOptions indicesOptions() {
+            return indicesOptions;
+        }
+
+        public boolean includeDefaults() {
+            return includeDefaults;
+        }
+
+        public Request indicesOptions(IndicesOptions indicesOptions) {
+            this.indicesOptions = indicesOptions;
+            return this;
+        }
+
+        @Override
+        public boolean includeDataStreams() {
+            return true;
+        }
+
+        @Override
+        public IndicesRequest indices(String... indices) {
+            this.names = indices;
+            return this;
+        }
+
+        public Request includeDefaults(boolean includeDefaults) {
+            this.includeDefaults = includeDefaults;
+            return this;
+        }
+    }
+
+    public static class Response extends ActionResponse implements ChunkedToXContentObject {
+        public static final ParseField DATA_STREAMS_FIELD = new ParseField("data_streams");
+
+        public record DataStreamEntry(String dataStreamName, DataStreamOptions dataStreamOptions) implements Writeable, ToXContentObject {
+
+            public static final ParseField NAME_FIELD = new ParseField("name");
+            public static final ParseField OPTIONS_FIELD = new ParseField("options");
+
+            DataStreamEntry(StreamInput in) throws IOException {
+                this(in.readString(), in.readOptionalWriteable(DataStreamOptions::read));
+            }
+
+            @Override
+            public void writeTo(StreamOutput out) throws IOException {
+                out.writeString(dataStreamName);
+                out.writeOptionalWriteable(dataStreamOptions);
+            }
+
+            @Override
+            public XContentBuilder toXContent(XContentBuilder builder, Params params) throws IOException {
+                builder.startObject();
+                builder.field(NAME_FIELD.getPreferredName(), dataStreamName);
+                if (dataStreamOptions != null && dataStreamOptions.isEmpty() == false) {
+                    builder.field(OPTIONS_FIELD.getPreferredName(), dataStreamOptions);
+                }
+                builder.endObject();
+                return builder;
+            }
+        }
+
+        private final List<DataStreamEntry> dataStreams;
+
+        public Response(List<DataStreamEntry> dataStreams) {
+            this.dataStreams = dataStreams;
+        }
+
+        public Response(StreamInput in) throws IOException {
+            this(in.readCollectionAsList(DataStreamEntry::new));
+        }
+
+        public List<DataStreamEntry> getDataStreams() {
+            return dataStreams;
+        }
+
+        @Override
+        public void writeTo(StreamOutput out) throws IOException {
+            out.writeCollection(dataStreams);
+        }
+
+        @Override
+        public Iterator<ToXContent> toXContentChunked(ToXContent.Params outerParams) {
+            return Iterators.concat(Iterators.single((builder, params) -> {
+                builder.startObject();
+                builder.startArray(DATA_STREAMS_FIELD.getPreferredName());
+                return builder;
+            }),
+                Iterators.map(dataStreams.iterator(), entry -> (builder, params) -> entry.toXContent(builder, outerParams)),
+                Iterators.single((builder, params) -> {
+                    builder.endArray();
+                    builder.endObject();
+                    return builder;
+                })
+            );
+        }
+
+        @Override
+        public boolean equals(Object o) {
+            if (this == o) return true;
+            if (o == null || getClass() != o.getClass()) return false;
+            Response response = (Response) o;
+            return dataStreams.equals(response.dataStreams);
+        }
+
+        @Override
+        public int hashCode() {
+            return Objects.hash(dataStreams);
+        }
+    }
+}

+ 165 - 0
modules/data-streams/src/main/java/org/elasticsearch/datastreams/options/action/PutDataStreamOptionsAction.java

@@ -0,0 +1,165 @@
+/*
+ * Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
+ * or more contributor license agreements. Licensed under the "Elastic License
+ * 2.0", the "GNU Affero General Public License v3.0 only", and the "Server Side
+ * Public License v 1"; you may not use this file except in compliance with, at
+ * your election, the "Elastic License 2.0", the "GNU Affero General Public
+ * License v3.0 only", or the "Server Side Public License, v 1".
+ */
+
+package org.elasticsearch.datastreams.options.action;
+
+import org.elasticsearch.action.ActionRequestValidationException;
+import org.elasticsearch.action.ActionType;
+import org.elasticsearch.action.IndicesRequest;
+import org.elasticsearch.action.support.IndicesOptions;
+import org.elasticsearch.action.support.master.AcknowledgedRequest;
+import org.elasticsearch.action.support.master.AcknowledgedResponse;
+import org.elasticsearch.cluster.metadata.DataStreamFailureStore;
+import org.elasticsearch.cluster.metadata.DataStreamOptions;
+import org.elasticsearch.common.io.stream.StreamInput;
+import org.elasticsearch.common.io.stream.StreamOutput;
+import org.elasticsearch.core.Nullable;
+import org.elasticsearch.core.TimeValue;
+import org.elasticsearch.xcontent.ConstructingObjectParser;
+import org.elasticsearch.xcontent.ParseField;
+import org.elasticsearch.xcontent.XContentParser;
+
+import java.io.IOException;
+import java.util.Arrays;
+import java.util.Objects;
+
+import static org.elasticsearch.action.ValidateActions.addValidationError;
+
+/**
+ * Sets the data stream options that was provided in the request to the requested data streams.
+ */
+public class PutDataStreamOptionsAction {
+
+    public static final ActionType<AcknowledgedResponse> INSTANCE = new ActionType<>("indices:admin/data_stream/options/put");
+
+    private PutDataStreamOptionsAction() {/* no instances */}
+
+    public static final class Request extends AcknowledgedRequest<Request> implements IndicesRequest.Replaceable {
+
+        public interface Factory {
+            Request create(@Nullable DataStreamFailureStore dataStreamFailureStore);
+        }
+
+        public static final ConstructingObjectParser<Request, Factory> PARSER = new ConstructingObjectParser<>(
+            "put_data_stream_options_request",
+            false,
+            (args, factory) -> factory.create((DataStreamFailureStore) args[0])
+        );
+
+        static {
+            PARSER.declareObjectOrNull(
+                ConstructingObjectParser.optionalConstructorArg(),
+                (p, c) -> DataStreamFailureStore.PARSER.parse(p, null),
+                null,
+                new ParseField("failure_store")
+            );
+        }
+
+        public static Request parseRequest(XContentParser parser, Factory factory) {
+            return PARSER.apply(parser, factory);
+        }
+
+        private String[] names;
+        private IndicesOptions indicesOptions = IndicesOptions.builder()
+            .concreteTargetOptions(IndicesOptions.ConcreteTargetOptions.ERROR_WHEN_UNAVAILABLE_TARGETS)
+            .wildcardOptions(
+                IndicesOptions.WildcardOptions.builder().matchOpen(true).matchClosed(true).allowEmptyExpressions(true).resolveAliases(false)
+            )
+            .gatekeeperOptions(IndicesOptions.GatekeeperOptions.builder().allowAliasToMultipleIndices(false).allowClosedIndices(true))
+            .build();
+        private final DataStreamOptions options;
+
+        public Request(StreamInput in) throws IOException {
+            super(in);
+            this.names = in.readStringArray();
+            this.indicesOptions = IndicesOptions.readIndicesOptions(in);
+            options = DataStreamOptions.read(in);
+        }
+
+        @Override
+        public void writeTo(StreamOutput out) throws IOException {
+            super.writeTo(out);
+            out.writeStringArray(names);
+            indicesOptions.writeIndicesOptions(out);
+            out.writeWriteable(options);
+        }
+
+        public Request(TimeValue masterNodeTimeout, TimeValue ackTimeout, String[] names, DataStreamOptions options) {
+            super(masterNodeTimeout, ackTimeout);
+            this.names = names;
+            this.options = options;
+        }
+
+        public Request(TimeValue masterNodeTimeout, TimeValue ackTimeout, String[] names, @Nullable DataStreamFailureStore failureStore) {
+            super(masterNodeTimeout, ackTimeout);
+            this.names = names;
+            this.options = new DataStreamOptions(failureStore);
+        }
+
+        @Override
+        public ActionRequestValidationException validate() {
+            ActionRequestValidationException validationException = null;
+            if (options.failureStore() == null) {
+                validationException = addValidationError("At least one option needs to be provided", validationException);
+            }
+            return validationException;
+        }
+
+        public String[] getNames() {
+            return names;
+        }
+
+        public DataStreamOptions getOptions() {
+            return options;
+        }
+
+        @Override
+        public String[] indices() {
+            return names;
+        }
+
+        @Override
+        public IndicesOptions indicesOptions() {
+            return indicesOptions;
+        }
+
+        public Request indicesOptions(IndicesOptions indicesOptions) {
+            this.indicesOptions = indicesOptions;
+            return this;
+        }
+
+        @Override
+        public boolean includeDataStreams() {
+            return true;
+        }
+
+        @Override
+        public boolean equals(Object o) {
+            if (this == o) return true;
+            if (o == null || getClass() != o.getClass()) return false;
+            Request request = (Request) o;
+            return Arrays.equals(names, request.names)
+                && Objects.equals(indicesOptions, request.indicesOptions)
+                && options.equals(request.options);
+        }
+
+        @Override
+        public int hashCode() {
+            int result = Objects.hash(indicesOptions, options);
+            result = 31 * result + Arrays.hashCode(names);
+            return result;
+        }
+
+        @Override
+        public IndicesRequest indices(String... names) {
+            this.names = names;
+            return this;
+        }
+    }
+}

+ 86 - 0
modules/data-streams/src/main/java/org/elasticsearch/datastreams/options/action/TransportDeleteDataStreamOptionsAction.java

@@ -0,0 +1,86 @@
+/*
+ * Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
+ * or more contributor license agreements. Licensed under the "Elastic License
+ * 2.0", the "GNU Affero General Public License v3.0 only", and the "Server Side
+ * Public License v 1"; you may not use this file except in compliance with, at
+ * your election, the "Elastic License 2.0", the "GNU Affero General Public
+ * License v3.0 only", or the "Server Side Public License, v 1".
+ */
+package org.elasticsearch.datastreams.options.action;
+
+import org.elasticsearch.action.ActionListener;
+import org.elasticsearch.action.datastreams.DataStreamsActionUtil;
+import org.elasticsearch.action.support.ActionFilters;
+import org.elasticsearch.action.support.master.AcknowledgedResponse;
+import org.elasticsearch.action.support.master.AcknowledgedTransportMasterNodeAction;
+import org.elasticsearch.cluster.ClusterState;
+import org.elasticsearch.cluster.block.ClusterBlockException;
+import org.elasticsearch.cluster.block.ClusterBlockLevel;
+import org.elasticsearch.cluster.metadata.IndexNameExpressionResolver;
+import org.elasticsearch.cluster.metadata.MetadataDataStreamsService;
+import org.elasticsearch.cluster.service.ClusterService;
+import org.elasticsearch.common.util.concurrent.EsExecutors;
+import org.elasticsearch.indices.SystemIndices;
+import org.elasticsearch.injection.guice.Inject;
+import org.elasticsearch.tasks.Task;
+import org.elasticsearch.threadpool.ThreadPool;
+import org.elasticsearch.transport.TransportService;
+
+import java.util.List;
+
+/**
+ * Transport action that resolves the data stream names from the request and removes any configured data stream options from them.
+ */
+public class TransportDeleteDataStreamOptionsAction extends AcknowledgedTransportMasterNodeAction<DeleteDataStreamOptionsAction.Request> {
+
+    private final MetadataDataStreamsService metadataDataStreamsService;
+    private final SystemIndices systemIndices;
+
+    @Inject
+    public TransportDeleteDataStreamOptionsAction(
+        TransportService transportService,
+        ClusterService clusterService,
+        ThreadPool threadPool,
+        ActionFilters actionFilters,
+        IndexNameExpressionResolver indexNameExpressionResolver,
+        MetadataDataStreamsService metadataDataStreamsService,
+        SystemIndices systemIndices
+    ) {
+        super(
+            DeleteDataStreamOptionsAction.INSTANCE.name(),
+            transportService,
+            clusterService,
+            threadPool,
+            actionFilters,
+            DeleteDataStreamOptionsAction.Request::new,
+            indexNameExpressionResolver,
+            EsExecutors.DIRECT_EXECUTOR_SERVICE
+        );
+        this.metadataDataStreamsService = metadataDataStreamsService;
+        this.systemIndices = systemIndices;
+    }
+
+    @Override
+    protected void masterOperation(
+        Task task,
+        DeleteDataStreamOptionsAction.Request request,
+        ClusterState state,
+        ActionListener<AcknowledgedResponse> listener
+    ) {
+        List<String> dataStreamNames = DataStreamsActionUtil.getDataStreamNames(
+            indexNameExpressionResolver,
+            state,
+            request.getNames(),
+            request.indicesOptions()
+        );
+        for (String name : dataStreamNames) {
+            systemIndices.validateDataStreamAccess(name, threadPool.getThreadContext());
+        }
+        metadataDataStreamsService.removeDataStreamOptions(dataStreamNames, request.ackTimeout(), request.masterNodeTimeout(), listener);
+    }
+
+    @Override
+    protected ClusterBlockException checkBlock(DeleteDataStreamOptionsAction.Request request, ClusterState state) {
+        return state.blocks().globalBlockedException(ClusterBlockLevel.METADATA_WRITE);
+    }
+}

+ 104 - 0
modules/data-streams/src/main/java/org/elasticsearch/datastreams/options/action/TransportGetDataStreamOptionsAction.java

@@ -0,0 +1,104 @@
+/*
+ * Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
+ * or more contributor license agreements. Licensed under the "Elastic License
+ * 2.0", the "GNU Affero General Public License v3.0 only", and the "Server Side
+ * Public License v 1"; you may not use this file except in compliance with, at
+ * your election, the "Elastic License 2.0", the "GNU Affero General Public
+ * License v3.0 only", or the "Server Side Public License, v 1".
+ */
+package org.elasticsearch.datastreams.options.action;
+
+import org.elasticsearch.action.ActionListener;
+import org.elasticsearch.action.datastreams.DataStreamsActionUtil;
+import org.elasticsearch.action.support.ActionFilters;
+import org.elasticsearch.action.support.master.TransportMasterNodeReadAction;
+import org.elasticsearch.cluster.ClusterState;
+import org.elasticsearch.cluster.block.ClusterBlockException;
+import org.elasticsearch.cluster.block.ClusterBlockLevel;
+import org.elasticsearch.cluster.metadata.DataStream;
+import org.elasticsearch.cluster.metadata.IndexNameExpressionResolver;
+import org.elasticsearch.cluster.service.ClusterService;
+import org.elasticsearch.common.util.concurrent.EsExecutors;
+import org.elasticsearch.indices.SystemIndices;
+import org.elasticsearch.injection.guice.Inject;
+import org.elasticsearch.tasks.Task;
+import org.elasticsearch.threadpool.ThreadPool;
+import org.elasticsearch.transport.TransportService;
+
+import java.util.Comparator;
+import java.util.List;
+import java.util.Map;
+import java.util.Objects;
+
+/**
+ * Collects the data streams from the cluster state and then returns for each data stream its name and its
+ * data stream options. Currently, data stream options include only the failure store configuration.
+ */
+public class TransportGetDataStreamOptionsAction extends TransportMasterNodeReadAction<
+    GetDataStreamOptionsAction.Request,
+    GetDataStreamOptionsAction.Response> {
+
+    private final SystemIndices systemIndices;
+
+    @Inject
+    public TransportGetDataStreamOptionsAction(
+        TransportService transportService,
+        ClusterService clusterService,
+        ThreadPool threadPool,
+        ActionFilters actionFilters,
+        IndexNameExpressionResolver indexNameExpressionResolver,
+        SystemIndices systemIndices
+    ) {
+        super(
+            GetDataStreamOptionsAction.INSTANCE.name(),
+            transportService,
+            clusterService,
+            threadPool,
+            actionFilters,
+            GetDataStreamOptionsAction.Request::new,
+            indexNameExpressionResolver,
+            GetDataStreamOptionsAction.Response::new,
+            EsExecutors.DIRECT_EXECUTOR_SERVICE
+        );
+        this.systemIndices = systemIndices;
+    }
+
+    @Override
+    protected void masterOperation(
+        Task task,
+        GetDataStreamOptionsAction.Request request,
+        ClusterState state,
+        ActionListener<GetDataStreamOptionsAction.Response> listener
+    ) {
+        List<String> requestedDataStreams = DataStreamsActionUtil.getDataStreamNames(
+            indexNameExpressionResolver,
+            state,
+            request.getNames(),
+            request.indicesOptions()
+        );
+        Map<String, DataStream> dataStreams = state.metadata().dataStreams();
+        for (String name : requestedDataStreams) {
+            systemIndices.validateDataStreamAccess(name, threadPool.getThreadContext());
+        }
+        listener.onResponse(
+            new GetDataStreamOptionsAction.Response(
+                requestedDataStreams.stream()
+                    .map(dataStreams::get)
+                    .filter(Objects::nonNull)
+                    .map(
+                        dataStream -> new GetDataStreamOptionsAction.Response.DataStreamEntry(
+                            dataStream.getName(),
+                            dataStream.getDataStreamOptions()
+                        )
+                    )
+                    .sorted(Comparator.comparing(GetDataStreamOptionsAction.Response.DataStreamEntry::dataStreamName))
+                    .toList()
+            )
+        );
+    }
+
+    @Override
+    protected ClusterBlockException checkBlock(GetDataStreamOptionsAction.Request request, ClusterState state) {
+        return state.blocks().globalBlockedException(ClusterBlockLevel.METADATA_READ);
+    }
+}

+ 92 - 0
modules/data-streams/src/main/java/org/elasticsearch/datastreams/options/action/TransportPutDataStreamOptionsAction.java

@@ -0,0 +1,92 @@
+/*
+ * Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
+ * or more contributor license agreements. Licensed under the "Elastic License
+ * 2.0", the "GNU Affero General Public License v3.0 only", and the "Server Side
+ * Public License v 1"; you may not use this file except in compliance with, at
+ * your election, the "Elastic License 2.0", the "GNU Affero General Public
+ * License v3.0 only", or the "Server Side Public License, v 1".
+ */
+package org.elasticsearch.datastreams.options.action;
+
+import org.elasticsearch.action.ActionListener;
+import org.elasticsearch.action.datastreams.DataStreamsActionUtil;
+import org.elasticsearch.action.support.ActionFilters;
+import org.elasticsearch.action.support.master.AcknowledgedResponse;
+import org.elasticsearch.action.support.master.AcknowledgedTransportMasterNodeAction;
+import org.elasticsearch.cluster.ClusterState;
+import org.elasticsearch.cluster.block.ClusterBlockException;
+import org.elasticsearch.cluster.block.ClusterBlockLevel;
+import org.elasticsearch.cluster.metadata.IndexNameExpressionResolver;
+import org.elasticsearch.cluster.metadata.MetadataDataStreamsService;
+import org.elasticsearch.cluster.service.ClusterService;
+import org.elasticsearch.common.util.concurrent.EsExecutors;
+import org.elasticsearch.indices.SystemIndices;
+import org.elasticsearch.injection.guice.Inject;
+import org.elasticsearch.tasks.Task;
+import org.elasticsearch.threadpool.ThreadPool;
+import org.elasticsearch.transport.TransportService;
+
+import java.util.List;
+
+/**
+ * Transport action that resolves the data stream names from the request and sets the data stream lifecycle provided in the request.
+ */
+public class TransportPutDataStreamOptionsAction extends AcknowledgedTransportMasterNodeAction<PutDataStreamOptionsAction.Request> {
+
+    private final MetadataDataStreamsService metadataDataStreamsService;
+    private final SystemIndices systemIndices;
+
+    @Inject
+    public TransportPutDataStreamOptionsAction(
+        TransportService transportService,
+        ClusterService clusterService,
+        ThreadPool threadPool,
+        ActionFilters actionFilters,
+        IndexNameExpressionResolver indexNameExpressionResolver,
+        MetadataDataStreamsService metadataDataStreamsService,
+        SystemIndices systemIndices
+    ) {
+        super(
+            PutDataStreamOptionsAction.INSTANCE.name(),
+            transportService,
+            clusterService,
+            threadPool,
+            actionFilters,
+            PutDataStreamOptionsAction.Request::new,
+            indexNameExpressionResolver,
+            EsExecutors.DIRECT_EXECUTOR_SERVICE
+        );
+        this.metadataDataStreamsService = metadataDataStreamsService;
+        this.systemIndices = systemIndices;
+    }
+
+    @Override
+    protected void masterOperation(
+        Task task,
+        PutDataStreamOptionsAction.Request request,
+        ClusterState state,
+        ActionListener<AcknowledgedResponse> listener
+    ) {
+        List<String> dataStreamNames = DataStreamsActionUtil.getDataStreamNames(
+            indexNameExpressionResolver,
+            state,
+            request.getNames(),
+            request.indicesOptions()
+        );
+        for (String name : dataStreamNames) {
+            systemIndices.validateDataStreamAccess(name, threadPool.getThreadContext());
+        }
+        metadataDataStreamsService.setDataStreamOptions(
+            dataStreamNames,
+            request.getOptions(),
+            request.ackTimeout(),
+            request.masterNodeTimeout(),
+            listener
+        );
+    }
+
+    @Override
+    protected ClusterBlockException checkBlock(PutDataStreamOptionsAction.Request request, ClusterState state) {
+        return state.blocks().globalBlockedException(ClusterBlockLevel.METADATA_WRITE);
+    }
+}

+ 54 - 0
modules/data-streams/src/main/java/org/elasticsearch/datastreams/options/rest/RestDeleteDataStreamOptionsAction.java

@@ -0,0 +1,54 @@
+/*
+ * Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
+ * or more contributor license agreements. Licensed under the "Elastic License
+ * 2.0", the "GNU Affero General Public License v3.0 only", and the "Server Side
+ * Public License v 1"; you may not use this file except in compliance with, at
+ * your election, the "Elastic License 2.0", the "GNU Affero General Public
+ * License v3.0 only", or the "Server Side Public License, v 1".
+ */
+package org.elasticsearch.datastreams.options.rest;
+
+import org.elasticsearch.action.support.IndicesOptions;
+import org.elasticsearch.action.support.master.AcknowledgedRequest;
+import org.elasticsearch.client.internal.node.NodeClient;
+import org.elasticsearch.common.Strings;
+import org.elasticsearch.datastreams.options.action.DeleteDataStreamOptionsAction;
+import org.elasticsearch.rest.BaseRestHandler;
+import org.elasticsearch.rest.RestRequest;
+import org.elasticsearch.rest.Scope;
+import org.elasticsearch.rest.ServerlessScope;
+import org.elasticsearch.rest.action.RestToXContentListener;
+
+import java.util.List;
+
+import static org.elasticsearch.rest.RestRequest.Method.DELETE;
+import static org.elasticsearch.rest.RestUtils.getMasterNodeTimeout;
+
+@ServerlessScope(Scope.INTERNAL)
+public class RestDeleteDataStreamOptionsAction extends BaseRestHandler {
+
+    @Override
+    public String getName() {
+        return "delete_data_stream_options_action";
+    }
+
+    @Override
+    public List<Route> routes() {
+        return List.of(new Route(DELETE, "/_data_stream/{name}/_options"));
+    }
+
+    @Override
+    protected RestChannelConsumer prepareRequest(RestRequest request, NodeClient client) {
+        final var deleteDataOptionsRequest = new DeleteDataStreamOptionsAction.Request(
+            getMasterNodeTimeout(request),
+            request.paramAsTime("timeout", AcknowledgedRequest.DEFAULT_ACK_TIMEOUT),
+            Strings.splitStringByCommaToArray(request.param("name"))
+        );
+        deleteDataOptionsRequest.indicesOptions(IndicesOptions.fromRequest(request, deleteDataOptionsRequest.indicesOptions()));
+        return channel -> client.execute(
+            DeleteDataStreamOptionsAction.INSTANCE,
+            deleteDataOptionsRequest,
+            new RestToXContentListener<>(channel)
+        );
+    }
+}

+ 58 - 0
modules/data-streams/src/main/java/org/elasticsearch/datastreams/options/rest/RestGetDataStreamOptionsAction.java

@@ -0,0 +1,58 @@
+/*
+ * Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
+ * or more contributor license agreements. Licensed under the "Elastic License
+ * 2.0", the "GNU Affero General Public License v3.0 only", and the "Server Side
+ * Public License v 1"; you may not use this file except in compliance with, at
+ * your election, the "Elastic License 2.0", the "GNU Affero General Public
+ * License v3.0 only", or the "Server Side Public License, v 1".
+ */
+package org.elasticsearch.datastreams.options.rest;
+
+import org.elasticsearch.action.support.IndicesOptions;
+import org.elasticsearch.client.internal.node.NodeClient;
+import org.elasticsearch.common.Strings;
+import org.elasticsearch.datastreams.options.action.GetDataStreamOptionsAction;
+import org.elasticsearch.rest.BaseRestHandler;
+import org.elasticsearch.rest.RestRequest;
+import org.elasticsearch.rest.RestUtils;
+import org.elasticsearch.rest.Scope;
+import org.elasticsearch.rest.ServerlessScope;
+import org.elasticsearch.rest.action.RestRefCountedChunkedToXContentListener;
+
+import java.util.List;
+
+import static org.elasticsearch.rest.RestRequest.Method.GET;
+
+@ServerlessScope(Scope.PUBLIC)
+public class RestGetDataStreamOptionsAction extends BaseRestHandler {
+
+    @Override
+    public String getName() {
+        return "get_data_stream_options_action";
+    }
+
+    @Override
+    public List<Route> routes() {
+        return List.of(new Route(GET, "/_data_stream/{name}/_options"));
+    }
+
+    @Override
+    protected RestChannelConsumer prepareRequest(RestRequest request, NodeClient client) {
+        GetDataStreamOptionsAction.Request getDataStreamOptionsRequest = new GetDataStreamOptionsAction.Request(
+            RestUtils.getMasterNodeTimeout(request),
+            Strings.splitStringByCommaToArray(request.param("name"))
+        );
+        getDataStreamOptionsRequest.includeDefaults(request.paramAsBoolean("include_defaults", false));
+        getDataStreamOptionsRequest.indicesOptions(IndicesOptions.fromRequest(request, getDataStreamOptionsRequest.indicesOptions()));
+        return channel -> client.execute(
+            GetDataStreamOptionsAction.INSTANCE,
+            getDataStreamOptionsRequest,
+            new RestRefCountedChunkedToXContentListener<>(channel)
+        );
+    }
+
+    @Override
+    public boolean allowSystemIndexAccessByDefault() {
+        return true;
+    }
+}

+ 58 - 0
modules/data-streams/src/main/java/org/elasticsearch/datastreams/options/rest/RestPutDataStreamOptionsAction.java

@@ -0,0 +1,58 @@
+/*
+ * Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
+ * or more contributor license agreements. Licensed under the "Elastic License
+ * 2.0", the "GNU Affero General Public License v3.0 only", and the "Server Side
+ * Public License v 1"; you may not use this file except in compliance with, at
+ * your election, the "Elastic License 2.0", the "GNU Affero General Public
+ * License v3.0 only", or the "Server Side Public License, v 1".
+ */
+package org.elasticsearch.datastreams.options.rest;
+
+import org.elasticsearch.action.support.IndicesOptions;
+import org.elasticsearch.client.internal.node.NodeClient;
+import org.elasticsearch.common.Strings;
+import org.elasticsearch.datastreams.options.action.PutDataStreamOptionsAction;
+import org.elasticsearch.rest.BaseRestHandler;
+import org.elasticsearch.rest.RestRequest;
+import org.elasticsearch.rest.Scope;
+import org.elasticsearch.rest.ServerlessScope;
+import org.elasticsearch.rest.action.RestToXContentListener;
+import org.elasticsearch.xcontent.XContentParser;
+
+import java.io.IOException;
+import java.util.List;
+
+import static org.elasticsearch.rest.RestRequest.Method.PUT;
+import static org.elasticsearch.rest.RestUtils.getAckTimeout;
+import static org.elasticsearch.rest.RestUtils.getMasterNodeTimeout;
+
+@ServerlessScope(Scope.PUBLIC)
+public class RestPutDataStreamOptionsAction extends BaseRestHandler {
+
+    @Override
+    public String getName() {
+        return "put_data_stream_options_action";
+    }
+
+    @Override
+    public List<Route> routes() {
+        return List.of(new Route(PUT, "/_data_stream/{name}/_options"));
+    }
+
+    @Override
+    protected RestChannelConsumer prepareRequest(RestRequest request, NodeClient client) throws IOException {
+        try (XContentParser parser = request.contentParser()) {
+            PutDataStreamOptionsAction.Request putOptionsRequest = PutDataStreamOptionsAction.Request.parseRequest(
+                parser,
+                (failureStore) -> new PutDataStreamOptionsAction.Request(
+                    getMasterNodeTimeout(request),
+                    getAckTimeout(request),
+                    Strings.splitStringByCommaToArray(request.param("name")),
+                    failureStore
+                )
+            );
+            putOptionsRequest.indicesOptions(IndicesOptions.fromRequest(request, putOptionsRequest.indicesOptions()));
+            return channel -> client.execute(PutDataStreamOptionsAction.INSTANCE, putOptionsRequest, new RestToXContentListener<>(channel));
+        }
+    }
+}

+ 94 - 0
server/src/main/java/org/elasticsearch/cluster/metadata/MetadataDataStreamsService.java

@@ -45,6 +45,7 @@ public class MetadataDataStreamsService {
     private final DataStreamGlobalRetentionSettings globalRetentionSettings;
     private final MasterServiceTaskQueue<UpdateLifecycleTask> updateLifecycleTaskQueue;
     private final MasterServiceTaskQueue<SetRolloverOnWriteTask> setRolloverOnWriteTaskQueue;
+    private final MasterServiceTaskQueue<UpdateOptionsTask> updateOptionsTaskQueue;
 
     public MetadataDataStreamsService(
         ClusterService clusterService,
@@ -93,6 +94,20 @@ public class MetadataDataStreamsService {
             Priority.NORMAL,
             rolloverOnWriteExecutor
         );
+        ClusterStateTaskExecutor<UpdateOptionsTask> updateOptionsExecutor = new SimpleBatchedAckListenerTaskExecutor<>() {
+
+            @Override
+            public Tuple<ClusterState, ClusterStateAckListener> executeTask(
+                UpdateOptionsTask modifyOptionsTask,
+                ClusterState clusterState
+            ) {
+                return new Tuple<>(
+                    updateDataStreamOptions(clusterState, modifyOptionsTask.getDataStreamNames(), modifyOptionsTask.getOptions()),
+                    modifyOptionsTask
+                );
+            }
+        };
+        this.updateOptionsTaskQueue = clusterService.createTaskQueue("modify-data-stream-options", Priority.NORMAL, updateOptionsExecutor);
     }
 
     public void modifyDataStream(final ModifyDataStreamsAction.Request request, final ActionListener<AcknowledgedResponse> listener) {
@@ -147,6 +162,39 @@ public class MetadataDataStreamsService {
         );
     }
 
+    /**
+     * Submits the task to set the provided data stream options to the requested data streams.
+     */
+    public void setDataStreamOptions(
+        final List<String> dataStreamNames,
+        DataStreamOptions options,
+        TimeValue ackTimeout,
+        TimeValue masterTimeout,
+        final ActionListener<AcknowledgedResponse> listener
+    ) {
+        updateOptionsTaskQueue.submitTask(
+            "set-data-stream-options",
+            new UpdateOptionsTask(dataStreamNames, options, ackTimeout, listener),
+            masterTimeout
+        );
+    }
+
+    /**
+     * Submits the task to remove the data stream options from the requested data streams.
+     */
+    public void removeDataStreamOptions(
+        List<String> dataStreamNames,
+        TimeValue ackTimeout,
+        TimeValue masterTimeout,
+        ActionListener<AcknowledgedResponse> listener
+    ) {
+        updateOptionsTaskQueue.submitTask(
+            "delete-data-stream-options",
+            new UpdateOptionsTask(dataStreamNames, null, ackTimeout, listener),
+            masterTimeout
+        );
+    }
+
     @SuppressForbidden(reason = "legacy usage of unbatched task") // TODO add support for batching here
     private void submitUnbatchedTask(@SuppressWarnings("SameParameterValue") String source, ClusterStateUpdateTask task) {
         clusterService.submitUnbatchedStateUpdateTask(source, task);
@@ -228,6 +276,24 @@ public class MetadataDataStreamsService {
         return ClusterState.builder(currentState).metadata(builder.build()).build();
     }
 
+    /**
+     * Creates an updated cluster state in which the requested data streams have the data stream options provided.
+     * Visible for testing.
+     */
+    ClusterState updateDataStreamOptions(
+        ClusterState currentState,
+        List<String> dataStreamNames,
+        @Nullable DataStreamOptions dataStreamOptions
+    ) {
+        Metadata metadata = currentState.metadata();
+        Metadata.Builder builder = Metadata.builder(metadata);
+        for (var dataStreamName : dataStreamNames) {
+            var dataStream = validateDataStream(metadata, dataStreamName);
+            builder.put(dataStream.copy().setDataStreamOptions(dataStreamOptions).build());
+        }
+        return ClusterState.builder(currentState).metadata(builder.build()).build();
+    }
+
     /**
      * Creates an updated cluster state in which the requested data stream has the flag {@link DataStream#rolloverOnWrite()}
      * set to the value of the parameter rolloverOnWrite
@@ -372,6 +438,34 @@ public class MetadataDataStreamsService {
         }
     }
 
+    /**
+     * A cluster state update task that consists of the cluster state request and the listeners that need to be notified upon completion.
+     */
+    static class UpdateOptionsTask extends AckedBatchedClusterStateUpdateTask {
+
+        private final List<String> dataStreamNames;
+        private final DataStreamOptions options;
+
+        UpdateOptionsTask(
+            List<String> dataStreamNames,
+            @Nullable DataStreamOptions options,
+            TimeValue ackTimeout,
+            ActionListener<AcknowledgedResponse> listener
+        ) {
+            super(ackTimeout, listener);
+            this.dataStreamNames = dataStreamNames;
+            this.options = options;
+        }
+
+        public List<String> getDataStreamNames() {
+            return dataStreamNames;
+        }
+
+        public DataStreamOptions getOptions() {
+            return options;
+        }
+    }
+
     /**
      * A cluster state update task that consists of the cluster state request and the listeners that need to be notified upon completion.
      */

+ 10 - 1
server/src/test/java/org/elasticsearch/cluster/metadata/DataStreamOptionsTests.java

@@ -24,7 +24,16 @@ public class DataStreamOptionsTests extends AbstractXContentSerializingTestCase<
 
     @Override
     protected DataStreamOptions createTestInstance() {
-        return new DataStreamOptions(randomBoolean() ? null : DataStreamFailureStoreTests.randomFailureStore());
+        return randomDataStreamOptions();
+    }
+
+    public static DataStreamOptions randomDataStreamOptions() {
+        return switch (randomIntBetween(0, 2)) {
+            case 0 -> DataStreamOptions.EMPTY;
+            case 1 -> DataStreamOptions.FAILURE_STORE_DISABLED;
+            case 2 -> DataStreamOptions.FAILURE_STORE_ENABLED;
+            default -> throw new IllegalArgumentException("Illegal randomisation branch");
+        };
     }
 
     @Override

+ 33 - 0
server/src/test/java/org/elasticsearch/cluster/metadata/MetadataDataStreamsServiceTests.java

@@ -422,6 +422,39 @@ public class MetadataDataStreamsServiceTests extends MapperServiceTestCase {
         }
     }
 
+    public void testUpdateDataStreamOptions() {
+        String dataStream = randomAlphaOfLength(5);
+        // we want the data stream options to be non-empty, so we can see the removal in action
+        DataStreamOptions dataStreamOptions = randomValueOtherThan(
+            DataStreamOptions.EMPTY,
+            DataStreamOptionsTests::randomDataStreamOptions
+        );
+        ClusterState before = DataStreamTestHelper.getClusterStateWithDataStreams(List.of(new Tuple<>(dataStream, 2)), List.of());
+        MetadataDataStreamsService service = new MetadataDataStreamsService(
+            mock(ClusterService.class),
+            mock(IndicesService.class),
+            DataStreamGlobalRetentionSettings.create(ClusterSettings.createBuiltInClusterSettings())
+        );
+
+        // Ensure no data stream options are stored
+        DataStream updatedDataStream = before.metadata().dataStreams().get(dataStream);
+        assertNotNull(updatedDataStream);
+        assertThat(updatedDataStream.getDataStreamOptions(), equalTo(DataStreamOptions.EMPTY));
+
+        // Set non-empty data stream options
+        ClusterState after = service.updateDataStreamOptions(before, List.of(dataStream), dataStreamOptions);
+        updatedDataStream = after.metadata().dataStreams().get(dataStream);
+        assertNotNull(updatedDataStream);
+        assertThat(updatedDataStream.getDataStreamOptions(), equalTo(dataStreamOptions));
+        before = after;
+
+        // Remove data stream options
+        after = service.updateDataStreamOptions(before, List.of(dataStream), null);
+        updatedDataStream = after.metadata().dataStreams().get(dataStream);
+        assertNotNull(updatedDataStream);
+        assertThat(updatedDataStream.getDataStreamOptions(), equalTo(DataStreamOptions.EMPTY));
+    }
+
     private MapperService getMapperService(IndexMetadata im) {
         try {
             String mapping = im.mapping().source().toString();

+ 3 - 0
x-pack/plugin/security/qa/operator-privileges-tests/src/javaRestTest/java/org/elasticsearch/xpack/security/operator/Constants.java

@@ -501,6 +501,9 @@ public class Constants {
         "indices:admin/data_stream/lifecycle/get",
         "indices:admin/data_stream/lifecycle/put",
         "indices:admin/data_stream/lifecycle/explain",
+        "indices:admin/data_stream/options/delete",
+        "indices:admin/data_stream/options/get",
+        "indices:admin/data_stream/options/put",
         "indices:admin/delete",
         "indices:admin/flush",
         "indices:admin/flush[s]",