Ver Fonte

Run `TransportGetDataStreamOptionsAction` on local node (#125213)

This action solely needs the cluster state, it can run on any node.
Additionally, it needs to be cancellable to avoid doing unnecessary work
after a client failure or timeout.

Relates #101805
Niels Bauman há 7 meses atrás
pai
commit
f7d7ce7ccc

+ 5 - 0
docs/changelog/125213.yaml

@@ -0,0 +1,5 @@
+pr: 125213
+summary: Run `TransportGetDataStreamOptionsAction` on local node
+area: Data streams
+type: enhancement
+issues: []

+ 8 - 0
modules/data-streams/src/internalClusterTest/java/org/elasticsearch/datastreams/DataStreamRestActionCancellationIT.java

@@ -22,6 +22,7 @@ import org.elasticsearch.client.Response;
 import org.elasticsearch.common.Strings;
 import org.elasticsearch.common.network.NetworkModule;
 import org.elasticsearch.common.settings.Settings;
+import org.elasticsearch.datastreams.options.action.GetDataStreamOptionsAction;
 import org.elasticsearch.plugins.Plugin;
 import org.elasticsearch.rest.root.MainRestPlugin;
 import org.elasticsearch.test.ESIntegTestCase;
@@ -72,6 +73,13 @@ public class DataStreamRestActionCancellationIT extends ESIntegTestCase {
         );
     }
 
+    public void testGetDataStreamOptionsCancellation() {
+        runRestActionCancellationTest(
+            new Request(HttpGet.METHOD_NAME, "/_data_stream/test/_options"),
+            GetDataStreamOptionsAction.INSTANCE.name()
+        );
+    }
+
     private void runRestActionCancellationTest(Request request, String actionName) {
         final var node = usually() ? internalCluster().getRandomNodeName() : internalCluster().startCoordinatingOnlyNode(Settings.EMPTY);
 

+ 27 - 18
modules/data-streams/src/main/java/org/elasticsearch/datastreams/options/action/GetDataStreamOptionsAction.java

@@ -13,7 +13,7 @@ import org.elasticsearch.action.ActionResponse;
 import org.elasticsearch.action.ActionType;
 import org.elasticsearch.action.IndicesRequest;
 import org.elasticsearch.action.support.IndicesOptions;
-import org.elasticsearch.action.support.master.MasterNodeReadRequest;
+import org.elasticsearch.action.support.local.LocalClusterStateRequest;
 import org.elasticsearch.cluster.metadata.DataStreamOptions;
 import org.elasticsearch.common.collect.Iterators;
 import org.elasticsearch.common.io.stream.StreamInput;
@@ -21,6 +21,10 @@ import org.elasticsearch.common.io.stream.StreamOutput;
 import org.elasticsearch.common.io.stream.Writeable;
 import org.elasticsearch.common.xcontent.ChunkedToXContentObject;
 import org.elasticsearch.core.TimeValue;
+import org.elasticsearch.core.UpdateForV10;
+import org.elasticsearch.tasks.CancellableTask;
+import org.elasticsearch.tasks.Task;
+import org.elasticsearch.tasks.TaskId;
 import org.elasticsearch.xcontent.ParseField;
 import org.elasticsearch.xcontent.ToXContent;
 import org.elasticsearch.xcontent.ToXContentObject;
@@ -30,6 +34,7 @@ import java.io.IOException;
 import java.util.Arrays;
 import java.util.Iterator;
 import java.util.List;
+import java.util.Map;
 import java.util.Objects;
 
 /**
@@ -42,7 +47,7 @@ public class GetDataStreamOptionsAction {
 
     private GetDataStreamOptionsAction() {/* no instances */}
 
-    public static class Request extends MasterNodeReadRequest<Request> implements IndicesRequest.Replaceable {
+    public static class Request extends LocalClusterStateRequest implements IndicesRequest.Replaceable {
 
         private String[] names;
         private IndicesOptions indicesOptions = IndicesOptions.builder()
@@ -76,6 +81,16 @@ public class GetDataStreamOptionsAction {
             return null;
         }
 
+        @Override
+        public Task createTask(long id, String type, String action, TaskId parentTaskId, Map<String, String> headers) {
+            return new CancellableTask(id, type, action, "", parentTaskId, headers);
+        }
+
+        /**
+         * NB prior to 9.0 this was a TransportMasterNodeReadAction so for BwC we must remain able to read these requests until
+         * we no longer need to support calling this action remotely.
+         */
+        @UpdateForV10(owner = UpdateForV10.Owner.DATA_MANAGEMENT)
         public Request(StreamInput in) throws IOException {
             super(in);
             this.names = in.readOptionalStringArray();
@@ -83,14 +98,6 @@ public class GetDataStreamOptionsAction {
             this.includeDefaults = in.readBoolean();
         }
 
-        @Override
-        public void writeTo(StreamOutput out) throws IOException {
-            super.writeTo(out);
-            out.writeOptionalStringArray(names);
-            indicesOptions.writeIndicesOptions(out);
-            out.writeBoolean(includeDefaults);
-        }
-
         @Override
         public boolean equals(Object o) {
             if (this == o) return true;
@@ -152,10 +159,11 @@ public class GetDataStreamOptionsAction {
             public static final ParseField NAME_FIELD = new ParseField("name");
             public static final ParseField OPTIONS_FIELD = new ParseField("options");
 
-            DataStreamEntry(StreamInput in) throws IOException {
-                this(in.readString(), in.readOptionalWriteable(DataStreamOptions::read));
-            }
-
+            /**
+             * NB prior to 9.0 this was a TransportMasterNodeReadAction so for BwC we must remain able to write these responses until
+             * we no longer need to support calling this action remotely.
+             */
+            @UpdateForV10(owner = UpdateForV10.Owner.DATA_MANAGEMENT)
             @Override
             public void writeTo(StreamOutput out) throws IOException {
                 out.writeString(dataStreamName);
@@ -180,14 +188,15 @@ public class GetDataStreamOptionsAction {
             this.dataStreams = dataStreams;
         }
 
-        public Response(StreamInput in) throws IOException {
-            this(in.readCollectionAsList(DataStreamEntry::new));
-        }
-
         public List<DataStreamEntry> getDataStreams() {
             return dataStreams;
         }
 
+        /**
+         * NB prior to 9.0 this was a TransportMasterNodeReadAction so for BwC we must remain able to write these responses until
+         * we no longer need to support calling this action remotely.
+         */
+        @UpdateForV10(owner = UpdateForV10.Owner.DATA_MANAGEMENT)
         @Override
         public void writeTo(StreamOutput out) throws IOException {
             out.writeCollection(dataStreams);

+ 28 - 10
modules/data-streams/src/main/java/org/elasticsearch/datastreams/options/action/TransportGetDataStreamOptionsAction.java

@@ -11,7 +11,8 @@ package org.elasticsearch.datastreams.options.action;
 import org.elasticsearch.action.ActionListener;
 import org.elasticsearch.action.datastreams.DataStreamsActionUtil;
 import org.elasticsearch.action.support.ActionFilters;
-import org.elasticsearch.action.support.master.TransportMasterNodeReadProjectAction;
+import org.elasticsearch.action.support.ChannelActionListener;
+import org.elasticsearch.action.support.local.TransportLocalProjectMetadataAction;
 import org.elasticsearch.cluster.ProjectState;
 import org.elasticsearch.cluster.block.ClusterBlockException;
 import org.elasticsearch.cluster.block.ClusterBlockLevel;
@@ -20,8 +21,10 @@ import org.elasticsearch.cluster.metadata.IndexNameExpressionResolver;
 import org.elasticsearch.cluster.project.ProjectResolver;
 import org.elasticsearch.cluster.service.ClusterService;
 import org.elasticsearch.common.util.concurrent.EsExecutors;
+import org.elasticsearch.core.UpdateForV10;
 import org.elasticsearch.indices.SystemIndices;
 import org.elasticsearch.injection.guice.Inject;
+import org.elasticsearch.tasks.CancellableTask;
 import org.elasticsearch.tasks.Task;
 import org.elasticsearch.threadpool.ThreadPool;
 import org.elasticsearch.transport.TransportService;
@@ -35,13 +38,20 @@ import java.util.Objects;
  * Collects the data streams from the cluster state and then returns for each data stream its name and its
  * data stream options. Currently, data stream options include only the failure store configuration.
  */
-public class TransportGetDataStreamOptionsAction extends TransportMasterNodeReadProjectAction<
+public class TransportGetDataStreamOptionsAction extends TransportLocalProjectMetadataAction<
     GetDataStreamOptionsAction.Request,
     GetDataStreamOptionsAction.Response> {
 
     private final IndexNameExpressionResolver indexNameExpressionResolver;
     private final SystemIndices systemIndices;
+    private final ThreadPool threadPool;
 
+    /**
+     * NB prior to 9.0 this was a TransportMasterNodeReadAction so for BwC it must be registered with the TransportService until
+     * we no longer need to support calling this action remotely.
+     */
+    @UpdateForV10(owner = UpdateForV10.Owner.DATA_MANAGEMENT)
+    @SuppressWarnings("this-escape")
     @Inject
     public TransportGetDataStreamOptionsAction(
         TransportService transportService,
@@ -54,26 +64,34 @@ public class TransportGetDataStreamOptionsAction extends TransportMasterNodeRead
     ) {
         super(
             GetDataStreamOptionsAction.INSTANCE.name(),
-            transportService,
-            clusterService,
-            threadPool,
             actionFilters,
-            GetDataStreamOptionsAction.Request::new,
-            projectResolver,
-            GetDataStreamOptionsAction.Response::new,
-            EsExecutors.DIRECT_EXECUTOR_SERVICE
+            transportService.getTaskManager(),
+            clusterService,
+            EsExecutors.DIRECT_EXECUTOR_SERVICE,
+            projectResolver
         );
         this.indexNameExpressionResolver = indexNameExpressionResolver;
         this.systemIndices = systemIndices;
+        this.threadPool = threadPool;
+
+        transportService.registerRequestHandler(
+            actionName,
+            executor,
+            false,
+            true,
+            GetDataStreamOptionsAction.Request::new,
+            (request, channel, task) -> executeDirect(task, request, new ChannelActionListener<>(channel))
+        );
     }
 
     @Override
-    protected void masterOperation(
+    protected void localClusterStateOperation(
         Task task,
         GetDataStreamOptionsAction.Request request,
         ProjectState state,
         ActionListener<GetDataStreamOptionsAction.Response> listener
     ) {
+        ((CancellableTask) task).ensureNotCancelled();
         List<String> requestedDataStreams = DataStreamsActionUtil.getDataStreamNames(
             indexNameExpressionResolver,
             state.metadata(),

+ 2 - 1
modules/data-streams/src/main/java/org/elasticsearch/datastreams/options/rest/RestGetDataStreamOptionsAction.java

@@ -17,6 +17,7 @@ import org.elasticsearch.rest.RestRequest;
 import org.elasticsearch.rest.RestUtils;
 import org.elasticsearch.rest.Scope;
 import org.elasticsearch.rest.ServerlessScope;
+import org.elasticsearch.rest.action.RestCancellableNodeClient;
 import org.elasticsearch.rest.action.RestRefCountedChunkedToXContentListener;
 
 import java.util.List;
@@ -44,7 +45,7 @@ public class RestGetDataStreamOptionsAction extends BaseRestHandler {
         );
         getDataStreamOptionsRequest.includeDefaults(request.paramAsBoolean("include_defaults", false));
         getDataStreamOptionsRequest.indicesOptions(IndicesOptions.fromRequest(request, getDataStreamOptionsRequest.indicesOptions()));
-        return channel -> client.execute(
+        return channel -> new RestCancellableNodeClient(client, request.getHttpChannel()).execute(
             GetDataStreamOptionsAction.INSTANCE,
             getDataStreamOptionsRequest,
             new RestRefCountedChunkedToXContentListener<>(channel)