Browse Source

Add SimpleBatchedAckListenerTaskExecutor (#90521)

This change adds a `ClusterStateTaskExecutor` for batchable cluster
state update tasks that implement or are able to provide a
`ClusterStateAckListener`.

https://github.com/elastic/elasticsearch/issues/89192
Pooya Salehi 3 years ago
parent
commit
5516c516b2

+ 74 - 0
server/src/main/java/org/elasticsearch/cluster/SimpleBatchedAckListenerTaskExecutor.java

@@ -0,0 +1,74 @@
+/*
+ * Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
+ * or more contributor license agreements. Licensed under the Elastic License
+ * 2.0 and the Server Side Public License, v 1; you may not use this file except
+ * in compliance with, at your election, the Elastic License 2.0 or the Server
+ * Side Public License, v 1.
+ */
+
+package org.elasticsearch.cluster;
+
+import org.elasticsearch.core.Tuple;
+
+/**
+ * A basic batch executor implementation for tasks that can listen for acks themselves by providing a {@link ClusterStateAckListener}.
+ * The tasks in the batch are executed iteratively, producing a cluster state after each task. This allows executing the tasks
+ * in the batch as a series of executions, each taking an input cluster state and producing a new cluster state that serves as the
+ * input of the next task in the batch.
+ */
+public abstract class SimpleBatchedAckListenerTaskExecutor<Task extends ClusterStateTaskListener>
+    implements
+        ClusterStateTaskExecutor<Task> {
+
+    /**
+     * Executes the provided task from the batch.
+     *
+     * @param task The task to be executed.
+     * @param clusterState    The cluster state on which the task should be executed.
+     * @return A tuple consisting of the resulting cluster state after executing this task, and a {@link ClusterStateAckListener}.
+     * The listener is completed if the publication succeeds and the nodes ack the state update. The returned cluster state
+     * serves as the cluster state on which the next task in the batch will run.
+     */
+    public abstract Tuple<ClusterState, ClusterStateAckListener> executeTask(Task task, ClusterState clusterState) throws Exception;
+
+    /**
+     * Called once all tasks in the batch have finished execution. It should return a cluster state that reflects
+     * the execution of all the tasks.
+     *
+     * @param clusterState The cluster state resulting from the execution of all the tasks.
+     * @param clusterStateChanged Whether {@code clusterState} is different from the cluster state before executing the tasks in the batch.
+     * @return The resulting cluster state after executing all the tasks.
+     */
+    public ClusterState afterBatchExecution(ClusterState clusterState, boolean clusterStateChanged) {
+        return clusterState;
+    }
+
+    @Override
+    public final void clusterStatePublished(ClusterState newClusterState) {
+        clusterStatePublished();
+    }
+
+    /**
+     * Called after the new cluster state is published. Note that this method is not invoked if the cluster state was not updated.
+     */
+    public void clusterStatePublished() {}
+
+    @Override
+    public final ClusterState execute(BatchExecutionContext<Task> batchExecutionContext) throws Exception {
+        var initState = batchExecutionContext.initialState();
+        var clusterState = initState;
+        for (final var taskContext : batchExecutionContext.taskContexts()) {
+            try (var ignored = taskContext.captureResponseHeaders()) {
+                var task = taskContext.getTask();
+                final var result = executeTask(task, clusterState);
+                clusterState = result.v1();
+                taskContext.success(result.v2());
+            } catch (Exception e) {
+                taskContext.onFailure(e);
+            }
+        }
+        try (var ignored = batchExecutionContext.dropHeadersContext()) {
+            return afterBatchExecution(clusterState, clusterState != initState);
+        }
+    }
+}

+ 17 - 15
server/src/main/java/org/elasticsearch/cluster/metadata/MetadataDeleteIndexService.java

@@ -12,9 +12,11 @@ import org.apache.logging.log4j.LogManager;
 import org.apache.logging.log4j.Logger;
 import org.elasticsearch.action.admin.indices.delete.DeleteIndexClusterStateUpdateRequest;
 import org.elasticsearch.cluster.ClusterState;
+import org.elasticsearch.cluster.ClusterStateAckListener;
 import org.elasticsearch.cluster.ClusterStateTaskConfig;
 import org.elasticsearch.cluster.ClusterStateTaskExecutor;
 import org.elasticsearch.cluster.RestoreInProgress;
+import org.elasticsearch.cluster.SimpleBatchedAckListenerTaskExecutor;
 import org.elasticsearch.cluster.block.ClusterBlocks;
 import org.elasticsearch.cluster.routing.RoutingTable;
 import org.elasticsearch.cluster.routing.allocation.AllocationService;
@@ -24,6 +26,7 @@ import org.elasticsearch.common.collect.ImmutableOpenMap;
 import org.elasticsearch.common.inject.Inject;
 import org.elasticsearch.common.settings.Settings;
 import org.elasticsearch.common.util.set.Sets;
+import org.elasticsearch.core.Tuple;
 import org.elasticsearch.index.Index;
 import org.elasticsearch.snapshots.RestoreService;
 import org.elasticsearch.snapshots.SnapshotInProgressException;
@@ -52,22 +55,21 @@ public class MetadataDeleteIndexService {
     public MetadataDeleteIndexService(Settings settings, ClusterService clusterService, AllocationService allocationService) {
         this.settings = settings;
         this.clusterService = clusterService;
-        executor = batchExecutionContext -> {
-            ClusterState state = batchExecutionContext.initialState();
-            for (ClusterStateTaskExecutor.TaskContext<DeleteIndexClusterStateUpdateRequest> taskContext : batchExecutionContext
-                .taskContexts()) {
-                try (var ignored = taskContext.captureResponseHeaders()) {
-                    state = deleteIndices(state, Sets.newHashSet(taskContext.getTask().indices()));
-                    taskContext.success(taskContext.getTask());
-                } catch (Exception e) {
-                    taskContext.onFailure(e);
-                }
-            }
-            if (state == batchExecutionContext.initialState()) {
-                return state;
+        executor = new SimpleBatchedAckListenerTaskExecutor<>() {
+            @Override
+            public Tuple<ClusterState, ClusterStateAckListener> executeTask(
+                DeleteIndexClusterStateUpdateRequest task,
+                ClusterState clusterState
+            ) {
+                return Tuple.tuple(deleteIndices(clusterState, Sets.newHashSet(task.indices())), task);
             }
-            try (var ignored = batchExecutionContext.dropHeadersContext()) {
-                return allocationService.reroute(state, "deleted indices");
+
+            @Override
+            public ClusterState afterBatchExecution(ClusterState clusterState, boolean clusterStateChanged) {
+                if (clusterStateChanged) {
+                    return allocationService.reroute(clusterState, "deleted indices");
+                }
+                return clusterState;
             }
         };
     }

+ 13 - 17
server/src/main/java/org/elasticsearch/cluster/metadata/MetadataUpdateSettingsService.java

@@ -19,6 +19,7 @@ import org.elasticsearch.cluster.ClusterStateAckListener;
 import org.elasticsearch.cluster.ClusterStateTaskConfig;
 import org.elasticsearch.cluster.ClusterStateTaskExecutor;
 import org.elasticsearch.cluster.ClusterStateTaskListener;
+import org.elasticsearch.cluster.SimpleBatchedAckListenerTaskExecutor;
 import org.elasticsearch.cluster.block.ClusterBlock;
 import org.elasticsearch.cluster.block.ClusterBlocks;
 import org.elasticsearch.cluster.node.DiscoveryNode;
@@ -31,6 +32,7 @@ import org.elasticsearch.common.settings.IndexScopedSettings;
 import org.elasticsearch.common.settings.Setting;
 import org.elasticsearch.common.settings.Settings;
 import org.elasticsearch.core.TimeValue;
+import org.elasticsearch.core.Tuple;
 import org.elasticsearch.index.Index;
 import org.elasticsearch.index.IndexSettings;
 import org.elasticsearch.indices.IndicesService;
@@ -70,26 +72,20 @@ public class MetadataUpdateSettingsService {
         this.indexScopedSettings = indexScopedSettings;
         this.indicesService = indicesService;
         this.shardLimitValidator = shardLimitValidator;
-        this.executor = batchExecutionContext -> {
-            ClusterState state = batchExecutionContext.initialState();
-            for (final var taskContext : batchExecutionContext.taskContexts()) {
-                try {
-                    final var task = taskContext.getTask();
-                    try (var ignored = taskContext.captureResponseHeaders()) {
-                        state = task.execute(state);
-                    }
-                    taskContext.success(task.getAckListener());
-                } catch (Exception e) {
-                    taskContext.onFailure(e);
-                }
+        this.executor = new SimpleBatchedAckListenerTaskExecutor<>() {
+            @Override
+            public Tuple<ClusterState, ClusterStateAckListener> executeTask(UpdateSettingsTask task, ClusterState clusterState) {
+                return Tuple.tuple(task.execute(clusterState), task.getAckListener());
             }
-            if (state != batchExecutionContext.initialState()) {
-                // reroute in case things change that require it (like number of replicas)
-                try (var ignored = batchExecutionContext.dropHeadersContext()) {
-                    state = allocationService.reroute(state, "settings update");
+
+            @Override
+            public ClusterState afterBatchExecution(ClusterState clusterState, boolean clusterStateChanged) {
+                if (clusterStateChanged) {
+                    // reroute in case things change that require it (like number of replicas)
+                    return allocationService.reroute(clusterState, "settings update");
                 }
+                return clusterState;
             }
-            return state;
         };
     }