Browse Source

Combine put-mapping task and listener (#82646)

Today put-mapping tasks executed by the master have a separate
ClusterStateTaskListener to feed back the result to the requestor. It'd
be preferable to use the task itself as the listener. This commit does
that.

Relates #82644
David Turner 3 năm trước cách đây
mục cha
commit
de9e34763b

+ 47 - 35
server/src/main/java/org/elasticsearch/cluster/metadata/MetadataMappingService.java

@@ -56,16 +56,53 @@ public class MetadataMappingService {
         this.indicesService = indicesService;
     }
 
-    class PutMappingExecutor implements ClusterStateTaskExecutor<PutMappingClusterStateUpdateRequest> {
+    static class PutMappingClusterStateUpdateTask implements AckedClusterStateTaskListener {
+
+        private final PutMappingClusterStateUpdateRequest request;
+        private final ActionListener<AcknowledgedResponse> listener;
+
+        PutMappingClusterStateUpdateTask(PutMappingClusterStateUpdateRequest request, ActionListener<AcknowledgedResponse> listener) {
+            this.request = request;
+            this.listener = listener;
+        }
+
+        @Override
+        public void onFailure(Exception e) {
+            listener.onFailure(e);
+        }
+
+        @Override
+        public boolean mustAck(DiscoveryNode discoveryNode) {
+            return true;
+        }
+
+        @Override
+        public void onAllNodesAcked(@Nullable Exception e) {
+            listener.onResponse(AcknowledgedResponse.of(e == null));
+        }
+
+        @Override
+        public void onAckTimeout() {
+            listener.onResponse(AcknowledgedResponse.FALSE);
+        }
+
         @Override
-        public ClusterTasksResult<PutMappingClusterStateUpdateRequest> execute(
+        public TimeValue ackTimeout() {
+            return request.ackTimeout();
+        }
+    }
+
+    class PutMappingExecutor implements ClusterStateTaskExecutor<PutMappingClusterStateUpdateTask> {
+        @Override
+        public ClusterTasksResult<PutMappingClusterStateUpdateTask> execute(
             ClusterState currentState,
-            List<PutMappingClusterStateUpdateRequest> tasks
+            List<PutMappingClusterStateUpdateTask> tasks
         ) throws Exception {
             Map<Index, MapperService> indexMapperServices = new HashMap<>();
-            ClusterTasksResult.Builder<PutMappingClusterStateUpdateRequest> builder = ClusterTasksResult.builder();
+            ClusterTasksResult.Builder<PutMappingClusterStateUpdateTask> builder = ClusterTasksResult.builder();
             try {
-                for (PutMappingClusterStateUpdateRequest request : tasks) {
+                for (PutMappingClusterStateUpdateTask task : tasks) {
+                    final PutMappingClusterStateUpdateRequest request = task.request;
                     try {
                         for (Index index : request.indices()) {
                             final IndexMetadata indexMetadata = currentState.metadata().getIndexSafe(index);
@@ -77,9 +114,9 @@ public class MetadataMappingService {
                             }
                         }
                         currentState = applyRequest(currentState, request, indexMapperServices);
-                        builder.success(request);
+                        builder.success(task);
                     } catch (Exception e) {
-                        builder.failure(request, e);
+                        builder.failure(task, e);
                     }
                 }
                 return builder.build(currentState);
@@ -211,38 +248,13 @@ public class MetadataMappingService {
             return;
         }
 
+        final PutMappingClusterStateUpdateTask task = new PutMappingClusterStateUpdateTask(request, listener);
         clusterService.submitStateUpdateTask(
             "put-mapping " + Strings.arrayToCommaDelimitedString(request.indices()),
-            request,
+            task,
             ClusterStateTaskConfig.build(Priority.HIGH, request.masterNodeTimeout()),
             putMappingExecutor,
-            new AckedClusterStateTaskListener() {
-
-                @Override
-                public void onFailure(Exception e) {
-                    listener.onFailure(e);
-                }
-
-                @Override
-                public boolean mustAck(DiscoveryNode discoveryNode) {
-                    return true;
-                }
-
-                @Override
-                public void onAllNodesAcked(@Nullable Exception e) {
-                    listener.onResponse(AcknowledgedResponse.of(e == null));
-                }
-
-                @Override
-                public void onAckTimeout() {
-                    listener.onResponse(AcknowledgedResponse.FALSE);
-                }
-
-                @Override
-                public TimeValue ackTimeout() {
-                    return request.ackTimeout();
-                }
-            }
+            task
         );
     }
 }

+ 19 - 8
server/src/test/java/org/elasticsearch/cluster/metadata/MetadataMappingServiceTests.java

@@ -8,6 +8,7 @@
 
 package org.elasticsearch.cluster.metadata;
 
+import org.elasticsearch.action.ActionListener;
 import org.elasticsearch.action.admin.indices.mapping.put.PutMappingClusterStateUpdateRequest;
 import org.elasticsearch.cluster.ClusterStateTaskExecutor;
 import org.elasticsearch.cluster.service.ClusterService;
@@ -20,6 +21,7 @@ import org.elasticsearch.test.InternalSettingsPlugin;
 
 import java.util.Collection;
 import java.util.Collections;
+import java.util.List;
 
 import static org.hamcrest.Matchers.equalTo;
 import static org.hamcrest.Matchers.not;
@@ -41,8 +43,8 @@ public class MetadataMappingServiceTests extends ESSingleNodeTestCase {
         final PutMappingClusterStateUpdateRequest request = new PutMappingClusterStateUpdateRequest("""
             { "properties": { "field": { "type": "text" }}}""");
         request.indices(new Index[] { indexService.index() });
-        final ClusterStateTaskExecutor.ClusterTasksResult<PutMappingClusterStateUpdateRequest> result = mappingService.putMappingExecutor
-            .execute(clusterService.state(), Collections.singletonList(request));
+        final ClusterStateTaskExecutor.ClusterTasksResult<MetadataMappingService.PutMappingClusterStateUpdateTask> result =
+            mappingService.putMappingExecutor.execute(clusterService.state(), singleTask(request));
         // the task completed successfully
         assertThat(result.executionResults.size(), equalTo(1));
         assertTrue(result.executionResults.values().iterator().next().isSuccess());
@@ -64,13 +66,13 @@ public class MetadataMappingServiceTests extends ESSingleNodeTestCase {
             { "properties": { "field": { "type": "text" }}}""").indices(new Index[] { indexService.index() });
         ClusterStateTaskExecutor.ClusterTasksResult<?> result = mappingService.putMappingExecutor.execute(
             clusterService.state(),
-            Collections.singletonList(request)
+            singleTask(request)
         );
         assertTrue(result.executionResults.values().stream().noneMatch(res -> res.isSuccess() == false));
 
         ClusterStateTaskExecutor.ClusterTasksResult<?> result2 = mappingService.putMappingExecutor.execute(
             result.resultingState,
-            Collections.singletonList(request)
+            singleTask(request)
         );
         assertTrue(result.executionResults.values().stream().noneMatch(res -> res.isSuccess() == false));
 
@@ -85,8 +87,8 @@ public class MetadataMappingServiceTests extends ESSingleNodeTestCase {
         final PutMappingClusterStateUpdateRequest request = new PutMappingClusterStateUpdateRequest("""
             { "properties": { "field": { "type": "text" }}}""");
         request.indices(new Index[] { indexService.index() });
-        final ClusterStateTaskExecutor.ClusterTasksResult<PutMappingClusterStateUpdateRequest> result = mappingService.putMappingExecutor
-            .execute(clusterService.state(), Collections.singletonList(request));
+        final ClusterStateTaskExecutor.ClusterTasksResult<MetadataMappingService.PutMappingClusterStateUpdateTask> result =
+            mappingService.putMappingExecutor.execute(clusterService.state(), singleTask(request));
         assertThat(result.executionResults.size(), equalTo(1));
         assertTrue(result.executionResults.values().iterator().next().isSuccess());
         assertThat(result.resultingState.metadata().index("test").getMappingVersion(), equalTo(1 + previousVersion));
@@ -99,11 +101,20 @@ public class MetadataMappingServiceTests extends ESSingleNodeTestCase {
         final ClusterService clusterService = getInstanceFromNode(ClusterService.class);
         final PutMappingClusterStateUpdateRequest request = new PutMappingClusterStateUpdateRequest("{ \"properties\": {}}");
         request.indices(new Index[] { indexService.index() });
-        final ClusterStateTaskExecutor.ClusterTasksResult<PutMappingClusterStateUpdateRequest> result = mappingService.putMappingExecutor
-            .execute(clusterService.state(), Collections.singletonList(request));
+        final ClusterStateTaskExecutor.ClusterTasksResult<MetadataMappingService.PutMappingClusterStateUpdateTask> result =
+            mappingService.putMappingExecutor.execute(clusterService.state(), singleTask(request));
         assertThat(result.executionResults.size(), equalTo(1));
         assertTrue(result.executionResults.values().iterator().next().isSuccess());
         assertThat(result.resultingState.metadata().index("test").getMappingVersion(), equalTo(previousVersion));
     }
 
+    private static List<MetadataMappingService.PutMappingClusterStateUpdateTask> singleTask(PutMappingClusterStateUpdateRequest request) {
+        return Collections.singletonList(
+            new MetadataMappingService.PutMappingClusterStateUpdateTask(
+                request,
+                ActionListener.wrap(() -> { throw new AssertionError("task should not complete publication"); })
+            )
+        );
+    }
+
 }