Browse Source

Run `TransportGetMappingsAction` on local node (#122921)

This action solely needs the cluster state, it can run on any node.
Additionally, it needs to be cancellable to avoid doing unnecessary work
after a client failure or timeout.

Relates #101805
Niels Bauman 7 months ago
parent
commit
481d91c428

+ 5 - 0
docs/changelog/122921.yaml

@@ -0,0 +1,5 @@
+pr: 122921
+summary: Run `TransportGetMappingsAction` on local node
+area: Indices APIs
+type: enhancement
+issues: []

+ 6 - 0
qa/smoke-test-http/src/javaRestTest/java/org/elasticsearch/http/RestActionCancellationIT.java

@@ -15,6 +15,7 @@ import org.elasticsearch.action.admin.cluster.health.TransportClusterHealthActio
 import org.elasticsearch.action.admin.cluster.settings.ClusterGetSettingsAction;
 import org.elasticsearch.action.admin.cluster.state.ClusterStateAction;
 import org.elasticsearch.action.admin.indices.alias.get.GetAliasesAction;
+import org.elasticsearch.action.admin.indices.mapping.get.GetMappingsAction;
 import org.elasticsearch.action.admin.indices.recovery.RecoveryAction;
 import org.elasticsearch.action.admin.indices.template.get.GetComponentTemplateAction;
 import org.elasticsearch.action.admin.indices.template.get.GetComposableIndexTemplateAction;
@@ -108,6 +109,11 @@ public class RestActionCancellationIT extends HttpSmokeTestCase {
         runRestActionCancellationTest(new Request(HttpGet.METHOD_NAME, "/_ingest/pipeline"), GetPipelineAction.NAME);
     }
 
+    public void testGetMappingsCancellation() {
+        createIndex("test");
+        runRestActionCancellationTest(new Request(HttpGet.METHOD_NAME, "/test/_mappings"), GetMappingsAction.NAME);
+    }
+
     private void runRestActionCancellationTest(Request request, String actionName) {
         final var node = usually() ? internalCluster().getRandomNodeName() : internalCluster().startCoordinatingOnlyNode(Settings.EMPTY);
 

+ 0 - 5
qa/smoke-test-http/src/javaRestTest/java/org/elasticsearch/http/RestClusterInfoActionCancellationIT.java

@@ -11,7 +11,6 @@ package org.elasticsearch.http;
 
 import org.apache.http.client.methods.HttpGet;
 import org.elasticsearch.action.admin.indices.get.GetIndexAction;
-import org.elasticsearch.action.admin.indices.mapping.get.GetMappingsAction;
 import org.elasticsearch.action.support.PlainActionFuture;
 import org.elasticsearch.action.support.master.AcknowledgedResponse;
 import org.elasticsearch.client.Cancellable;
@@ -41,10 +40,6 @@ import static org.hamcrest.core.IsEqual.equalTo;
 @ESIntegTestCase.ClusterScope(scope = ESIntegTestCase.Scope.TEST, numDataNodes = 0, numClientNodes = 0)
 public class RestClusterInfoActionCancellationIT extends HttpSmokeTestCase {
 
-    public void testGetMappingsCancellation() throws Exception {
-        runTest(GetMappingsAction.NAME, "/test/_mappings");
-    }
-
     public void testGetIndicesCancellation() throws Exception {
         runTest(GetIndexAction.NAME, "/test");
     }

+ 2 - 5
rest-api-spec/src/main/resources/rest-api-spec/api/indices.get_mapping.json

@@ -54,15 +54,12 @@
       },
       "master_timeout":{
         "type":"time",
-        "description":"Specify timeout for connection to master"
+        "description":"Timeout for waiting for new cluster state in case it is blocked"
       },
       "local":{
         "type":"boolean",
         "description":"Return local information, do not retrieve the state from master node (default: false)",
-        "deprecated":{
-          "version":"7.8.0",
-          "description":"This parameter is a no-op and field mappings are always retrieved locally."
-        }
+        "deprecated":true
       }
     }
   }

+ 0 - 12
server/src/internalClusterTest/java/org/elasticsearch/action/IndicesRequestIT.java

@@ -24,8 +24,6 @@ import org.elasticsearch.action.admin.indices.forcemerge.ForceMergeAction;
 import org.elasticsearch.action.admin.indices.forcemerge.ForceMergeRequest;
 import org.elasticsearch.action.admin.indices.mapping.get.GetFieldMappingsAction;
 import org.elasticsearch.action.admin.indices.mapping.get.GetFieldMappingsRequest;
-import org.elasticsearch.action.admin.indices.mapping.get.GetMappingsAction;
-import org.elasticsearch.action.admin.indices.mapping.get.GetMappingsRequest;
 import org.elasticsearch.action.admin.indices.mapping.put.PutMappingRequest;
 import org.elasticsearch.action.admin.indices.mapping.put.TransportPutMappingAction;
 import org.elasticsearch.action.admin.indices.open.OpenIndexAction;
@@ -516,16 +514,6 @@ public class IndicesRequestIT extends ESIntegTestCase {
         assertSameIndices(deleteIndexRequest, TransportDeleteIndexAction.TYPE.name());
     }
 
-    public void testGetMappings() {
-        interceptTransportActions(GetMappingsAction.NAME);
-
-        GetMappingsRequest getMappingsRequest = new GetMappingsRequest(TEST_REQUEST_TIMEOUT).indices(randomIndicesOrAliases());
-        internalCluster().coordOnlyNodeClient().admin().indices().getMappings(getMappingsRequest).actionGet();
-
-        clearInterceptedActions();
-        assertSameIndices(getMappingsRequest, GetMappingsAction.NAME);
-    }
-
     public void testPutMapping() {
         interceptTransportActions(TransportPutMappingAction.TYPE.name());
 

+ 0 - 1
server/src/internalClusterTest/java/org/elasticsearch/cluster/SimpleClusterStateIT.java

@@ -266,7 +266,6 @@ public class SimpleClusterStateIT extends ESIntegTestCase {
             MappingMetadata mappingMetadata = client.admin()
                 .indices()
                 .prepareGetMappings(TEST_REQUEST_TIMEOUT, "test")
-                .setLocal(true)
                 .get()
                 .getMappings()
                 .get("test");

+ 47 - 3
server/src/main/java/org/elasticsearch/action/admin/indices/mapping/get/GetMappingsRequest.java

@@ -9,11 +9,15 @@
 
 package org.elasticsearch.action.admin.indices.mapping.get;
 
+import org.elasticsearch.TransportVersions;
 import org.elasticsearch.action.ActionRequestValidationException;
+import org.elasticsearch.action.IndicesRequest;
 import org.elasticsearch.action.support.IndicesOptions;
-import org.elasticsearch.action.support.master.info.ClusterInfoRequest;
+import org.elasticsearch.action.support.local.LocalClusterStateRequest;
+import org.elasticsearch.common.Strings;
 import org.elasticsearch.common.io.stream.StreamInput;
 import org.elasticsearch.core.TimeValue;
+import org.elasticsearch.core.UpdateForV10;
 import org.elasticsearch.tasks.CancellableTask;
 import org.elasticsearch.tasks.Task;
 import org.elasticsearch.tasks.TaskId;
@@ -21,14 +25,28 @@ import org.elasticsearch.tasks.TaskId;
 import java.io.IOException;
 import java.util.Map;
 
-public class GetMappingsRequest extends ClusterInfoRequest<GetMappingsRequest> {
+public class GetMappingsRequest extends LocalClusterStateRequest implements IndicesRequest.Replaceable {
+
+    private String[] indices = Strings.EMPTY_ARRAY;
+    private IndicesOptions indicesOptions;
 
     public GetMappingsRequest(TimeValue masterTimeout) {
-        super(masterTimeout, IndicesOptions.strictExpandOpen());
+        super(masterTimeout);
+        indicesOptions = IndicesOptions.strictExpandOpen();
     }
 
+    /**
+     * NB prior to 9.0 this was a TransportMasterNodeReadAction so for BwC we must remain able to read these requests until
+     * we no longer need to support calling this action remotely.
+     */
+    @UpdateForV10(owner = UpdateForV10.Owner.DATA_MANAGEMENT)
     public GetMappingsRequest(StreamInput in) throws IOException {
         super(in);
+        indices = in.readStringArray();
+        if (in.getTransportVersion().before(TransportVersions.V_8_0_0)) {
+            in.readStringArray();
+        }
+        indicesOptions = IndicesOptions.readIndicesOptions(in);
     }
 
     @Override
@@ -40,4 +58,30 @@ public class GetMappingsRequest extends ClusterInfoRequest<GetMappingsRequest> {
     public Task createTask(long id, String type, String action, TaskId parentTaskId, Map<String, String> headers) {
         return new CancellableTask(id, type, action, "", parentTaskId, headers);
     }
+
+    @Override
+    public GetMappingsRequest indices(String... indices) {
+        this.indices = indices;
+        return this;
+    }
+
+    public GetMappingsRequest indicesOptions(IndicesOptions indicesOptions) {
+        this.indicesOptions = indicesOptions;
+        return this;
+    }
+
+    @Override
+    public String[] indices() {
+        return indices;
+    }
+
+    @Override
+    public IndicesOptions indicesOptions() {
+        return indicesOptions;
+    }
+
+    @Override
+    public boolean includeDataStreams() {
+        return true;
+    }
 }

+ 19 - 5
server/src/main/java/org/elasticsearch/action/admin/indices/mapping/get/GetMappingsRequestBuilder.java

@@ -9,16 +9,30 @@
 
 package org.elasticsearch.action.admin.indices.mapping.get;
 
-import org.elasticsearch.action.support.master.info.ClusterInfoRequestBuilder;
+import org.elasticsearch.action.ActionRequestBuilder;
+import org.elasticsearch.action.support.IndicesOptions;
 import org.elasticsearch.client.internal.ElasticsearchClient;
+import org.elasticsearch.common.util.ArrayUtils;
 import org.elasticsearch.core.TimeValue;
 
-public class GetMappingsRequestBuilder extends ClusterInfoRequestBuilder<
-    GetMappingsRequest,
-    GetMappingsResponse,
-    GetMappingsRequestBuilder> {
+public class GetMappingsRequestBuilder extends ActionRequestBuilder<GetMappingsRequest, GetMappingsResponse> {
 
     public GetMappingsRequestBuilder(ElasticsearchClient client, TimeValue masterTimeout, String... indices) {
         super(client, GetMappingsAction.INSTANCE, new GetMappingsRequest(masterTimeout).indices(indices));
     }
+
+    public GetMappingsRequestBuilder setIndices(String... indices) {
+        request.indices(indices);
+        return this;
+    }
+
+    public GetMappingsRequestBuilder addIndices(String... indices) {
+        request.indices(ArrayUtils.concat(request.indices(), indices));
+        return this;
+    }
+
+    public GetMappingsRequestBuilder setIndicesOptions(IndicesOptions indicesOptions) {
+        request.indicesOptions(indicesOptions);
+        return this;
+    }
 }

+ 6 - 18
server/src/main/java/org/elasticsearch/action/admin/indices/mapping/get/GetMappingsResponse.java

@@ -9,15 +9,13 @@
 
 package org.elasticsearch.action.admin.indices.mapping.get;
 
-import org.elasticsearch.TransportVersions;
 import org.elasticsearch.action.ActionResponse;
 import org.elasticsearch.cluster.metadata.MappingMetadata;
 import org.elasticsearch.common.Strings;
 import org.elasticsearch.common.collect.Iterators;
-import org.elasticsearch.common.io.stream.StreamInput;
 import org.elasticsearch.common.io.stream.StreamOutput;
 import org.elasticsearch.common.xcontent.ChunkedToXContentObject;
-import org.elasticsearch.index.mapper.MapperService;
+import org.elasticsearch.core.UpdateForV10;
 import org.elasticsearch.xcontent.ParseField;
 import org.elasticsearch.xcontent.ToXContent;
 
@@ -35,21 +33,6 @@ public class GetMappingsResponse extends ActionResponse implements ChunkedToXCon
         this.mappings = mappings;
     }
 
-    GetMappingsResponse(StreamInput in) throws IOException {
-        super(in);
-        mappings = in.readImmutableMap(in.getTransportVersion().before(TransportVersions.V_8_0_0) ? i -> {
-            int mappingCount = i.readVInt();
-            assert mappingCount == 1 || mappingCount == 0 : "Expected 0 or 1 mappings but got " + mappingCount;
-            if (mappingCount == 1) {
-                String type = i.readString();
-                assert MapperService.SINGLE_MAPPING_NAME.equals(type) : "Expected type [_doc] but got [" + type + "]";
-                return new MappingMetadata(i);
-            } else {
-                return MappingMetadata.EMPTY_MAPPINGS;
-            }
-        } : i -> i.readBoolean() ? new MappingMetadata(i) : MappingMetadata.EMPTY_MAPPINGS);
-    }
-
     public Map<String, MappingMetadata> mappings() {
         return mappings;
     }
@@ -58,6 +41,11 @@ public class GetMappingsResponse extends ActionResponse implements ChunkedToXCon
         return mappings();
     }
 
+    /**
+     * NB prior to 9.0 this was a TransportMasterNodeReadAction so for BwC we must remain able to write these responses until
+     * we no longer need to support calling this action remotely.
+     */
+    @UpdateForV10(owner = UpdateForV10.Owner.DATA_MANAGEMENT)
     @Override
     public void writeTo(StreamOutput out) throws IOException {
         MappingMetadata.writeMappingMetadata(out, mappings);

+ 43 - 14
server/src/main/java/org/elasticsearch/action/admin/indices/mapping/get/TransportGetMappingsAction.java

@@ -13,12 +13,16 @@ import org.apache.logging.log4j.LogManager;
 import org.apache.logging.log4j.Logger;
 import org.elasticsearch.action.ActionListener;
 import org.elasticsearch.action.support.ActionFilters;
-import org.elasticsearch.action.support.master.info.TransportClusterInfoAction;
-import org.elasticsearch.cluster.ClusterState;
+import org.elasticsearch.action.support.ChannelActionListener;
+import org.elasticsearch.action.support.local.TransportLocalProjectMetadataAction;
+import org.elasticsearch.cluster.ProjectState;
+import org.elasticsearch.cluster.block.ClusterBlockException;
+import org.elasticsearch.cluster.block.ClusterBlockLevel;
 import org.elasticsearch.cluster.metadata.IndexNameExpressionResolver;
 import org.elasticsearch.cluster.metadata.MappingMetadata;
 import org.elasticsearch.cluster.project.ProjectResolver;
 import org.elasticsearch.cluster.service.ClusterService;
+import org.elasticsearch.core.UpdateForV10;
 import org.elasticsearch.indices.IndicesService;
 import org.elasticsearch.injection.guice.Inject;
 import org.elasticsearch.tasks.CancellableTask;
@@ -28,12 +32,19 @@ import org.elasticsearch.transport.TransportService;
 
 import java.util.Map;
 
-public class TransportGetMappingsAction extends TransportClusterInfoAction<GetMappingsRequest, GetMappingsResponse> {
+public class TransportGetMappingsAction extends TransportLocalProjectMetadataAction<GetMappingsRequest, GetMappingsResponse> {
 
     private static final Logger logger = LogManager.getLogger(TransportGetMappingsAction.class);
 
     private final IndicesService indicesService;
+    private final IndexNameExpressionResolver indexNameExpressionResolver;
 
+    /**
+     * NB prior to 9.0 this was a TransportMasterNodeReadAction so for BwC it must be registered with the TransportService until
+     * we no longer need to support calling this action remotely.
+     */
+    @UpdateForV10(owner = UpdateForV10.Owner.DATA_MANAGEMENT)
+    @SuppressWarnings("this-escape")
     @Inject
     public TransportGetMappingsAction(
         TransportService transportService,
@@ -46,28 +57,46 @@ public class TransportGetMappingsAction extends TransportClusterInfoAction<GetMa
     ) {
         super(
             GetMappingsAction.NAME,
-            transportService,
-            clusterService,
-            threadPool,
             actionFilters,
-            GetMappingsRequest::new,
-            indexNameExpressionResolver,
-            GetMappingsResponse::new,
+            transportService.getTaskManager(),
+            clusterService,
+            threadPool.executor(ThreadPool.Names.MANAGEMENT),
             projectResolver
         );
         this.indicesService = indicesService;
+        this.indexNameExpressionResolver = indexNameExpressionResolver;
+
+        transportService.registerRequestHandler(
+            actionName,
+            executor,
+            false,
+            true,
+            GetMappingsRequest::new,
+            (request, channel, task) -> executeDirect(task, request, new ChannelActionListener<>(channel))
+        );
+    }
+
+    @Override
+    protected ClusterBlockException checkBlock(GetMappingsRequest request, ProjectState state) {
+        return state.blocks()
+            .indicesBlockedException(
+                state.projectId(),
+                ClusterBlockLevel.METADATA_READ,
+                indexNameExpressionResolver.concreteIndexNames(state.metadata(), request)
+            );
     }
 
     @Override
-    protected void doMasterOperation(
+    protected void localClusterStateOperation(
         Task task,
         final GetMappingsRequest request,
-        String[] concreteIndices,
-        final ClusterState state,
+        final ProjectState state,
         final ActionListener<GetMappingsResponse> listener
     ) {
-        logger.trace("serving getMapping request based on version {}", state.version());
-        final Map<String, MappingMetadata> mappings = projectResolver.getProjectMetadata(state)
+        ((CancellableTask) task).ensureNotCancelled();
+        logger.trace("serving getMapping request based on version {}", state.cluster().version());
+        String[] concreteIndices = indexNameExpressionResolver.concreteIndexNames(state.metadata(), request);
+        final Map<String, MappingMetadata> mappings = state.metadata()
             .findMappings(concreteIndices, indicesService.getFieldFilter(), () -> checkCancellation(task));
         listener.onResponse(new GetMappingsResponse(mappings));
     }

+ 3 - 3
server/src/main/java/org/elasticsearch/rest/action/admin/indices/RestGetMappingAction.java

@@ -16,6 +16,7 @@ import org.elasticsearch.common.Strings;
 import org.elasticsearch.http.HttpChannel;
 import org.elasticsearch.rest.BaseRestHandler;
 import org.elasticsearch.rest.RestRequest;
+import org.elasticsearch.rest.RestUtils;
 import org.elasticsearch.rest.Scope;
 import org.elasticsearch.rest.ServerlessScope;
 import org.elasticsearch.rest.action.RestCancellableNodeClient;
@@ -25,7 +26,6 @@ import java.io.IOException;
 import java.util.List;
 
 import static org.elasticsearch.rest.RestRequest.Method.GET;
-import static org.elasticsearch.rest.RestUtils.getMasterNodeTimeout;
 
 @ServerlessScope(Scope.PUBLIC)
 public class RestGetMappingAction extends BaseRestHandler {
@@ -50,10 +50,10 @@ public class RestGetMappingAction extends BaseRestHandler {
     @Override
     public RestChannelConsumer prepareRequest(final RestRequest request, final NodeClient client) throws IOException {
         final String[] indices = Strings.splitStringByCommaToArray(request.param("index"));
-        final GetMappingsRequest getMappingsRequest = new GetMappingsRequest(getMasterNodeTimeout(request));
+        final GetMappingsRequest getMappingsRequest = new GetMappingsRequest(RestUtils.getMasterNodeTimeout(request));
         getMappingsRequest.indices(indices);
         getMappingsRequest.indicesOptions(IndicesOptions.fromRequest(request, getMappingsRequest.indicesOptions()));
-        getMappingsRequest.local(request.paramAsBoolean("local", getMappingsRequest.local()));
+        RestUtils.consumeDeprecatedLocalParameter(request);
         final HttpChannel httpChannel = request.getHttpChannel();
         return channel -> new RestCancellableNodeClient(client, httpChannel).admin()
             .indices()

+ 2 - 14
server/src/test/java/org/elasticsearch/action/admin/indices/mapping/get/GetMappingsResponseTests.java

@@ -10,10 +10,9 @@
 package org.elasticsearch.action.admin.indices.mapping.get;
 
 import org.elasticsearch.cluster.metadata.MappingMetadata;
-import org.elasticsearch.common.io.stream.Writeable;
 import org.elasticsearch.index.mapper.MapperService;
 import org.elasticsearch.test.AbstractChunkedSerializingTestCase;
-import org.elasticsearch.test.AbstractWireSerializingTestCase;
+import org.elasticsearch.test.ESTestCase;
 import org.elasticsearch.test.EqualsHashCodeTestUtils;
 
 import java.util.HashMap;
@@ -23,18 +22,13 @@ import java.util.function.Function;
 import java.util.stream.Collectors;
 import java.util.stream.IntStream;
 
-public class GetMappingsResponseTests extends AbstractWireSerializingTestCase<GetMappingsResponse> {
+public class GetMappingsResponseTests extends ESTestCase {
 
     public void testCheckEqualsAndHashCode() {
         GetMappingsResponse resp = createTestInstance();
         EqualsHashCodeTestUtils.checkEqualsAndHashCode(resp, r -> new GetMappingsResponse(r.mappings()), GetMappingsResponseTests::mutate);
     }
 
-    @Override
-    protected Writeable.Reader<GetMappingsResponse> instanceReader() {
-        return GetMappingsResponse::new;
-    }
-
     private static GetMappingsResponse mutate(GetMappingsResponse original) {
         Map<String, MappingMetadata> builder = new HashMap<>(original.mappings());
         String indexKey = original.mappings().keySet().iterator().next();
@@ -42,11 +36,6 @@ public class GetMappingsResponseTests extends AbstractWireSerializingTestCase<Ge
         return new GetMappingsResponse(builder);
     }
 
-    @Override
-    protected GetMappingsResponse mutateInstance(GetMappingsResponse instance) {
-        return mutate(instance);
-    }
-
     public static MappingMetadata createMappingsForIndex() {
         Map<String, Object> mappings = new HashMap<>();
         if (rarely() == false) { // rarely have no fields
@@ -60,7 +49,6 @@ public class GetMappingsResponseTests extends AbstractWireSerializingTestCase<Ge
         return new MappingMetadata(MapperService.SINGLE_MAPPING_NAME, mappings);
     }
 
-    @Override
     protected GetMappingsResponse createTestInstance() {
         GetMappingsResponse resp = new GetMappingsResponse(Map.of("index-" + randomAlphaOfLength(5), createMappingsForIndex()));
         logger.debug("--> created: {}", resp);