Browse Source

Run `TransportGetDataStreamsAction` on local node (#122852)

This action solely needs the cluster state, it can run on any node.
Additionally, it needs to be cancellable to avoid doing unnecessary work
after a client failure or timeout.

Relates #101805
Niels Bauman 7 months ago
parent
commit
af6eb8cc38

+ 5 - 0
docs/changelog/122852.yaml

@@ -0,0 +1,5 @@
+pr: 122852
+summary: Run `TransportGetDataStreamsAction` on local node
+area: Data streams
+type: enhancement
+issues: []

+ 1 - 0
modules/data-streams/build.gradle

@@ -22,6 +22,7 @@ dependencies {
   testImplementation project(path: ':test:test-clusters')
   testImplementation project(":modules:mapper-extras")
   internalClusterTestImplementation project(":modules:mapper-extras")
+  internalClusterTestImplementation project(':modules:rest-root')
 }
 
 tasks.withType(StandaloneRestIntegTestTask).configureEach {

+ 143 - 0
modules/data-streams/src/internalClusterTest/java/org/elasticsearch/datastreams/DataStreamRestActionCancellationIT.java

@@ -0,0 +1,143 @@
+/*
+ * Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
+ * or more contributor license agreements. Licensed under the "Elastic License
+ * 2.0", the "GNU Affero General Public License v3.0 only", and the "Server Side
+ * Public License v 1"; you may not use this file except in compliance with, at
+ * your election, the "Elastic License 2.0", the "GNU Affero General Public
+ * License v3.0 only", or the "Server Side Public License, v 1".
+ */
+
+package org.elasticsearch.datastreams;
+
+import org.apache.http.client.methods.HttpGet;
+import org.apache.http.client.methods.HttpPost;
+import org.elasticsearch.action.datastreams.GetDataStreamAction;
+import org.elasticsearch.action.support.CancellableActionTestPlugin;
+import org.elasticsearch.action.support.PlainActionFuture;
+import org.elasticsearch.action.support.RefCountingListener;
+import org.elasticsearch.action.support.SubscribableListener;
+import org.elasticsearch.client.Request;
+import org.elasticsearch.client.Response;
+import org.elasticsearch.common.Strings;
+import org.elasticsearch.common.network.NetworkModule;
+import org.elasticsearch.common.settings.Settings;
+import org.elasticsearch.plugins.Plugin;
+import org.elasticsearch.rest.root.MainRestPlugin;
+import org.elasticsearch.test.ESIntegTestCase;
+import org.elasticsearch.test.rest.ObjectPath;
+import org.elasticsearch.transport.netty4.Netty4Plugin;
+
+import java.util.Collection;
+import java.util.List;
+import java.util.concurrent.CancellationException;
+import java.util.concurrent.ExecutionException;
+import java.util.concurrent.TimeUnit;
+
+import static org.elasticsearch.action.support.ActionTestUtils.wrapAsRestResponseListener;
+import static org.elasticsearch.test.TaskAssertions.assertAllTasksHaveFinished;
+import static org.hamcrest.Matchers.greaterThan;
+import static org.hamcrest.Matchers.oneOf;
+
+public class DataStreamRestActionCancellationIT extends ESIntegTestCase {
+
+    @Override
+    protected boolean addMockHttpTransport() {
+        return false; // enable http
+    }
+
+    @Override
+    protected Settings nodeSettings(int nodeOrdinal, Settings otherSettings) {
+        return Settings.builder()
+            .put(super.nodeSettings(nodeOrdinal, otherSettings))
+            .put(NetworkModule.TRANSPORT_TYPE_KEY, Netty4Plugin.NETTY_TRANSPORT_NAME)
+            .put(NetworkModule.HTTP_TYPE_KEY, Netty4Plugin.NETTY_HTTP_TRANSPORT_NAME)
+            .build();
+    }
+
+    @Override
+    protected Collection<Class<? extends Plugin>> nodePlugins() {
+        return List.of(getTestTransportPlugin(), MainRestPlugin.class, CancellableActionTestPlugin.class, DataStreamsPlugin.class);
+    }
+
+    public void testGetDataStreamCancellation() {
+        runRestActionCancellationTest(new Request(HttpGet.METHOD_NAME, "/_data_stream"), GetDataStreamAction.NAME);
+        runRestActionCancellationTest(new Request(HttpGet.METHOD_NAME, "/_data_stream?verbose"), GetDataStreamAction.NAME);
+    }
+
+    private void runRestActionCancellationTest(Request request, String actionName) {
+        final var node = usually() ? internalCluster().getRandomNodeName() : internalCluster().startCoordinatingOnlyNode(Settings.EMPTY);
+
+        try (
+            var restClient = createRestClient(node);
+            var capturingAction = CancellableActionTestPlugin.capturingActionOnNode(actionName, node)
+        ) {
+            final var responseFuture = new PlainActionFuture<Response>();
+            final var restInvocation = restClient.performRequestAsync(request, wrapAsRestResponseListener(responseFuture));
+
+            if (randomBoolean()) {
+                // cancel by aborting the REST request
+                capturingAction.captureAndCancel(restInvocation::cancel);
+                expectThrows(ExecutionException.class, CancellationException.class, () -> responseFuture.get(10, TimeUnit.SECONDS));
+            } else {
+                // cancel via the task management API
+                final var cancelFuture = new PlainActionFuture<Void>();
+                capturingAction.captureAndCancel(
+                    () -> SubscribableListener
+
+                        .<ObjectPath>newForked(
+                            l -> restClient.performRequestAsync(
+                                getListTasksRequest(node, actionName),
+                                wrapAsRestResponseListener(l.map(ObjectPath::createFromResponse))
+                            )
+                        )
+
+                        .<Void>andThen((l, listTasksResponse) -> {
+                            final var taskCount = listTasksResponse.evaluateArraySize("tasks");
+                            assertThat(taskCount, greaterThan(0));
+                            try (var listeners = new RefCountingListener(l)) {
+                                for (int i = 0; i < taskCount; i++) {
+                                    final var taskPrefix = "tasks." + i + ".";
+                                    assertTrue(listTasksResponse.evaluate(taskPrefix + "cancellable"));
+                                    assertFalse(listTasksResponse.evaluate(taskPrefix + "cancelled"));
+                                    restClient.performRequestAsync(
+                                        getCancelTaskRequest(
+                                            listTasksResponse.evaluate(taskPrefix + "node"),
+                                            listTasksResponse.evaluate(taskPrefix + "id")
+                                        ),
+                                        wrapAsRestResponseListener(listeners.acquire(DataStreamRestActionCancellationIT::assertOK))
+                                    );
+                                }
+                            }
+                        })
+
+                        .addListener(cancelFuture)
+                );
+                cancelFuture.get(10, TimeUnit.SECONDS);
+                expectThrows(Exception.class, () -> responseFuture.get(10, TimeUnit.SECONDS));
+            }
+
+            assertAllTasksHaveFinished(actionName);
+        } catch (Exception e) {
+            fail(e);
+        }
+    }
+
+    private static Request getListTasksRequest(String taskNode, String actionName) {
+        final var listTasksRequest = new Request(HttpGet.METHOD_NAME, "/_tasks");
+        listTasksRequest.addParameter("nodes", taskNode);
+        listTasksRequest.addParameter("actions", actionName);
+        listTasksRequest.addParameter("group_by", "none");
+        return listTasksRequest;
+    }
+
+    private static Request getCancelTaskRequest(String taskNode, int taskId) {
+        final var cancelTaskRequest = new Request(HttpPost.METHOD_NAME, Strings.format("/_tasks/%s:%d/_cancel", taskNode, taskId));
+        cancelTaskRequest.addParameter("wait_for_completion", null);
+        return cancelTaskRequest;
+    }
+
+    public static void assertOK(Response response) {
+        assertThat(response.getStatusLine().getStatusCode(), oneOf(200, 201));
+    }
+
+}

+ 27 - 10
modules/data-streams/src/main/java/org/elasticsearch/datastreams/action/TransportGetDataStreamsAction.java

@@ -17,7 +17,8 @@ import org.elasticsearch.action.datastreams.GetDataStreamAction;
 import org.elasticsearch.action.datastreams.GetDataStreamAction.Response.IndexProperties;
 import org.elasticsearch.action.datastreams.GetDataStreamAction.Response.ManagedBy;
 import org.elasticsearch.action.support.ActionFilters;
-import org.elasticsearch.action.support.master.TransportMasterNodeReadProjectAction;
+import org.elasticsearch.action.support.ChannelActionListener;
+import org.elasticsearch.action.support.local.TransportLocalProjectMetadataAction;
 import org.elasticsearch.client.internal.Client;
 import org.elasticsearch.client.internal.OriginSettingClient;
 import org.elasticsearch.cluster.ProjectState;
@@ -39,6 +40,7 @@ import org.elasticsearch.common.settings.ClusterSettings;
 import org.elasticsearch.common.settings.Settings;
 import org.elasticsearch.core.Nullable;
 import org.elasticsearch.core.Tuple;
+import org.elasticsearch.core.UpdateForV10;
 import org.elasticsearch.index.Index;
 import org.elasticsearch.index.IndexMode;
 import org.elasticsearch.index.IndexSettingProvider;
@@ -47,6 +49,7 @@ import org.elasticsearch.index.IndexSettings;
 import org.elasticsearch.indices.SystemDataStreamDescriptor;
 import org.elasticsearch.indices.SystemIndices;
 import org.elasticsearch.injection.guice.Inject;
+import org.elasticsearch.tasks.CancellableTask;
 import org.elasticsearch.tasks.Task;
 import org.elasticsearch.threadpool.ThreadPool;
 import org.elasticsearch.transport.TransportService;
@@ -63,7 +66,7 @@ import java.util.stream.Collectors;
 
 import static org.elasticsearch.index.IndexSettings.PREFER_ILM_SETTING;
 
-public class TransportGetDataStreamsAction extends TransportMasterNodeReadProjectAction<
+public class TransportGetDataStreamsAction extends TransportLocalProjectMetadataAction<
     GetDataStreamAction.Request,
     GetDataStreamAction.Response> {
 
@@ -76,6 +79,12 @@ public class TransportGetDataStreamsAction extends TransportMasterNodeReadProjec
     private final IndexSettingProviders indexSettingProviders;
     private final Client client;
 
+    /**
+     * NB prior to 9.0 this was a TransportMasterNodeReadAction so for BwC it must be registered with the TransportService until
+     * we no longer need to support calling this action remotely.
+     */
+    @UpdateForV10(owner = UpdateForV10.Owner.DATA_MANAGEMENT)
+    @SuppressWarnings("this-escape")
     @Inject
     public TransportGetDataStreamsAction(
         TransportService transportService,
@@ -92,14 +101,11 @@ public class TransportGetDataStreamsAction extends TransportMasterNodeReadProjec
     ) {
         super(
             GetDataStreamAction.NAME,
-            transportService,
-            clusterService,
-            threadPool,
             actionFilters,
-            GetDataStreamAction.Request::new,
-            projectResolver,
-            GetDataStreamAction.Response::new,
-            transportService.getThreadPool().executor(ThreadPool.Names.MANAGEMENT)
+            transportService.getTaskManager(),
+            clusterService,
+            threadPool.executor(ThreadPool.Names.MANAGEMENT),
+            projectResolver
         );
         this.indexNameExpressionResolver = indexNameExpressionResolver;
         this.systemIndices = systemIndices;
@@ -108,21 +114,32 @@ public class TransportGetDataStreamsAction extends TransportMasterNodeReadProjec
         this.dataStreamFailureStoreSettings = dataStreamFailureStoreSettings;
         this.indexSettingProviders = indexSettingProviders;
         this.client = new OriginSettingClient(client, "stack");
+
+        transportService.registerRequestHandler(
+            actionName,
+            executor,
+            false,
+            true,
+            GetDataStreamAction.Request::new,
+            (request, channel, task) -> executeDirect(task, request, new ChannelActionListener<>(channel))
+        );
     }
 
     @Override
-    protected void masterOperation(
+    protected void localClusterStateOperation(
         Task task,
         GetDataStreamAction.Request request,
         ProjectState state,
         ActionListener<GetDataStreamAction.Response> listener
     ) throws Exception {
+        ((CancellableTask) task).ensureNotCancelled();
         if (request.verbose()) {
             DataStreamsStatsAction.Request req = new DataStreamsStatsAction.Request();
             req.indices(request.indices());
             client.execute(DataStreamsStatsAction.INSTANCE, req, new ActionListener<>() {
                 @Override
                 public void onResponse(DataStreamsStatsAction.Response response) {
+                    ((CancellableTask) task).ensureNotCancelled();
                     final Map<String, Long> maxTimestamps = Arrays.stream(response.getDataStreams())
                         .collect(
                             Collectors.toMap(

+ 6 - 1
modules/data-streams/src/main/java/org/elasticsearch/datastreams/rest/RestGetDataStreamsAction.java

@@ -19,6 +19,7 @@ import org.elasticsearch.rest.RestRequest;
 import org.elasticsearch.rest.RestUtils;
 import org.elasticsearch.rest.Scope;
 import org.elasticsearch.rest.ServerlessScope;
+import org.elasticsearch.rest.action.RestCancellableNodeClient;
 import org.elasticsearch.rest.action.RestToXContentListener;
 
 import java.util.List;
@@ -64,7 +65,11 @@ public class RestGetDataStreamsAction extends BaseRestHandler {
         getDataStreamsRequest.includeDefaults(request.paramAsBoolean("include_defaults", false));
         getDataStreamsRequest.indicesOptions(IndicesOptions.fromRequest(request, getDataStreamsRequest.indicesOptions()));
         getDataStreamsRequest.verbose(request.paramAsBoolean("verbose", false));
-        return channel -> client.execute(GetDataStreamAction.INSTANCE, getDataStreamsRequest, new RestToXContentListener<>(channel));
+        return channel -> new RestCancellableNodeClient(client, request.getHttpChannel()).execute(
+            GetDataStreamAction.INSTANCE,
+            getDataStreamsRequest,
+            new RestToXContentListener<>(channel)
+        );
     }
 
     @Override

+ 0 - 73
modules/data-streams/src/test/java/org/elasticsearch/datastreams/action/GetDataStreamsRequestTests.java

@@ -1,73 +0,0 @@
-/*
- * Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
- * or more contributor license agreements. Licensed under the "Elastic License
- * 2.0", the "GNU Affero General Public License v3.0 only", and the "Server Side
- * Public License v 1"; you may not use this file except in compliance with, at
- * your election, the "Elastic License 2.0", the "GNU Affero General Public
- * License v3.0 only", or the "Server Side Public License, v 1".
- */
-package org.elasticsearch.datastreams.action;
-
-import org.elasticsearch.action.datastreams.GetDataStreamAction.Request;
-import org.elasticsearch.action.support.IndicesOptions;
-import org.elasticsearch.common.io.stream.Writeable;
-import org.elasticsearch.test.AbstractWireSerializingTestCase;
-
-public class GetDataStreamsRequestTests extends AbstractWireSerializingTestCase<Request> {
-
-    @Override
-    protected Writeable.Reader<Request> instanceReader() {
-        return Request::new;
-    }
-
-    @Override
-    protected Request createTestInstance() {
-        var req = new Request(TEST_REQUEST_TIMEOUT, switch (randomIntBetween(1, 4)) {
-            case 1 -> generateRandomStringArray(3, 8, false, false);
-            case 2 -> {
-                String[] parameters = generateRandomStringArray(3, 8, false, false);
-                for (int k = 0; k < parameters.length; k++) {
-                    parameters[k] = parameters[k] + "*";
-                }
-                yield parameters;
-            }
-            case 3 -> new String[] { "*" };
-            default -> null;
-        });
-        req.verbose(randomBoolean());
-        return req;
-    }
-
-    @Override
-    protected Request mutateInstance(Request instance) {
-        var indices = instance.indices();
-        var indicesOpts = instance.indicesOptions();
-        var includeDefaults = instance.includeDefaults();
-        var verbose = instance.verbose();
-        switch (randomIntBetween(0, 3)) {
-            case 0 -> indices = randomValueOtherThan(indices, () -> generateRandomStringArray(3, 8, false, false));
-            case 1 -> indicesOpts = randomValueOtherThan(
-                indicesOpts,
-                () -> IndicesOptions.fromOptions(
-                    randomBoolean(),
-                    randomBoolean(),
-                    randomBoolean(),
-                    randomBoolean(),
-                    randomBoolean(),
-                    randomBoolean(),
-                    randomBoolean(),
-                    randomBoolean(),
-                    randomBoolean()
-                )
-            );
-            case 2 -> includeDefaults = includeDefaults == false;
-            case 3 -> verbose = verbose == false;
-        }
-        var newReq = new Request(instance.masterNodeTimeout(), indices);
-        newReq.includeDefaults(includeDefaults);
-        newReq.indicesOptions(indicesOpts);
-        newReq.verbose(verbose);
-        return newReq;
-    }
-
-}

+ 1 - 124
modules/data-streams/src/test/java/org/elasticsearch/datastreams/action/GetDataStreamsResponseTests.java

@@ -14,14 +14,10 @@ import org.elasticsearch.cluster.health.ClusterHealthStatus;
 import org.elasticsearch.cluster.metadata.DataStream;
 import org.elasticsearch.cluster.metadata.DataStreamLifecycle;
 import org.elasticsearch.cluster.metadata.DataStreamOptions;
-import org.elasticsearch.cluster.metadata.DataStreamTestHelper;
 import org.elasticsearch.common.UUIDs;
 import org.elasticsearch.common.bytes.BytesReference;
-import org.elasticsearch.common.io.stream.Writeable;
-import org.elasticsearch.core.Tuple;
 import org.elasticsearch.index.Index;
 import org.elasticsearch.index.IndexMode;
-import org.elasticsearch.test.AbstractWireSerializingTestCase;
 import org.elasticsearch.test.ESTestCase;
 import org.elasticsearch.xcontent.ToXContent;
 import org.elasticsearch.xcontent.XContentBuilder;
@@ -29,9 +25,6 @@ import org.elasticsearch.xcontent.XContentFactory;
 import org.elasticsearch.xcontent.XContentParser;
 import org.elasticsearch.xcontent.json.JsonXContent;
 
-import java.time.Instant;
-import java.util.ArrayList;
-import java.util.HashMap;
 import java.util.List;
 import java.util.Map;
 
@@ -40,30 +33,7 @@ import static org.elasticsearch.cluster.metadata.DataStream.getDefaultFailureSto
 import static org.hamcrest.Matchers.is;
 import static org.hamcrest.Matchers.nullValue;
 
-public class GetDataStreamsResponseTests extends AbstractWireSerializingTestCase<Response> {
-
-    @Override
-    protected Writeable.Reader<Response> instanceReader() {
-        return Response::new;
-    }
-
-    @Override
-    protected Response createTestInstance() {
-        int numDataStreams = randomIntBetween(0, 8);
-        List<Response.DataStreamInfo> dataStreams = new ArrayList<>();
-        for (int i = 0; i < numDataStreams; i++) {
-            dataStreams.add(generateRandomDataStreamInfo());
-        }
-        return new Response(dataStreams);
-    }
-
-    @Override
-    protected Response mutateInstance(Response instance) {
-        if (instance.getDataStreams().isEmpty()) {
-            return new Response(List.of(generateRandomDataStreamInfo()));
-        }
-        return new Response(instance.getDataStreams().stream().map(this::mutateInstance).toList());
-    }
+public class GetDataStreamsResponseTests extends ESTestCase {
 
     @SuppressWarnings("unchecked")
     public void testResponseIlmAndDataStreamLifecycleRepresentation() throws Exception {
@@ -283,97 +253,4 @@ public class GetDataStreamsResponseTests extends AbstractWireSerializingTestCase
         assertThat(ManagedBy.LIFECYCLE.displayValue, is("Data stream lifecycle"));
         assertThat(ManagedBy.UNMANAGED.displayValue, is("Unmanaged"));
     }
-
-    private Response.DataStreamInfo mutateInstance(Response.DataStreamInfo instance) {
-        var dataStream = instance.getDataStream();
-        var failureStoreEffectivelyEnabled = instance.isFailureStoreEffectivelyEnabled();
-        var status = instance.getDataStreamStatus();
-        var indexTemplate = instance.getIndexTemplate();
-        var ilmPolicyName = instance.getIlmPolicy();
-        var timeSeries = instance.getTimeSeries();
-        var indexSettings = instance.getIndexSettingsValues();
-        var templatePreferIlm = instance.templatePreferIlmValue();
-        var maximumTimestamp = instance.getMaximumTimestamp();
-        switch (randomIntBetween(0, 8)) {
-            case 0 -> dataStream = randomValueOtherThan(dataStream, DataStreamTestHelper::randomInstance);
-            case 1 -> status = randomValueOtherThan(status, () -> randomFrom(ClusterHealthStatus.values()));
-            case 2 -> indexTemplate = randomBoolean() && indexTemplate != null ? null : randomAlphaOfLengthBetween(2, 10);
-            case 3 -> ilmPolicyName = randomBoolean() && ilmPolicyName != null ? null : randomAlphaOfLengthBetween(2, 10);
-            case 4 -> timeSeries = randomBoolean() && timeSeries != null
-                ? null
-                : randomValueOtherThan(timeSeries, () -> new Response.TimeSeries(generateRandomTimeSeries()));
-            case 5 -> indexSettings = randomValueOtherThan(
-                indexSettings,
-                () -> randomBoolean()
-                    ? Map.of()
-                    : Map.of(
-                        new Index(randomAlphaOfLengthBetween(50, 100), UUIDs.base64UUID()),
-                        new Response.IndexProperties(
-                            randomBoolean(),
-                            randomAlphaOfLengthBetween(50, 100),
-                            randomBoolean() ? ManagedBy.ILM : ManagedBy.LIFECYCLE,
-                            null
-                        )
-                    )
-            );
-            case 6 -> templatePreferIlm = templatePreferIlm ? false : true;
-            case 7 -> maximumTimestamp = (maximumTimestamp == null)
-                ? randomNonNegativeLong()
-                : (usually() ? randomValueOtherThan(maximumTimestamp, ESTestCase::randomNonNegativeLong) : null);
-            case 8 -> failureStoreEffectivelyEnabled = failureStoreEffectivelyEnabled ? false : true;
-        }
-        return new Response.DataStreamInfo(
-            dataStream,
-            failureStoreEffectivelyEnabled,
-            status,
-            indexTemplate,
-            ilmPolicyName,
-            timeSeries,
-            indexSettings,
-            templatePreferIlm,
-            maximumTimestamp,
-            null
-        );
-    }
-
-    private List<Tuple<Instant, Instant>> generateRandomTimeSeries() {
-        List<Tuple<Instant, Instant>> timeSeries = new ArrayList<>();
-        int numTimeSeries = randomIntBetween(0, 3);
-        for (int j = 0; j < numTimeSeries; j++) {
-            timeSeries.add(new Tuple<>(Instant.now(), Instant.now()));
-        }
-        return timeSeries;
-    }
-
-    private Map<Index, Response.IndexProperties> generateRandomIndexSettingsValues() {
-        Map<Index, Response.IndexProperties> values = new HashMap<>();
-        for (int i = 0; i < randomIntBetween(0, 3); i++) {
-            values.put(
-                new Index(randomAlphaOfLengthBetween(50, 100), UUIDs.base64UUID()),
-                new Response.IndexProperties(
-                    randomBoolean(),
-                    randomAlphaOfLengthBetween(50, 100),
-                    randomBoolean() ? ManagedBy.ILM : ManagedBy.LIFECYCLE,
-                    randomBoolean() ? randomFrom(IndexMode.values()).getName() : null
-                )
-            );
-        }
-        return values;
-    }
-
-    private Response.DataStreamInfo generateRandomDataStreamInfo() {
-        List<Tuple<Instant, Instant>> timeSeries = randomBoolean() ? generateRandomTimeSeries() : null;
-        return new Response.DataStreamInfo(
-            DataStreamTestHelper.randomInstance(),
-            randomBoolean(),
-            ClusterHealthStatus.GREEN,
-            randomAlphaOfLengthBetween(2, 10),
-            randomAlphaOfLengthBetween(2, 10),
-            timeSeries != null ? new Response.TimeSeries(timeSeries) : null,
-            generateRandomIndexSettingsValues(),
-            randomBoolean(),
-            usually() ? randomNonNegativeLong() : null,
-            usually() ? randomFrom(IndexMode.values()).getName() : null
-        );
-    }
 }

+ 31 - 52
server/src/main/java/org/elasticsearch/action/datastreams/GetDataStreamAction.java

@@ -15,7 +15,7 @@ import org.elasticsearch.action.ActionType;
 import org.elasticsearch.action.IndicesRequest;
 import org.elasticsearch.action.admin.indices.rollover.RolloverConfiguration;
 import org.elasticsearch.action.support.IndicesOptions;
-import org.elasticsearch.action.support.master.MasterNodeReadRequest;
+import org.elasticsearch.action.support.local.LocalClusterStateRequest;
 import org.elasticsearch.cluster.SimpleDiffable;
 import org.elasticsearch.cluster.health.ClusterHealthStatus;
 import org.elasticsearch.cluster.metadata.DataStream;
@@ -28,8 +28,12 @@ import org.elasticsearch.common.io.stream.Writeable;
 import org.elasticsearch.core.Nullable;
 import org.elasticsearch.core.TimeValue;
 import org.elasticsearch.core.Tuple;
+import org.elasticsearch.core.UpdateForV10;
 import org.elasticsearch.index.Index;
 import org.elasticsearch.index.mapper.DateFieldMapper;
+import org.elasticsearch.tasks.CancellableTask;
+import org.elasticsearch.tasks.Task;
+import org.elasticsearch.tasks.TaskId;
 import org.elasticsearch.xcontent.ParseField;
 import org.elasticsearch.xcontent.ToXContentObject;
 import org.elasticsearch.xcontent.XContentBuilder;
@@ -53,7 +57,7 @@ public class GetDataStreamAction extends ActionType<GetDataStreamAction.Response
         super(NAME);
     }
 
-    public static class Request extends MasterNodeReadRequest<Request> implements IndicesRequest.Replaceable {
+    public static class Request extends LocalClusterStateRequest implements IndicesRequest.Replaceable {
 
         private String[] names;
         private IndicesOptions indicesOptions = IndicesOptions.builder()
@@ -104,6 +108,16 @@ public class GetDataStreamAction extends ActionType<GetDataStreamAction.Response
             return null;
         }
 
+        @Override
+        public Task createTask(long id, String type, String action, TaskId parentTaskId, Map<String, String> headers) {
+            return new CancellableTask(id, type, action, "", parentTaskId, headers);
+        }
+
+        /**
+         * NB prior to 9.0 this was a TransportMasterNodeReadAction so for BwC we must remain able to read these requests until
+         * we no longer need to support calling this action remotely.
+         */
+        @UpdateForV10(owner = UpdateForV10.Owner.DATA_MANAGEMENT)
         public Request(StreamInput in) throws IOException {
             super(in);
             this.names = in.readOptionalStringArray();
@@ -120,19 +134,6 @@ public class GetDataStreamAction extends ActionType<GetDataStreamAction.Response
             }
         }
 
-        @Override
-        public void writeTo(StreamOutput out) throws IOException {
-            super.writeTo(out);
-            out.writeOptionalStringArray(names);
-            indicesOptions.writeIndicesOptions(out);
-            if (out.getTransportVersion().onOrAfter(TransportVersions.V_8_9_X)) {
-                out.writeBoolean(includeDefaults);
-            }
-            if (out.getTransportVersion().onOrAfter(TransportVersions.V_8_16_0)) {
-                out.writeBoolean(verbose);
-            }
-        }
-
         @Override
         public boolean equals(Object o) {
             if (this == o) return true;
@@ -274,29 +275,6 @@ public class GetDataStreamAction extends ActionType<GetDataStreamAction.Response
                 this.indexMode = indexMode;
             }
 
-            @SuppressWarnings("unchecked")
-            DataStreamInfo(StreamInput in) throws IOException {
-                this.dataStream = DataStream.read(in);
-                this.failureStoreEffectivelyEnabled = in.getTransportVersion()
-                    .onOrAfter(TransportVersions.FAILURE_STORE_ENABLED_BY_CLUSTER_SETTING)
-                        ? in.readBoolean()
-                        : dataStream.isFailureStoreExplicitlyEnabled(); // Revert to the behaviour before this field was added
-                this.dataStreamStatus = ClusterHealthStatus.readFrom(in);
-                this.indexTemplate = in.readOptionalString();
-                this.ilmPolicyName = in.readOptionalString();
-                this.timeSeries = in.getTransportVersion().onOrAfter(TransportVersions.V_8_3_0)
-                    ? in.readOptionalWriteable(TimeSeries::new)
-                    : null;
-                this.indexSettingsValues = in.getTransportVersion().onOrAfter(V_8_11_X)
-                    ? in.readMap(Index::new, IndexProperties::new)
-                    : Map.of();
-                this.templatePreferIlmValue = in.getTransportVersion().onOrAfter(V_8_11_X) ? in.readBoolean() : true;
-                this.maximumTimestamp = in.getTransportVersion().onOrAfter(TransportVersions.V_8_16_0) ? in.readOptionalVLong() : null;
-                this.indexMode = in.getTransportVersion().onOrAfter(TransportVersions.INCLUDE_INDEX_MODE_IN_GET_DATA_STREAM)
-                    ? in.readOptionalString()
-                    : null;
-            }
-
             public DataStream getDataStream() {
                 return dataStream;
             }
@@ -342,6 +320,11 @@ public class GetDataStreamAction extends ActionType<GetDataStreamAction.Response
                 return indexMode;
             }
 
+            /**
+             * NB prior to 9.0 this was a TransportMasterNodeReadAction so for BwC we must remain able to write these responses until
+             * we no longer need to support calling this action remotely.
+             */
+            @UpdateForV10(owner = UpdateForV10.Owner.DATA_MANAGEMENT)
             @Override
             public void writeTo(StreamOutput out) throws IOException {
                 dataStream.writeTo(out);
@@ -543,10 +526,11 @@ public class GetDataStreamAction extends ActionType<GetDataStreamAction.Response
 
         public record TimeSeries(List<Tuple<Instant, Instant>> temporalRanges) implements Writeable {
 
-            TimeSeries(StreamInput in) throws IOException {
-                this(in.readCollectionAsList(in1 -> new Tuple<>(in1.readInstant(), in1.readInstant())));
-            }
-
+            /**
+             * NB prior to 9.0 this was a TransportMasterNodeReadAction so for BwC we must remain able to write these responses until
+             * we no longer need to support calling this action remotely.
+             */
+            @UpdateForV10(owner = UpdateForV10.Owner.DATA_MANAGEMENT)
             @Override
             public void writeTo(StreamOutput out) throws IOException {
                 out.writeCollection(temporalRanges, (out1, value) -> {
@@ -618,16 +602,6 @@ public class GetDataStreamAction extends ActionType<GetDataStreamAction.Response
             this.globalRetention = globalRetention;
         }
 
-        public Response(StreamInput in) throws IOException {
-            this(
-                in.readCollectionAsList(DataStreamInfo::new),
-                in.getTransportVersion().onOrAfter(TransportVersions.V_8_9_X) ? in.readOptionalWriteable(RolloverConfiguration::new) : null,
-                in.getTransportVersion().onOrAfter(TransportVersions.V_8_14_0)
-                    ? in.readOptionalWriteable(DataStreamGlobalRetention::read)
-                    : null
-            );
-        }
-
         public List<DataStreamInfo> getDataStreams() {
             return dataStreams;
         }
@@ -642,6 +616,11 @@ public class GetDataStreamAction extends ActionType<GetDataStreamAction.Response
             return globalRetention;
         }
 
+        /**
+         * NB prior to 9.0 this was a TransportMasterNodeReadAction so for BwC we must remain able to write these responses until
+         * we no longer need to support calling this action remotely.
+         */
+        @UpdateForV10(owner = UpdateForV10.Owner.DATA_MANAGEMENT)
         @Override
         public void writeTo(StreamOutput out) throws IOException {
             out.writeCollection(dataStreams);