Browse Source

Add a batched executor for tasks with results (#90459)

This change extends the `SimpleBatchedExecutor` to allow batchable cluster state 
update tasks to return a result that will be provided to the `taskSucceeded` call. 

Co-authored-by: David Turner <david.turner@elastic.co>
Pooya Salehi 3 years ago
parent
commit
bbcdfe2d0a

+ 5 - 4
server/src/main/java/org/elasticsearch/action/admin/cluster/desirednodes/TransportDeleteDesiredNodesAction.java

@@ -24,6 +24,7 @@ import org.elasticsearch.cluster.metadata.IndexNameExpressionResolver;
 import org.elasticsearch.cluster.service.ClusterService;
 import org.elasticsearch.common.Priority;
 import org.elasticsearch.common.inject.Inject;
+import org.elasticsearch.core.Tuple;
 import org.elasticsearch.tasks.Task;
 import org.elasticsearch.threadpool.ThreadPool;
 import org.elasticsearch.transport.TransportService;
@@ -80,14 +81,14 @@ public class TransportDeleteDesiredNodesAction extends TransportMasterNodeAction
         }
     }
 
-    private static class DeleteDesiredNodesExecutor extends SimpleBatchedExecutor<DeleteDesiredNodesTask> {
+    private static class DeleteDesiredNodesExecutor extends SimpleBatchedExecutor<DeleteDesiredNodesTask, Void> {
         @Override
-        public ClusterState executeTask(DeleteDesiredNodesTask task, ClusterState clusterState) {
-            return clusterState;
+        public Tuple<ClusterState, Void> executeTask(DeleteDesiredNodesTask task, ClusterState clusterState) {
+            return Tuple.tuple(clusterState, null);
         }
 
         @Override
-        public void taskSucceeded(DeleteDesiredNodesTask task) {
+        public void taskSucceeded(DeleteDesiredNodesTask task, Void unused) {
             task.listener().onResponse(ActionResponse.Empty.INSTANCE);
         }
 

+ 18 - 11
server/src/main/java/org/elasticsearch/cluster/SimpleBatchedExecutor.java

@@ -8,22 +8,26 @@
 
 package org.elasticsearch.cluster;
 
+import org.elasticsearch.core.Tuple;
+
 /**
- * A basic implementation for batch executors that simply need to execute the tasks in the batch iteratively,
- * producing a cluster state after each task. This allows executing the tasks in the batch as a
- * series of executions, each taking an input cluster state and producing a new cluster state that serves as the
- * input of the next task in the batch.
+ * A basic implementation for batch executors that simply need to execute the tasks in the batch iteratively.
+ * Executing each task in the batch produces a cluster state and a result of type {@code TaskResult}.
+ * This allows executing the tasks in the batch as a series of executions, each taking an input cluster state
+ * and producing a new cluster state that serves as the input of the next task in the batch.
  */
-public abstract class SimpleBatchedExecutor<T extends ClusterStateTaskListener> implements ClusterStateTaskExecutor<T> {
+public abstract class SimpleBatchedExecutor<Task extends ClusterStateTaskListener, TaskResult> implements ClusterStateTaskExecutor<Task> {
 
     /**
      * Executes the provided task from the batch.
      *
      * @param task The task to be executed.
      * @param clusterState    The cluster state on which the task should be executed.
-     * @return The resulting cluster state after executing this task.
+     * @return A tuple consisting of the resulting cluster state after executing this task, and the result of the task execution.
+     * The returned cluster state serves as the cluster state on which the next task in the batch will run. The returned
+     * task result is provided to the {@link SimpleBatchedExecutor#taskSucceeded} implementation.
      */
-    public abstract ClusterState executeTask(T task, ClusterState clusterState) throws Exception;
+    public abstract Tuple<ClusterState, TaskResult> executeTask(Task task, ClusterState clusterState) throws Exception;
 
     /**
      * Called once all tasks in the batch have finished execution. It should return a cluster state that reflects
@@ -42,8 +46,9 @@ public abstract class SimpleBatchedExecutor<T extends ClusterStateTaskListener>
      * task in the batch.
      *
      * @param task The task that successfully finished execution.
+     * @param taskResult The result returned from the successful execution of the task.
      */
-    public abstract void taskSucceeded(T task);
+    public abstract void taskSucceeded(Task task, TaskResult taskResult);
 
     @Override
     public final void clusterStatePublished(ClusterState newClusterState) {
@@ -56,14 +61,16 @@ public abstract class SimpleBatchedExecutor<T extends ClusterStateTaskListener>
     public void clusterStatePublished() {}
 
     @Override
-    public final ClusterState execute(BatchExecutionContext<T> batchExecutionContext) throws Exception {
+    public final ClusterState execute(BatchExecutionContext<Task> batchExecutionContext) throws Exception {
         var initState = batchExecutionContext.initialState();
         var clusterState = initState;
         for (final var taskContext : batchExecutionContext.taskContexts()) {
             try (var ignored = taskContext.captureResponseHeaders()) {
                 var task = taskContext.getTask();
-                clusterState = executeTask(task, clusterState);
-                taskContext.success(() -> taskSucceeded(task));
+                Tuple<ClusterState, TaskResult> result = executeTask(task, clusterState);
+                clusterState = result.v1();
+                final var taskResult = result.v2();
+                taskContext.success(() -> taskSucceeded(task, taskResult));
             } catch (Exception e) {
                 taskContext.onFailure(e);
             }

+ 70 - 100
server/src/main/java/org/elasticsearch/cluster/metadata/MetadataIndexStateService.java

@@ -36,6 +36,7 @@ import org.elasticsearch.cluster.ClusterStateAckListener;
 import org.elasticsearch.cluster.ClusterStateTaskConfig;
 import org.elasticsearch.cluster.ClusterStateTaskExecutor;
 import org.elasticsearch.cluster.ClusterStateTaskListener;
+import org.elasticsearch.cluster.SimpleBatchedExecutor;
 import org.elasticsearch.cluster.block.ClusterBlock;
 import org.elasticsearch.cluster.block.ClusterBlockLevel;
 import org.elasticsearch.cluster.block.ClusterBlocks;
@@ -169,42 +170,37 @@ public class MetadataIndexStateService {
         );
     }
 
-    private class AddBlocksToCloseExecutor implements ClusterStateTaskExecutor<AddBlocksToCloseTask> {
+    private class AddBlocksToCloseExecutor extends SimpleBatchedExecutor<AddBlocksToCloseTask, Map<Index, ClusterBlock>> {
 
         @Override
-        public ClusterState execute(BatchExecutionContext<AddBlocksToCloseTask> batchExecutionContext) throws Exception {
-            ClusterState state = batchExecutionContext.initialState();
-            for (final var taskContext : batchExecutionContext.taskContexts()) {
-                final var task = taskContext.getTask();
-                try {
-                    final Map<Index, ClusterBlock> blockedIndices = new HashMap<>(task.request.indices().length);
-                    state = addIndexClosedBlocks(task.request.indices(), blockedIndices, state);
-                    taskContext.success(() -> {
-                        if (blockedIndices.isEmpty()) {
-                            task.listener().onResponse(CloseIndexResponse.EMPTY);
-                        } else {
-                            threadPool.executor(ThreadPool.Names.MANAGEMENT)
-                                .execute(
-                                    new WaitForClosedBlocksApplied(
-                                        blockedIndices,
-                                        task.request,
-                                        task.listener().delegateFailure((delegate2, verifyResults) -> {
-                                            clusterService.submitStateUpdateTask(
-                                                "close-indices",
-                                                new CloseIndicesTask(task.request, blockedIndices, verifyResults, delegate2),
-                                                ClusterStateTaskConfig.build(Priority.URGENT),
-                                                closesExecutor
-                                            );
-                                        })
-                                    )
+        public Tuple<ClusterState, Map<Index, ClusterBlock>> executeTask(AddBlocksToCloseTask task, ClusterState clusterState)
+            throws Exception {
+            final Map<Index, ClusterBlock> blockedIndices = new HashMap<>(task.request.indices().length);
+            var updatedClusterState = addIndexClosedBlocks(task.request.indices(), blockedIndices, clusterState);
+            return Tuple.tuple(updatedClusterState, blockedIndices);
+        }
+
+        @Override
+        public void taskSucceeded(AddBlocksToCloseTask task, Map<Index, ClusterBlock> blockedIndices) {
+            if (blockedIndices.isEmpty()) {
+                task.listener().onResponse(CloseIndexResponse.EMPTY);
+            } else {
+                threadPool.executor(ThreadPool.Names.MANAGEMENT)
+                    .execute(
+                        new WaitForClosedBlocksApplied(
+                            blockedIndices,
+                            task.request,
+                            task.listener().delegateFailure((delegate2, verifyResults) -> {
+                                clusterService.submitStateUpdateTask(
+                                    "close-indices",
+                                    new CloseIndicesTask(task.request, blockedIndices, verifyResults, delegate2),
+                                    ClusterStateTaskConfig.build(Priority.URGENT),
+                                    closesExecutor
                                 );
-                        }
-                    });
-                } catch (Exception e) {
-                    taskContext.onFailure(e);
-                }
+                            })
+                        )
+                    );
             }
-            return state;
         }
     }
 
@@ -477,53 +473,38 @@ public class MetadataIndexStateService {
         );
     }
 
-    private class AddBlocksExecutor implements ClusterStateTaskExecutor<AddBlocksTask> {
+    private class AddBlocksExecutor extends SimpleBatchedExecutor<AddBlocksTask, Map<Index, ClusterBlock>> {
 
         @Override
-        public ClusterState execute(BatchExecutionContext<AddBlocksTask> batchExecutionContext) throws Exception {
-            ClusterState state = batchExecutionContext.initialState();
+        public Tuple<ClusterState, Map<Index, ClusterBlock>> executeTask(AddBlocksTask task, ClusterState clusterState) {
+            return addIndexBlock(task.request.indices(), clusterState, task.request.getBlock());
+        }
 
-            for (final var taskContext : batchExecutionContext.taskContexts()) {
-                try {
-                    final var task = taskContext.getTask();
-                    final Tuple<ClusterState, Map<Index, ClusterBlock>> blockResult = addIndexBlock(
-                        task.request.indices(),
-                        state,
-                        task.request.getBlock()
-                    );
-                    state = blockResult.v1();
-                    final Map<Index, ClusterBlock> blockedIndices = blockResult.v2();
-                    taskContext.success(() -> {
-                        if (blockedIndices.isEmpty()) {
-                            task.listener().onResponse(AddIndexBlockResponse.EMPTY);
-                        } else {
-                            threadPool.executor(ThreadPool.Names.MANAGEMENT)
-                                .execute(
-                                    new WaitForBlocksApplied(
-                                        blockedIndices,
-                                        task.request,
-                                        task.listener().delegateFailure((delegate2, verifyResults) -> {
-                                            clusterService.submitStateUpdateTask(
-                                                "finalize-index-block-["
-                                                    + task.request.getBlock().name
-                                                    + "]-["
-                                                    + blockedIndices.keySet().stream().map(Index::getName).collect(Collectors.joining(", "))
-                                                    + "]",
-                                                new FinalizeBlocksTask(task.request, blockedIndices, verifyResults, delegate2),
-                                                ClusterStateTaskConfig.build(Priority.URGENT),
-                                                finalizeBlocksExecutor
-                                            );
-                                        })
-                                    )
+        @Override
+        public void taskSucceeded(AddBlocksTask task, Map<Index, ClusterBlock> blockedIndices) {
+            if (blockedIndices.isEmpty()) {
+                task.listener().onResponse(AddIndexBlockResponse.EMPTY);
+            } else {
+                threadPool.executor(ThreadPool.Names.MANAGEMENT)
+                    .execute(
+                        new WaitForBlocksApplied(
+                            blockedIndices,
+                            task.request,
+                            task.listener().delegateFailure((delegate2, verifyResults) -> {
+                                clusterService.submitStateUpdateTask(
+                                    "finalize-index-block-["
+                                        + task.request.getBlock().name
+                                        + "]-["
+                                        + blockedIndices.keySet().stream().map(Index::getName).collect(Collectors.joining(", "))
+                                        + "]",
+                                    new FinalizeBlocksTask(task.request, blockedIndices, verifyResults, delegate2),
+                                    ClusterStateTaskConfig.build(Priority.URGENT),
+                                    finalizeBlocksExecutor
                                 );
-                        }
-                    });
-                } catch (Exception e) {
-                    taskContext.onFailure(e);
-                }
+                            })
+                        )
+                    );
             }
-
-            return state;
         }
     }
 
@@ -537,35 +518,24 @@ public class MetadataIndexStateService {
         }
     }
 
-    private static class FinalizeBlocksExecutor implements ClusterStateTaskExecutor<FinalizeBlocksTask> {
+    private static class FinalizeBlocksExecutor extends SimpleBatchedExecutor<FinalizeBlocksTask, List<AddBlockResult>> {
 
         @Override
-        public ClusterState execute(BatchExecutionContext<FinalizeBlocksTask> batchExecutionContext) throws Exception {
-            ClusterState state = batchExecutionContext.initialState();
-
-            for (final var taskContext : batchExecutionContext.taskContexts()) {
-                try {
-                    final var task = taskContext.getTask();
-                    final Tuple<ClusterState, List<AddBlockResult>> finalizeResult = finalizeBlock(
-                        state,
-                        task.blockedIndices,
-                        task.verifyResults,
-                        task.request.getBlock()
-                    );
-                    state = finalizeResult.v1();
-                    final List<AddBlockResult> indices = finalizeResult.v2();
-                    assert indices.size() == task.verifyResults.size();
-
-                    taskContext.success(() -> {
-                        final boolean acknowledged = indices.stream().noneMatch(AddBlockResult::hasFailures);
-                        task.listener().onResponse(new AddIndexBlockResponse(acknowledged, acknowledged, indices));
-                    });
-                } catch (Exception e) {
-                    taskContext.onFailure(e);
-                }
-            }
+        public Tuple<ClusterState, List<AddBlockResult>> executeTask(FinalizeBlocksTask task, ClusterState clusterState) throws Exception {
+            final Tuple<ClusterState, List<AddBlockResult>> finalizeResult = finalizeBlock(
+                clusterState,
+                task.blockedIndices,
+                task.verifyResults,
+                task.request.getBlock()
+            );
+            assert finalizeResult.v2().size() == task.verifyResults.size();
+            return finalizeResult;
+        }
 
-            return state;
+        @Override
+        public void taskSucceeded(FinalizeBlocksTask task, List<AddBlockResult> indices) {
+            final boolean acknowledged = indices.stream().noneMatch(AddBlockResult::hasFailures);
+            task.listener().onResponse(new AddIndexBlockResponse(acknowledged, acknowledged, indices));
         }
     }
 

+ 12 - 11
server/src/main/java/org/elasticsearch/cluster/metadata/MetadataIndexTemplateService.java

@@ -20,7 +20,6 @@ import org.elasticsearch.action.support.master.AcknowledgedResponse;
 import org.elasticsearch.action.support.master.MasterNodeRequest;
 import org.elasticsearch.cluster.ClusterState;
 import org.elasticsearch.cluster.ClusterStateTaskConfig;
-import org.elasticsearch.cluster.ClusterStateTaskExecutor;
 import org.elasticsearch.cluster.ClusterStateTaskListener;
 import org.elasticsearch.cluster.SimpleBatchedExecutor;
 import org.elasticsearch.cluster.service.ClusterService;
@@ -37,6 +36,7 @@ import org.elasticsearch.common.settings.Settings;
 import org.elasticsearch.common.util.set.Sets;
 import org.elasticsearch.core.Nullable;
 import org.elasticsearch.core.TimeValue;
+import org.elasticsearch.core.Tuple;
 import org.elasticsearch.index.Index;
 import org.elasticsearch.index.IndexMode;
 import org.elasticsearch.index.IndexService;
@@ -126,17 +126,18 @@ public class MetadataIndexTemplateService {
     /**
      * This is the cluster state task executor for all template-based actions.
      */
-    private static final ClusterStateTaskExecutor<TemplateClusterStateUpdateTask> TEMPLATE_TASK_EXECUTOR = new SimpleBatchedExecutor<>() {
-        @Override
-        public ClusterState executeTask(TemplateClusterStateUpdateTask task, ClusterState clusterState) throws Exception {
-            return task.execute(clusterState);
-        }
+    private static final SimpleBatchedExecutor<TemplateClusterStateUpdateTask, Void> TEMPLATE_TASK_EXECUTOR =
+        new SimpleBatchedExecutor<>() {
+            @Override
+            public Tuple<ClusterState, Void> executeTask(TemplateClusterStateUpdateTask task, ClusterState clusterState) throws Exception {
+                return Tuple.tuple(task.execute(clusterState), null);
+            }
 
-        @Override
-        public void taskSucceeded(TemplateClusterStateUpdateTask task) {
-            task.listener.onResponse(AcknowledgedResponse.TRUE);
-        }
-    };
+            @Override
+            public void taskSucceeded(TemplateClusterStateUpdateTask task, Void unused) {
+                task.listener.onResponse(AcknowledgedResponse.TRUE);
+            }
+        };
 
     /**
      * A specialized cluster state update task that always takes a listener handling an

+ 5 - 4
server/src/main/java/org/elasticsearch/health/metadata/HealthMetadataService.java

@@ -25,6 +25,7 @@ import org.elasticsearch.common.io.stream.NamedWriteableRegistry;
 import org.elasticsearch.common.settings.ClusterSettings;
 import org.elasticsearch.common.settings.Settings;
 import org.elasticsearch.core.Nullable;
+import org.elasticsearch.core.Tuple;
 
 import java.util.List;
 
@@ -176,15 +177,15 @@ public class HealthMetadataService {
 
         abstract ClusterState execute(ClusterState currentState);
 
-        static class Executor extends SimpleBatchedExecutor<UpsertHealthMetadataTask> {
+        static class Executor extends SimpleBatchedExecutor<UpsertHealthMetadataTask, Void> {
 
             @Override
-            public ClusterState executeTask(UpsertHealthMetadataTask task, ClusterState clusterState) {
-                return task.execute(clusterState);
+            public Tuple<ClusterState, Void> executeTask(UpsertHealthMetadataTask task, ClusterState clusterState) {
+                return Tuple.tuple(task.execute(clusterState), null);
             }
 
             @Override
-            public void taskSucceeded(UpsertHealthMetadataTask task) {}
+            public void taskSucceeded(UpsertHealthMetadataTask task, Void unused) {}
         }
     }
 

+ 6 - 5
server/src/main/java/org/elasticsearch/reservedstate/service/ReservedStateErrorTaskExecutor.java

@@ -13,26 +13,27 @@ import org.apache.logging.log4j.Logger;
 import org.elasticsearch.action.ActionResponse;
 import org.elasticsearch.cluster.ClusterState;
 import org.elasticsearch.cluster.SimpleBatchedExecutor;
+import org.elasticsearch.core.Tuple;
 
 /**
  * Reserved cluster error state task executor
  * <p>
  * We use this task executor to record any errors while updating and reserving the cluster state
  */
-class ReservedStateErrorTaskExecutor extends SimpleBatchedExecutor<ReservedStateErrorTask> {
+class ReservedStateErrorTaskExecutor extends SimpleBatchedExecutor<ReservedStateErrorTask, Void> {
 
     private static final Logger logger = LogManager.getLogger(ReservedStateErrorTaskExecutor.class);
 
     @Override
-    public ClusterState executeTask(ReservedStateErrorTask task, ClusterState clusterState) {
+    public Tuple<ClusterState, Void> executeTask(ReservedStateErrorTask task, ClusterState clusterState) {
         if (task.shouldUpdate(clusterState)) {
-            return task.execute(clusterState);
+            return Tuple.tuple(task.execute(clusterState), null);
         }
-        return clusterState;
+        return Tuple.tuple(clusterState, null);
     }
 
     @Override
-    public void taskSucceeded(ReservedStateErrorTask task) {
+    public void taskSucceeded(ReservedStateErrorTask task, Void unused) {
         task.listener().onResponse(ActionResponse.Empty.INSTANCE);
     }
 

+ 5 - 4
server/src/main/java/org/elasticsearch/reservedstate/service/ReservedStateUpdateTaskExecutor.java

@@ -16,11 +16,12 @@ import org.elasticsearch.cluster.ClusterState;
 import org.elasticsearch.cluster.SimpleBatchedExecutor;
 import org.elasticsearch.cluster.routing.RerouteService;
 import org.elasticsearch.common.Priority;
+import org.elasticsearch.core.Tuple;
 
 /**
  * Reserved cluster state update task executor
  */
-public class ReservedStateUpdateTaskExecutor extends SimpleBatchedExecutor<ReservedStateUpdateTask> {
+public class ReservedStateUpdateTaskExecutor extends SimpleBatchedExecutor<ReservedStateUpdateTask, Void> {
 
     private static final Logger logger = LogManager.getLogger(ReservedStateUpdateTaskExecutor.class);
 
@@ -32,12 +33,12 @@ public class ReservedStateUpdateTaskExecutor extends SimpleBatchedExecutor<Reser
     }
 
     @Override
-    public ClusterState executeTask(ReservedStateUpdateTask task, ClusterState clusterState) {
-        return task.execute(clusterState);
+    public Tuple<ClusterState, Void> executeTask(ReservedStateUpdateTask task, ClusterState clusterState) {
+        return Tuple.tuple(task.execute(clusterState), null);
     }
 
     @Override
-    public void taskSucceeded(ReservedStateUpdateTask task) {
+    public void taskSucceeded(ReservedStateUpdateTask task, Void unused) {
         task.listener().onResponse(ActionResponse.Empty.INSTANCE);
     }
 

+ 12 - 11
x-pack/plugin/rollup/src/main/java/org/elasticsearch/xpack/downsample/TransportRollupAction.java

@@ -27,7 +27,6 @@ import org.elasticsearch.client.internal.Client;
 import org.elasticsearch.client.internal.OriginSettingClient;
 import org.elasticsearch.cluster.ClusterState;
 import org.elasticsearch.cluster.ClusterStateTaskConfig;
-import org.elasticsearch.cluster.ClusterStateTaskExecutor;
 import org.elasticsearch.cluster.ClusterStateTaskListener;
 import org.elasticsearch.cluster.SimpleBatchedExecutor;
 import org.elasticsearch.cluster.block.ClusterBlockException;
@@ -45,6 +44,7 @@ import org.elasticsearch.common.settings.IndexScopedSettings;
 import org.elasticsearch.common.settings.Setting;
 import org.elasticsearch.common.settings.Settings;
 import org.elasticsearch.common.xcontent.XContentHelper;
+import org.elasticsearch.core.Tuple;
 import org.elasticsearch.index.Index;
 import org.elasticsearch.index.IndexMode;
 import org.elasticsearch.index.IndexNotFoundException;
@@ -94,17 +94,18 @@ public class TransportRollupAction extends AcknowledgedTransportMasterNodeAction
     /**
      * This is the cluster state task executor for cluster state update actions.
      */
-    private static final ClusterStateTaskExecutor<RollupClusterStateUpdateTask> STATE_UPDATE_TASK_EXECUTOR = new SimpleBatchedExecutor<>() {
-        @Override
-        public ClusterState executeTask(RollupClusterStateUpdateTask task, ClusterState clusterState) throws Exception {
-            return task.execute(clusterState);
-        }
+    private static final SimpleBatchedExecutor<RollupClusterStateUpdateTask, Void> STATE_UPDATE_TASK_EXECUTOR =
+        new SimpleBatchedExecutor<>() {
+            @Override
+            public Tuple<ClusterState, Void> executeTask(RollupClusterStateUpdateTask task, ClusterState clusterState) throws Exception {
+                return Tuple.tuple(task.execute(clusterState), null);
+            }
 
-        @Override
-        public void taskSucceeded(RollupClusterStateUpdateTask task) {
-            task.listener.onResponse(AcknowledgedResponse.TRUE);
-        }
-    };
+            @Override
+            public void taskSucceeded(RollupClusterStateUpdateTask task, Void unused) {
+                task.listener.onResponse(AcknowledgedResponse.TRUE);
+            }
+        };
 
     @Inject
     public TransportRollupAction(