Browse Source

Merge pull request #14899 from jasontedor/cluster-state-batch

Split cluster state update tasks into roles
Jason Tedor 10 năm trước cách đây
mục cha
commit
c4a2298194
30 tập tin đã thay đổi với 1061 bổ sung677 xóa
  1. 1 1
      core/src/main/java/org/elasticsearch/action/admin/cluster/health/TransportClusterHealthAction.java
  2. 1 1
      core/src/main/java/org/elasticsearch/action/admin/cluster/reroute/TransportClusterRerouteAction.java
  3. 4 2
      core/src/main/java/org/elasticsearch/action/admin/cluster/settings/TransportClusterUpdateSettingsAction.java
  4. 54 0
      core/src/main/java/org/elasticsearch/cluster/AckedClusterStateTaskListener.java
  5. 7 1
      core/src/main/java/org/elasticsearch/cluster/AckedClusterStateUpdateTask.java
  6. 29 7
      core/src/main/java/org/elasticsearch/cluster/ClusterService.java
  7. 92 0
      core/src/main/java/org/elasticsearch/cluster/ClusterStateTaskConfig.java
  8. 132 0
      core/src/main/java/org/elasticsearch/cluster/ClusterStateTaskExecutor.java
  9. 43 0
      core/src/main/java/org/elasticsearch/cluster/ClusterStateTaskListener.java
  10. 24 25
      core/src/main/java/org/elasticsearch/cluster/ClusterStateUpdateTask.java
  11. 8 2
      core/src/main/java/org/elasticsearch/cluster/action/shard/ShardStateAction.java
  12. 6 6
      core/src/main/java/org/elasticsearch/cluster/metadata/MetaDataCreateIndexService.java
  13. 1 2
      core/src/main/java/org/elasticsearch/cluster/metadata/MetaDataDeleteIndexService.java
  14. 1 1
      core/src/main/java/org/elasticsearch/cluster/metadata/MetaDataIndexAliasesService.java
  15. 2 2
      core/src/main/java/org/elasticsearch/cluster/metadata/MetaDataIndexStateService.java
  16. 3 2
      core/src/main/java/org/elasticsearch/cluster/metadata/MetaDataIndexTemplateService.java
  17. 222 298
      core/src/main/java/org/elasticsearch/cluster/metadata/MetaDataMappingService.java
  18. 5 14
      core/src/main/java/org/elasticsearch/cluster/metadata/MetaDataUpdateSettingsService.java
  19. 1 1
      core/src/main/java/org/elasticsearch/cluster/routing/RoutingService.java
  20. 241 198
      core/src/main/java/org/elasticsearch/cluster/service/InternalClusterService.java
  21. 7 3
      core/src/main/java/org/elasticsearch/discovery/zen/NodeJoinController.java
  22. 8 8
      core/src/main/java/org/elasticsearch/discovery/zen/ZenDiscovery.java
  23. 134 23
      core/src/test/java/org/elasticsearch/cluster/ClusterServiceIT.java
  24. 6 35
      core/src/test/java/org/elasticsearch/discovery/DiscoveryWithServiceDisruptionsIT.java
  25. 3 13
      core/src/test/java/org/elasticsearch/indices/store/IndicesStoreIntegrationIT.java
  26. 2 7
      core/src/test/java/org/elasticsearch/snapshots/AbstractSnapshotIntegTestCase.java
  27. 2 3
      test-framework/src/main/java/org/elasticsearch/test/cluster/NoopClusterService.java
  28. 20 20
      test-framework/src/main/java/org/elasticsearch/test/cluster/TestClusterService.java
  29. 1 1
      test-framework/src/main/java/org/elasticsearch/test/disruption/BlockClusterStateProcessing.java
  30. 1 1
      test-framework/src/main/java/org/elasticsearch/test/disruption/SlowClusterStateProcessing.java

+ 1 - 1
core/src/main/java/org/elasticsearch/action/admin/cluster/health/TransportClusterHealthAction.java

@@ -74,7 +74,7 @@ public class TransportClusterHealthAction extends TransportMasterNodeReadAction<
     protected void masterOperation(final ClusterHealthRequest request, final ClusterState unusedState, final ActionListener<ClusterHealthResponse> listener) {
         if (request.waitForEvents() != null) {
             final long endTimeMS = TimeValue.nsecToMSec(System.nanoTime()) + request.timeout().millis();
-            clusterService.submitStateUpdateTask("cluster_health (wait_for_events [" + request.waitForEvents() + "])", request.waitForEvents(), new ClusterStateUpdateTask() {
+            clusterService.submitStateUpdateTask("cluster_health (wait_for_events [" + request.waitForEvents() + "])", new ClusterStateUpdateTask(request.waitForEvents()) {
                 @Override
                 public ClusterState execute(ClusterState currentState) {
                     return currentState;

+ 1 - 1
core/src/main/java/org/elasticsearch/action/admin/cluster/reroute/TransportClusterRerouteAction.java

@@ -68,7 +68,7 @@ public class TransportClusterRerouteAction extends TransportMasterNodeAction<Clu
 
     @Override
     protected void masterOperation(final ClusterRerouteRequest request, final ClusterState state, final ActionListener<ClusterRerouteResponse> listener) {
-        clusterService.submitStateUpdateTask("cluster_reroute (api)", Priority.IMMEDIATE, new AckedClusterStateUpdateTask<ClusterRerouteResponse>(request, listener) {
+        clusterService.submitStateUpdateTask("cluster_reroute (api)", new AckedClusterStateUpdateTask<ClusterRerouteResponse>(Priority.IMMEDIATE, request, listener) {
 
             private volatile ClusterState clusterStateToSend;
             private volatile RoutingExplanations explanations;

+ 4 - 2
core/src/main/java/org/elasticsearch/action/admin/cluster/settings/TransportClusterUpdateSettingsAction.java

@@ -91,7 +91,8 @@ public class TransportClusterUpdateSettingsAction extends TransportMasterNodeAct
         final Settings.Builder transientUpdates = Settings.settingsBuilder();
         final Settings.Builder persistentUpdates = Settings.settingsBuilder();
 
-        clusterService.submitStateUpdateTask("cluster_update_settings", Priority.IMMEDIATE, new AckedClusterStateUpdateTask<ClusterUpdateSettingsResponse>(request, listener) {
+        clusterService.submitStateUpdateTask("cluster_update_settings",
+                new AckedClusterStateUpdateTask<ClusterUpdateSettingsResponse>(Priority.IMMEDIATE, request, listener) {
 
             private volatile boolean changed = false;
 
@@ -132,7 +133,8 @@ public class TransportClusterUpdateSettingsAction extends TransportMasterNodeAct
                 // in the components (e.g. FilterAllocationDecider), so the changes made by the first call aren't visible
                 // to the components until the ClusterStateListener instances have been invoked, but are visible after
                 // the first update task has been completed.
-                clusterService.submitStateUpdateTask("reroute_after_cluster_update_settings", Priority.URGENT, new AckedClusterStateUpdateTask<ClusterUpdateSettingsResponse>(request, listener) {
+                clusterService.submitStateUpdateTask("reroute_after_cluster_update_settings",
+                        new AckedClusterStateUpdateTask<ClusterUpdateSettingsResponse>(Priority.URGENT, request, listener) {
 
                     @Override
                     public boolean mustAck(DiscoveryNode discoveryNode) {

+ 54 - 0
core/src/main/java/org/elasticsearch/cluster/AckedClusterStateTaskListener.java

@@ -0,0 +1,54 @@
+/*
+ * Licensed to Elasticsearch under one or more contributor
+ * license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright
+ * ownership. Elasticsearch licenses this file to you under
+ * the Apache License, Version 2.0 (the "License"); you may
+ * not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.elasticsearch.cluster;
+
+import org.elasticsearch.cluster.node.DiscoveryNode;
+import org.elasticsearch.common.Nullable;
+import org.elasticsearch.common.unit.TimeValue;
+
+public interface AckedClusterStateTaskListener extends ClusterStateTaskListener {
+
+    /**
+     * Called to determine which nodes the acknowledgement is expected from
+     *
+     * @param discoveryNode a node
+     * @return true if the node is expected to send ack back, false otherwise
+     */
+    boolean mustAck(DiscoveryNode discoveryNode);
+
+    /**
+     * Called once all the nodes have acknowledged the cluster state update request. Must be
+     * very lightweight execution, since it gets executed on the cluster service thread.
+     *
+     * @param t optional error that might have been thrown
+     */
+    void onAllNodesAcked(@Nullable Throwable t);
+
+    /**
+     * Called once the acknowledgement timeout defined by
+     * {@link AckedClusterStateUpdateTask#ackTimeout()} has expired
+     */
+    void onAckTimeout();
+
+    /**
+     * Acknowledgement timeout, maximum time interval to wait for acknowledgements
+     */
+    TimeValue ackTimeout();
+
+}

+ 7 - 1
core/src/main/java/org/elasticsearch/cluster/AckedClusterStateUpdateTask.java

@@ -22,18 +22,24 @@ import org.elasticsearch.action.ActionListener;
 import org.elasticsearch.cluster.ack.AckedRequest;
 import org.elasticsearch.cluster.node.DiscoveryNode;
 import org.elasticsearch.common.Nullable;
+import org.elasticsearch.common.Priority;
 import org.elasticsearch.common.unit.TimeValue;
 
 /**
  * An extension interface to {@link ClusterStateUpdateTask} that allows to be notified when
  * all the nodes have acknowledged a cluster state update request
  */
-public abstract class AckedClusterStateUpdateTask<Response> extends ClusterStateUpdateTask {
+public abstract class AckedClusterStateUpdateTask<Response> extends ClusterStateUpdateTask implements AckedClusterStateTaskListener {
 
     private final ActionListener<Response> listener;
     private final AckedRequest request;
 
     protected AckedClusterStateUpdateTask(AckedRequest request, ActionListener<Response> listener) {
+        this(Priority.NORMAL, request, listener);
+    }
+
+    protected AckedClusterStateUpdateTask(Priority priority, AckedRequest request, ActionListener<Response> listener) {
+        super(priority);
         this.listener = listener;
         this.request = request;
     }

+ 29 - 7
core/src/main/java/org/elasticsearch/cluster/ClusterService.java

@@ -24,7 +24,6 @@ import org.elasticsearch.cluster.node.DiscoveryNode;
 import org.elasticsearch.cluster.routing.OperationRouting;
 import org.elasticsearch.cluster.service.PendingClusterTask;
 import org.elasticsearch.common.Nullable;
-import org.elasticsearch.common.Priority;
 import org.elasticsearch.common.component.LifecycleComponent;
 import org.elasticsearch.common.unit.TimeValue;
 
@@ -101,12 +100,35 @@ public interface ClusterService extends LifecycleComponent<ClusterService> {
     void add(@Nullable TimeValue timeout, TimeoutClusterStateListener listener);
 
     /**
-     * Submits a task that will update the cluster state.
-     */
-    void submitStateUpdateTask(final String source, Priority priority, final ClusterStateUpdateTask updateTask);
-
-    /**
-     * Submits a task that will update the cluster state (the task has a default priority of {@link Priority#NORMAL}).
+     * Submits a cluster state update task; submitted updates will be
+     * batched across the same instance of executor. The exact batching
+     * semantics depend on the underlying implementation but a rough
+     * guideline is that if the update task is submitted while there
+     * are pending update tasks for the same executor, these update
+     * tasks will all be executed on the executor in a single batch
+     *
+     * @param source   the source of the cluster state update task
+     * @param task     the state needed for the cluster state update task
+     * @param config   the cluster state update task configuration
+     * @param executor the cluster state update task executor; tasks
+     *                 that share the same executor will be executed
+     *                 batches on this executor
+     * @param listener callback after the cluster state update task
+     *                 completes
+     * @param <T>      the type of the cluster state update task state
+     */
+    <T> void submitStateUpdateTask(final String source, final T task,
+                                   final ClusterStateTaskConfig config,
+                                   final ClusterStateTaskExecutor<T> executor,
+                                   final ClusterStateTaskListener listener);
+
+    /**
+     * Submits a cluster state update task; unlike {@link #submitStateUpdateTask(String, Object, ClusterStateTaskConfig, ClusterStateTaskExecutor, ClusterStateTaskListener)},
+     * submitted updates will not be batched.
+     *
+     * @param source     the source of the cluster state update task
+     * @param updateTask the full context for the cluster state update
+     *                   task
      */
     void submitStateUpdateTask(final String source, final ClusterStateUpdateTask updateTask);
 

+ 92 - 0
core/src/main/java/org/elasticsearch/cluster/ClusterStateTaskConfig.java

@@ -0,0 +1,92 @@
+/*
+ * Licensed to Elasticsearch under one or more contributor
+ * license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright
+ * ownership. Elasticsearch licenses this file to you under
+ * the Apache License, Version 2.0 (the "License"); you may
+ * not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.elasticsearch.cluster;
+
+import org.elasticsearch.common.Nullable;
+import org.elasticsearch.common.Priority;
+import org.elasticsearch.common.unit.TimeValue;
+
+/**
+ * Cluster state update task configuration for timeout and priority
+ */
+public interface ClusterStateTaskConfig {
+    /**
+     * The timeout for this cluster state update task configuration. If
+     * the cluster state update task isn't processed within this
+     * timeout, the associated {@link ClusterStateTaskListener#onFailure(String, Throwable)}
+     * is invoked.
+     *
+     * @return the timeout, or null if one is not set
+     */
+    @Nullable
+    TimeValue timeout();
+
+    /**
+     * The {@link Priority} for this cluster state update task configuration.
+     *
+     * @return the priority
+     */
+    Priority priority();
+
+    /**
+     * Build a cluster state update task configuration with the
+     * specified {@link Priority} and no timeout.
+     *
+     * @param priority the priority for the associated cluster state
+     *                 update task
+     * @return the resulting cluster state update task configuration
+     */
+    static ClusterStateTaskConfig build(Priority priority) {
+        return new Basic(priority, null);
+    }
+
+    /**
+     * Build a cluster state update task configuration with the
+     * specified {@link Priority} and timeout.
+     *
+     * @param priority the priority for the associated cluster state
+     *                 update task
+     * @param timeout  the timeout for the associated cluster state
+     *                 update task
+     * @return the result cluster state update task configuration
+     */
+    static ClusterStateTaskConfig build(Priority priority, TimeValue timeout) {
+        return new Basic(priority, timeout);
+    }
+
+    class Basic implements ClusterStateTaskConfig {
+        final TimeValue timeout;
+        final Priority priority;
+
+        public Basic(Priority priority, TimeValue timeout) {
+            this.timeout = timeout;
+            this.priority = priority;
+        }
+
+        @Override
+        public TimeValue timeout() {
+            return timeout;
+        }
+
+        @Override
+        public Priority priority() {
+            return priority;
+        }
+    }
+}

+ 132 - 0
core/src/main/java/org/elasticsearch/cluster/ClusterStateTaskExecutor.java

@@ -0,0 +1,132 @@
+/*
+ * Licensed to Elasticsearch under one or more contributor
+ * license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright
+ * ownership. Elasticsearch licenses this file to you under
+ * the Apache License, Version 2.0 (the "License"); you may
+ * not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.elasticsearch.cluster;
+
+import java.util.IdentityHashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.function.Consumer;
+
+public interface ClusterStateTaskExecutor<T> {
+    /**
+     * Update the cluster state based on the current state and the given tasks. Return the *same instance* if no state
+     * should be changed.
+     */
+    BatchResult<T> execute(ClusterState currentState, List<T> tasks) throws Exception;
+
+    /**
+     * indicates whether this task should only run if current node is master
+     */
+    default boolean runOnlyOnMaster() {
+        return true;
+    }
+
+    /**
+     * Represents the result of a batched execution of cluster state update tasks
+     * @param <T> the type of the cluster state update task
+     */
+    class BatchResult<T> {
+        final public ClusterState resultingState;
+        final public Map<T, TaskResult> executionResults;
+
+        /**
+         * Construct an execution result instance with a correspondence between the tasks and their execution result
+         * @param resultingState the resulting cluster state
+         * @param executionResults the correspondence between tasks and their outcome
+         */
+        BatchResult(ClusterState resultingState, Map<T, TaskResult> executionResults) {
+            this.resultingState = resultingState;
+            this.executionResults = executionResults;
+        }
+
+        public static <T> Builder<T> builder() {
+            return new Builder<>();
+        }
+
+        public static class Builder<T> {
+            private final Map<T, TaskResult> executionResults = new IdentityHashMap<>();
+
+            public Builder<T> success(T task) {
+                return result(task, TaskResult.success());
+            }
+
+            public Builder<T> successes(Iterable<T> tasks) {
+                for (T task : tasks) {
+                    success(task);
+                }
+                return this;
+            }
+
+            public Builder<T> failure(T task, Throwable t) {
+                return result(task, TaskResult.failure(t));
+            }
+
+            public Builder<T> failures(Iterable<T> tasks, Throwable t) {
+                for (T task : tasks) {
+                    failure(task, t);
+                }
+                return this;
+            }
+
+            private Builder<T> result(T task, TaskResult executionResult) {
+                executionResults.put(task, executionResult);
+                return this;
+            }
+
+            public BatchResult<T> build(ClusterState resultingState) {
+                return new BatchResult<>(resultingState, executionResults);
+            }
+        }
+    }
+
+    final class TaskResult {
+        private final Throwable failure;
+
+        private static final TaskResult SUCCESS = new TaskResult(null);
+
+        public static TaskResult success() {
+            return SUCCESS;
+        }
+
+        public static TaskResult failure(Throwable failure) {
+            return new TaskResult(failure);
+        }
+
+        private TaskResult(Throwable failure) {
+            this.failure = failure;
+        }
+
+        public boolean isSuccess() {
+            return failure != null;
+        }
+
+        /**
+         * Handle the execution result with the provided consumers
+         * @param onSuccess handler to invoke on success
+         * @param onFailure handler to invoke on failure; the throwable passed through will not be null
+         */
+        public void handle(Runnable onSuccess, Consumer<Throwable> onFailure) {
+            if (failure == null) {
+                onSuccess.run();
+            } else {
+                onFailure.accept(failure);
+            }
+        }
+    }
+}

+ 43 - 0
core/src/main/java/org/elasticsearch/cluster/ClusterStateTaskListener.java

@@ -0,0 +1,43 @@
+/*
+ * Licensed to Elasticsearch under one or more contributor
+ * license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright
+ * ownership. Elasticsearch licenses this file to you under
+ * the Apache License, Version 2.0 (the "License"); you may
+ * not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.elasticsearch.cluster;
+
+import java.util.List;
+
+public interface ClusterStateTaskListener {
+
+    /**
+     * A callback called when execute fails.
+     */
+    void onFailure(String source, Throwable t);
+
+    /**
+     * called when the task was rejected because the local node is no longer master
+     */
+    default void onNoLongerMaster(String source) {
+        onFailure(source, new NotMasterException("no longer master. source: [" + source + "]"));
+    }
+
+    /**
+     * Called when the result of the {@link ClusterStateTaskExecutor#execute(ClusterState, List)} have been processed
+     * properly by all listeners.
+     */
+    default void clusterStateProcessed(String source, ClusterState oldState, ClusterState newState) {
+    }
+}

+ 24 - 25
core/src/main/java/org/elasticsearch/cluster/ClusterStateUpdateTask.java

@@ -20,46 +20,42 @@
 package org.elasticsearch.cluster;
 
 import org.elasticsearch.common.Nullable;
+import org.elasticsearch.common.Priority;
 import org.elasticsearch.common.unit.TimeValue;
-import org.elasticsearch.common.util.concurrent.EsRejectedExecutionException;
+
+import java.util.List;
 
 /**
  * A task that can update the cluster state.
  */
-abstract public class ClusterStateUpdateTask {
+abstract public class ClusterStateUpdateTask implements ClusterStateTaskConfig, ClusterStateTaskExecutor<ClusterStateUpdateTask>, ClusterStateTaskListener {
 
-    /**
-     * Update the cluster state based on the current state. Return the *same instance* if no state
-     * should be changed.
-     */
-    abstract public ClusterState execute(ClusterState currentState) throws Exception;
+    final private Priority priority;
 
-    /**
-     * A callback called when execute fails.
-     */
-    abstract public void onFailure(String source, Throwable t);
+    public ClusterStateUpdateTask() {
+        this(Priority.NORMAL);
+    }
 
+    public ClusterStateUpdateTask(Priority priority) {
+        this.priority = priority;
+    }
 
-    /**
-     * indicates whether this task should only run if current node is master
-     */
-    public boolean runOnlyOnMaster() {
-        return true;
+    @Override
+    final public BatchResult<ClusterStateUpdateTask> execute(ClusterState currentState, List<ClusterStateUpdateTask> tasks) throws Exception {
+        ClusterState result = execute(currentState);
+        return BatchResult.<ClusterStateUpdateTask>builder().successes(tasks).build(result);
     }
 
     /**
-     * called when the task was rejected because the local node is no longer master
+     * Update the cluster state based on the current state. Return the *same instance* if no state
+     * should be changed.
      */
-    public void onNoLongerMaster(String source) {
-        onFailure(source, new NotMasterException("no longer master. source: [" + source + "]"));
-    }
+    abstract public ClusterState execute(ClusterState currentState) throws Exception;
 
     /**
-     * Called when the result of the {@link #execute(ClusterState)} have been processed
-     * properly by all listeners.
+     * A callback called when execute fails.
      */
-    public void clusterStateProcessed(String source, ClusterState oldState, ClusterState newState) {
-    }
+    abstract public void onFailure(String source, Throwable t);
 
     /**
      * If the cluster state update task wasn't processed by the provided timeout, call
@@ -70,5 +66,8 @@ abstract public class ClusterStateUpdateTask {
         return null;
     }
 
-
+    @Override
+    public Priority priority() {
+        return priority;
+    }
 }

+ 8 - 2
core/src/main/java/org/elasticsearch/cluster/action/shard/ShardStateAction.java

@@ -144,7 +144,8 @@ public class ShardStateAction extends AbstractComponent {
     private void handleShardFailureOnMaster(final ShardRoutingEntry shardRoutingEntry) {
         logger.warn("{} received shard failed for {}", shardRoutingEntry.failure, shardRoutingEntry.shardRouting.shardId(), shardRoutingEntry);
         failedShardQueue.add(shardRoutingEntry);
-        clusterService.submitStateUpdateTask("shard-failed (" + shardRoutingEntry.shardRouting + "), message [" + shardRoutingEntry.message + "]", Priority.HIGH, new ClusterStateUpdateTask() {
+        clusterService.submitStateUpdateTask("shard-failed (" + shardRoutingEntry.shardRouting + "), message [" + shardRoutingEntry.message + "]",
+                new ClusterStateUpdateTask(Priority.HIGH) {
 
             @Override
             public ClusterState execute(ClusterState currentState) {
@@ -198,8 +199,13 @@ public class ShardStateAction extends AbstractComponent {
         // process started events as fast as possible, to make shards available
         startedShardsQueue.add(shardRoutingEntry);
 
-        clusterService.submitStateUpdateTask("shard-started (" + shardRoutingEntry.shardRouting + "), reason [" + shardRoutingEntry.message + "]", Priority.URGENT,
+        clusterService.submitStateUpdateTask("shard-started (" + shardRoutingEntry.shardRouting + "), reason [" + shardRoutingEntry.message + "]",
                 new ClusterStateUpdateTask() {
+                    @Override
+                    public Priority priority() {
+                        return Priority.URGENT;
+                    }
+
                     @Override
                     public ClusterState execute(ClusterState currentState) {
 

+ 6 - 6
core/src/main/java/org/elasticsearch/cluster/metadata/MetaDataCreateIndexService.java

@@ -170,12 +170,12 @@ public class MetaDataCreateIndexService extends AbstractComponent {
         updatedSettingsBuilder.put(request.settings()).normalizePrefix(IndexMetaData.INDEX_SETTING_PREFIX);
         request.settings(updatedSettingsBuilder.build());
 
-        clusterService.submitStateUpdateTask("create-index [" + request.index() + "], cause [" + request.cause() + "]", Priority.URGENT, new AckedClusterStateUpdateTask<ClusterStateUpdateResponse>(request, listener) {
-
-            @Override
-            protected ClusterStateUpdateResponse newResponse(boolean acknowledged) {
-                return new ClusterStateUpdateResponse(acknowledged);
-            }
+        clusterService.submitStateUpdateTask("create-index [" + request.index() + "], cause [" + request.cause() + "]",
+                new AckedClusterStateUpdateTask<ClusterStateUpdateResponse>(Priority.URGENT, request, listener) {
+                    @Override
+                    protected ClusterStateUpdateResponse newResponse(boolean acknowledged) {
+                        return new ClusterStateUpdateResponse(acknowledged);
+                    }
 
             @Override
             public ClusterState execute(ClusterState currentState) throws Exception {

+ 1 - 2
core/src/main/java/org/elasticsearch/cluster/metadata/MetaDataDeleteIndexService.java

@@ -39,7 +39,6 @@ import org.elasticsearch.threadpool.ThreadPool;
 
 import java.util.Arrays;
 import java.util.Collection;
-import java.util.Locale;
 import java.util.concurrent.ScheduledFuture;
 import java.util.concurrent.atomic.AtomicBoolean;
 import java.util.concurrent.atomic.AtomicInteger;
@@ -71,7 +70,7 @@ public class MetaDataDeleteIndexService extends AbstractComponent {
         Collection<String> indices = Arrays.asList(request.indices);
         final DeleteIndexListener listener = new DeleteIndexListener(userListener);
 
-        clusterService.submitStateUpdateTask("delete-index " + indices, Priority.URGENT, new ClusterStateUpdateTask() {
+        clusterService.submitStateUpdateTask("delete-index " + indices, new ClusterStateUpdateTask(Priority.URGENT) {
 
             @Override
             public TimeValue timeout() {

+ 1 - 1
core/src/main/java/org/elasticsearch/cluster/metadata/MetaDataIndexAliasesService.java

@@ -62,7 +62,7 @@ public class MetaDataIndexAliasesService extends AbstractComponent {
     }
 
     public void indicesAliases(final IndicesAliasesClusterStateUpdateRequest request, final ActionListener<ClusterStateUpdateResponse> listener) {
-        clusterService.submitStateUpdateTask("index-aliases", Priority.URGENT, new AckedClusterStateUpdateTask<ClusterStateUpdateResponse>(request, listener) {
+        clusterService.submitStateUpdateTask("index-aliases", new AckedClusterStateUpdateTask<ClusterStateUpdateResponse>(Priority.URGENT, request, listener) {
             @Override
             protected ClusterStateUpdateResponse newResponse(boolean acknowledged) {
                 return new ClusterStateUpdateResponse(acknowledged);

+ 2 - 2
core/src/main/java/org/elasticsearch/cluster/metadata/MetaDataIndexStateService.java

@@ -76,7 +76,7 @@ public class MetaDataIndexStateService extends AbstractComponent {
         }
 
         final String indicesAsString = Arrays.toString(request.indices());
-        clusterService.submitStateUpdateTask("close-indices " + indicesAsString, Priority.URGENT, new AckedClusterStateUpdateTask<ClusterStateUpdateResponse>(request, listener) {
+        clusterService.submitStateUpdateTask("close-indices " + indicesAsString, new AckedClusterStateUpdateTask<ClusterStateUpdateResponse>(Priority.URGENT, request, listener) {
             @Override
             protected ClusterStateUpdateResponse newResponse(boolean acknowledged) {
                 return new ClusterStateUpdateResponse(acknowledged);
@@ -140,7 +140,7 @@ public class MetaDataIndexStateService extends AbstractComponent {
         }
 
         final String indicesAsString = Arrays.toString(request.indices());
-        clusterService.submitStateUpdateTask("open-indices " + indicesAsString, Priority.URGENT, new AckedClusterStateUpdateTask<ClusterStateUpdateResponse>(request, listener) {
+        clusterService.submitStateUpdateTask("open-indices " + indicesAsString, new AckedClusterStateUpdateTask<ClusterStateUpdateResponse>(Priority.URGENT, request, listener) {
             @Override
             protected ClusterStateUpdateResponse newResponse(boolean acknowledged) {
                 return new ClusterStateUpdateResponse(acknowledged);

+ 3 - 2
core/src/main/java/org/elasticsearch/cluster/metadata/MetaDataIndexTemplateService.java

@@ -56,7 +56,7 @@ public class MetaDataIndexTemplateService extends AbstractComponent {
     }
 
     public void removeTemplates(final RemoveRequest request, final RemoveListener listener) {
-        clusterService.submitStateUpdateTask("remove-index-template [" + request.name + "]", Priority.URGENT, new ClusterStateUpdateTask() {
+        clusterService.submitStateUpdateTask("remove-index-template [" + request.name + "]", new ClusterStateUpdateTask(Priority.URGENT) {
 
             @Override
             public TimeValue timeout() {
@@ -143,7 +143,8 @@ public class MetaDataIndexTemplateService extends AbstractComponent {
         }
         final IndexTemplateMetaData template = templateBuilder.build();
 
-        clusterService.submitStateUpdateTask("create-index-template [" + request.name + "], cause [" + request.cause + "]", Priority.URGENT, new ClusterStateUpdateTask() {
+        clusterService.submitStateUpdateTask("create-index-template [" + request.name + "], cause [" + request.cause + "]",
+                new ClusterStateUpdateTask(Priority.URGENT) {
 
             @Override
             public TimeValue timeout() {

+ 222 - 298
core/src/main/java/org/elasticsearch/cluster/metadata/MetaDataMappingService.java

@@ -22,17 +22,16 @@ package org.elasticsearch.cluster.metadata;
 import com.carrotsearch.hppc.cursors.ObjectCursor;
 import org.elasticsearch.action.ActionListener;
 import org.elasticsearch.action.admin.indices.mapping.put.PutMappingClusterStateUpdateRequest;
-import org.elasticsearch.cluster.AckedClusterStateUpdateTask;
-import org.elasticsearch.cluster.ClusterService;
-import org.elasticsearch.cluster.ClusterState;
-import org.elasticsearch.cluster.ClusterStateUpdateTask;
+import org.elasticsearch.cluster.*;
 import org.elasticsearch.cluster.ack.ClusterStateUpdateResponse;
+import org.elasticsearch.cluster.node.DiscoveryNode;
+import org.elasticsearch.common.Nullable;
 import org.elasticsearch.common.Priority;
-import org.elasticsearch.common.collect.Tuple;
 import org.elasticsearch.common.component.AbstractComponent;
 import org.elasticsearch.common.compress.CompressedXContent;
 import org.elasticsearch.common.inject.Inject;
 import org.elasticsearch.common.settings.Settings;
+import org.elasticsearch.common.unit.TimeValue;
 import org.elasticsearch.index.IndexNotFoundException;
 import org.elasticsearch.index.IndexService;
 import org.elasticsearch.index.NodeServicesProvider;
@@ -44,6 +43,7 @@ import org.elasticsearch.indices.IndicesService;
 import org.elasticsearch.indices.InvalidTypeNameException;
 import org.elasticsearch.percolator.PercolatorService;
 
+import java.io.IOException;
 import java.util.*;
 /**
  * Service responsible for submitting mapping changes
@@ -53,13 +53,11 @@ public class MetaDataMappingService extends AbstractComponent {
     private final ClusterService clusterService;
     private final IndicesService indicesService;
 
-    // the mutex protect all the refreshOrUpdate variables!
-    private final Object refreshOrUpdateMutex = new Object();
-    private final List<MappingTask> refreshOrUpdateQueue = new ArrayList<>();
-    private long refreshOrUpdateInsertOrder;
-    private long refreshOrUpdateProcessedInsertOrder;
+    final ClusterStateTaskExecutor<RefreshTask> refreshExectuor = new RefreshTaskExecutor();
+    final ClusterStateTaskExecutor<PutMappingClusterStateUpdateRequest> putMappingExecutor = new PutMappingExecutor();
     private final NodeServicesProvider nodeServicesProvider;
 
+
     @Inject
     public MetaDataMappingService(Settings settings, ClusterService clusterService, IndicesService indicesService, NodeServicesProvider nodeServicesProvider) {
         super(settings);
@@ -68,37 +66,23 @@ public class MetaDataMappingService extends AbstractComponent {
         this.nodeServicesProvider = nodeServicesProvider;
     }
 
-    static class MappingTask {
+    static class RefreshTask {
         final String index;
         final String indexUUID;
-
-        MappingTask(String index, final String indexUUID) {
-            this.index = index;
-            this.indexUUID = indexUUID;
-        }
-    }
-
-    static class RefreshTask extends MappingTask {
         final String[] types;
 
         RefreshTask(String index, final String indexUUID, String[] types) {
-            super(index, indexUUID);
+            this.index = index;
+            this.indexUUID = indexUUID;
             this.types = types;
         }
     }
 
-    static class UpdateTask extends MappingTask {
-        final String type;
-        final CompressedXContent mappingSource;
-        final String nodeId; // null fr unknown
-        final ActionListener<ClusterStateUpdateResponse> listener;
-
-        UpdateTask(String index, String indexUUID, String type, CompressedXContent mappingSource, String nodeId, ActionListener<ClusterStateUpdateResponse> listener) {
-            super(index, indexUUID);
-            this.type = type;
-            this.mappingSource = mappingSource;
-            this.nodeId = nodeId;
-            this.listener = listener;
+    class RefreshTaskExecutor implements ClusterStateTaskExecutor<RefreshTask> {
+        @Override
+        public BatchResult<RefreshTask> execute(ClusterState currentState, List<RefreshTask> tasks) throws Exception {
+            ClusterState newClusterState = executeRefresh(currentState, tasks);
+            return BatchResult.<RefreshTask>builder().successes(tasks).build(newClusterState);
         }
     }
 
@@ -107,50 +91,25 @@ public class MetaDataMappingService extends AbstractComponent {
      * as possible so we won't create the same index all the time for example for the updates on the same mapping
      * and generate a single cluster change event out of all of those.
      */
-    Tuple<ClusterState, List<MappingTask>> executeRefreshOrUpdate(final ClusterState currentState, final long insertionOrder) throws Exception {
-        final List<MappingTask> allTasks = new ArrayList<>();
-
-        synchronized (refreshOrUpdateMutex) {
-            if (refreshOrUpdateQueue.isEmpty()) {
-                return Tuple.tuple(currentState, allTasks);
-            }
-
-            // we already processed this task in a bulk manner in a previous cluster event, simply ignore
-            // it so we will let other tasks get in and processed ones, we will handle the queued ones
-            // later on in a subsequent cluster state event
-            if (insertionOrder < refreshOrUpdateProcessedInsertOrder) {
-                return Tuple.tuple(currentState, allTasks);
-            }
-
-            allTasks.addAll(refreshOrUpdateQueue);
-            refreshOrUpdateQueue.clear();
-
-            refreshOrUpdateProcessedInsertOrder = refreshOrUpdateInsertOrder;
-        }
-
+    ClusterState executeRefresh(final ClusterState currentState, final List<RefreshTask> allTasks) throws Exception {
         if (allTasks.isEmpty()) {
-            return Tuple.tuple(currentState, allTasks);
+            return currentState;
         }
 
         // break down to tasks per index, so we can optimize the on demand index service creation
         // to only happen for the duration of a single index processing of its respective events
-        Map<String, List<MappingTask>> tasksPerIndex = new HashMap<>();
-        for (MappingTask task : allTasks) {
+        Map<String, List<RefreshTask>> tasksPerIndex = new HashMap<>();
+        for (RefreshTask task : allTasks) {
             if (task.index == null) {
                 logger.debug("ignoring a mapping task of type [{}] with a null index.", task);
             }
-            List<MappingTask> indexTasks = tasksPerIndex.get(task.index);
-            if (indexTasks == null) {
-                indexTasks = new ArrayList<>();
-                tasksPerIndex.put(task.index, indexTasks);
-            }
-            indexTasks.add(task);
+            tasksPerIndex.computeIfAbsent(task.index, k -> new ArrayList<>()).add(task);
         }
 
         boolean dirty = false;
         MetaData.Builder mdBuilder = MetaData.builder(currentState.metaData());
 
-        for (Map.Entry<String, List<MappingTask>> entry : tasksPerIndex.entrySet()) {
+        for (Map.Entry<String, List<RefreshTask>> entry : tasksPerIndex.entrySet()) {
             String index = entry.getKey();
             IndexMetaData indexMetaData = mdBuilder.get(index);
             if (indexMetaData == null) {
@@ -160,9 +119,9 @@ public class MetaDataMappingService extends AbstractComponent {
             }
             // the tasks lists to iterate over, filled with the list of mapping tasks, trying to keep
             // the latest (based on order) update mapping one per node
-            List<MappingTask> allIndexTasks = entry.getValue();
-            List<MappingTask> tasks = new ArrayList<>();
-            for (MappingTask task : allIndexTasks) {
+            List<RefreshTask> allIndexTasks = entry.getValue();
+            List<RefreshTask> tasks = new ArrayList<>();
+            for (RefreshTask task : allIndexTasks) {
                 if (!indexMetaData.isSameUUID(task.indexUUID)) {
                     logger.debug("[{}] ignoring task [{}] - index meta data doesn't match task uuid", index, task);
                     continue;
@@ -178,12 +137,8 @@ public class MetaDataMappingService extends AbstractComponent {
                 indexService = indicesService.createIndex(nodeServicesProvider, indexMetaData, Collections.EMPTY_LIST);
                 removeIndex = true;
                 Set<String> typesToIntroduce = new HashSet<>();
-                for (MappingTask task : tasks) {
-                    if (task instanceof UpdateTask) {
-                        typesToIntroduce.add(((UpdateTask) task).type);
-                    } else if (task instanceof RefreshTask) {
-                        Collections.addAll(typesToIntroduce, ((RefreshTask) task).types);
-                    }
+                for (RefreshTask task : tasks) {
+                    Collections.addAll(typesToIntroduce, task.types);
                 }
                 for (String type : typesToIntroduce) {
                     // only add the current relevant mapping (if exists)
@@ -209,80 +164,42 @@ public class MetaDataMappingService extends AbstractComponent {
         }
 
         if (!dirty) {
-            return Tuple.tuple(currentState, allTasks);
+            return currentState;
         }
-        return Tuple.tuple(ClusterState.builder(currentState).metaData(mdBuilder).build(), allTasks);
+        return ClusterState.builder(currentState).metaData(mdBuilder).build();
     }
 
-    private boolean processIndexMappingTasks(List<MappingTask> tasks, IndexService indexService, IndexMetaData.Builder builder) {
+    private boolean processIndexMappingTasks(List<RefreshTask> tasks, IndexService indexService, IndexMetaData.Builder builder) {
         boolean dirty = false;
         String index = indexService.index().name();
         // keep track of what we already refreshed, no need to refresh it again...
         Set<String> processedRefreshes = new HashSet<>();
-        for (MappingTask task : tasks) {
-            if (task instanceof RefreshTask) {
-                RefreshTask refreshTask = (RefreshTask) task;
-                try {
-                    List<String> updatedTypes = new ArrayList<>();
-                    for (String type : refreshTask.types) {
-                        if (processedRefreshes.contains(type)) {
-                            continue;
-                        }
-                        DocumentMapper mapper = indexService.mapperService().documentMapper(type);
-                        if (mapper == null) {
-                            continue;
-                        }
-                        if (!mapper.mappingSource().equals(builder.mapping(type).source())) {
-                            updatedTypes.add(type);
-                            builder.putMapping(new MappingMetaData(mapper));
-                        }
-                        processedRefreshes.add(type);
-                    }
-
-                    if (updatedTypes.isEmpty()) {
-                        continue;
-                    }
-
-                    logger.warn("[{}] re-syncing mappings with cluster state for types [{}]", index, updatedTypes);
-                    dirty = true;
-                } catch (Throwable t) {
-                    logger.warn("[{}] failed to refresh-mapping in cluster state, types [{}]", index, refreshTask.types);
-                }
-            } else if (task instanceof UpdateTask) {
-                UpdateTask updateTask = (UpdateTask) task;
-                try {
-                    String type = updateTask.type;
-                    CompressedXContent mappingSource = updateTask.mappingSource;
-
-                    MappingMetaData mappingMetaData = builder.mapping(type);
-                    if (mappingMetaData != null && mappingMetaData.source().equals(mappingSource)) {
-                        logger.debug("[{}] update_mapping [{}] ignoring mapping update task as its source is equal to ours", index, updateTask.type);
+        for (RefreshTask refreshTask : tasks) {
+            try {
+                List<String> updatedTypes = new ArrayList<>();
+                for (String type : refreshTask.types) {
+                    if (processedRefreshes.contains(type)) {
                         continue;
                     }
-
-                    DocumentMapper updatedMapper = indexService.mapperService().merge(type, mappingSource, false, true);
-                    processedRefreshes.add(type);
-
-                    // if we end up with the same mapping as the original once, ignore
-                    if (mappingMetaData != null && mappingMetaData.source().equals(updatedMapper.mappingSource())) {
-                        logger.debug("[{}] update_mapping [{}] ignoring mapping update task as it results in the same source as what we have", index, updateTask.type);
+                    DocumentMapper mapper = indexService.mapperService().documentMapper(type);
+                    if (mapper == null) {
                         continue;
                     }
-
-                    // build the updated mapping source
-                    if (logger.isDebugEnabled()) {
-                        logger.debug("[{}] update_mapping [{}] (dynamic) with source [{}]", index, type, updatedMapper.mappingSource());
-                    } else if (logger.isInfoEnabled()) {
-                        logger.info("[{}] update_mapping [{}] (dynamic)", index, type);
+                    if (!mapper.mappingSource().equals(builder.mapping(type).source())) {
+                        updatedTypes.add(type);
+                        builder.putMapping(new MappingMetaData(mapper));
                     }
+                    processedRefreshes.add(type);
+                }
 
-                    builder.putMapping(new MappingMetaData(updatedMapper));
-                    dirty = true;
-                } catch (Throwable t) {
-                    logger.warn("[{}] failed to update-mapping in cluster state, type [{}]", index, updateTask.type);
+                if (updatedTypes.isEmpty()) {
+                    continue;
                 }
-            } else {
-                logger.warn("illegal state, got wrong mapping task type [{}]", task);
+
+                logger.warn("[{}] re-syncing mappings with cluster state for types [{}]", index, updatedTypes);
+                dirty = true;
+            } catch (Throwable t) {
+                logger.warn("[{}] failed to refresh-mapping in cluster state, types [{}]", index, refreshTask.types);
             }
         }
         return dirty;
@@ -292,197 +209,204 @@ public class MetaDataMappingService extends AbstractComponent {
      * Refreshes mappings if they are not the same between original and parsed version
      */
     public void refreshMapping(final String index, final String indexUUID, final String... types) {
-        final long insertOrder;
-        synchronized (refreshOrUpdateMutex) {
-            insertOrder = ++refreshOrUpdateInsertOrder;
-            refreshOrUpdateQueue.add(new RefreshTask(index, indexUUID, types));
-        }
-        clusterService.submitStateUpdateTask("refresh-mapping [" + index + "][" + Arrays.toString(types) + "]", Priority.HIGH, new ClusterStateUpdateTask() {
-            private volatile List<MappingTask> allTasks;
-
-            @Override
-            public void onFailure(String source, Throwable t) {
-                logger.warn("failure during [{}]", t, source);
-            }
-
-            @Override
-            public ClusterState execute(ClusterState currentState) throws Exception {
-                Tuple<ClusterState, List<MappingTask>> tuple = executeRefreshOrUpdate(currentState, insertOrder);
-                this.allTasks = tuple.v2();
-                return tuple.v1();
-            }
+        final RefreshTask refreshTask = new RefreshTask(index, indexUUID, types);
+        clusterService.submitStateUpdateTask("refresh-mapping [" + index + "][" + Arrays.toString(types) + "]",
+                refreshTask,
+                ClusterStateTaskConfig.build(Priority.HIGH),
+                refreshExectuor,
+                (source, t) -> logger.warn("failure during [{}]", t, source)
+        );
+    }
 
-            @Override
-            public void clusterStateProcessed(String source, ClusterState oldState, ClusterState newState) {
-                if (allTasks == null) {
-                    return;
+    class PutMappingExecutor implements ClusterStateTaskExecutor<PutMappingClusterStateUpdateRequest> {
+        @Override
+        public BatchResult<PutMappingClusterStateUpdateRequest> execute(ClusterState currentState, List<PutMappingClusterStateUpdateRequest> tasks) throws Exception {
+            List<String> indicesToClose = new ArrayList<>();
+            BatchResult.Builder<PutMappingClusterStateUpdateRequest> builder = BatchResult.builder();
+            Map<PutMappingClusterStateUpdateRequest, TaskResult> executionResults = new HashMap<>();
+            try {
+                // precreate incoming indices;
+                for (PutMappingClusterStateUpdateRequest request : tasks) {
+                    // failures here mean something is broken with our cluster state - fail all tasks by letting exceptions bubble up
+                    for (String index : request.indices()) {
+                        if (currentState.metaData().hasIndex(index)) {
+                            // if we don't have the index, we will throw exceptions later;
+                            if (indicesService.hasIndex(index) == false) {
+                                final IndexMetaData indexMetaData = currentState.metaData().index(index);
+                                IndexService indexService = indicesService.createIndex(nodeServicesProvider, indexMetaData, Collections.EMPTY_LIST);
+                                indicesToClose.add(indexMetaData.getIndex());
+                                // make sure to add custom default mapping if exists
+                                if (indexMetaData.getMappings().containsKey(MapperService.DEFAULT_MAPPING)) {
+                                    indexService.mapperService().merge(MapperService.DEFAULT_MAPPING, indexMetaData.getMappings().get(MapperService.DEFAULT_MAPPING).source(), false, request.updateAllTypes());
+                                }
+                                // only add the current relevant mapping (if exists)
+                                if (indexMetaData.getMappings().containsKey(request.type())) {
+                                    indexService.mapperService().merge(request.type(), indexMetaData.getMappings().get(request.type()).source(), false, request.updateAllTypes());
+                                }
+                            }
+                        }
+                    }
                 }
-                for (Object task : allTasks) {
-                    if (task instanceof UpdateTask) {
-                        UpdateTask uTask = (UpdateTask) task;
-                        ClusterStateUpdateResponse response = new ClusterStateUpdateResponse(true);
-                        uTask.listener.onResponse(response);
+                for (PutMappingClusterStateUpdateRequest request : tasks) {
+                    try {
+                        currentState = applyRequest(currentState, request);
+                        builder.success(request);
+                    } catch (Throwable t) {
+                        builder.failure(request, t);
                     }
                 }
-            }
-        });
-    }
-
-    public void putMapping(final PutMappingClusterStateUpdateRequest request, final ActionListener<ClusterStateUpdateResponse> listener) {
 
-        clusterService.submitStateUpdateTask("put-mapping [" + request.type() + "]", Priority.HIGH, new AckedClusterStateUpdateTask<ClusterStateUpdateResponse>(request, listener) {
-
-            @Override
-            protected ClusterStateUpdateResponse newResponse(boolean acknowledged) {
-                return new ClusterStateUpdateResponse(acknowledged);
+                return builder.build(currentState);
+            } finally {
+                for (String index : indicesToClose) {
+                    indicesService.removeIndex(index, "created for mapping processing");
+                }
             }
+        }
 
-            @Override
-            public ClusterState execute(final ClusterState currentState) throws Exception {
-                List<String> indicesToClose = new ArrayList<>();
-                try {
-                    for (String index : request.indices()) {
-                        if (!currentState.metaData().hasIndex(index)) {
-                            throw new IndexNotFoundException(index);
-                        }
-                    }
-
-                    // pre create indices here and add mappings to them so we can merge the mappings here if needed
-                    for (String index : request.indices()) {
-                        if (indicesService.hasIndex(index)) {
-                            continue;
+        private ClusterState applyRequest(ClusterState currentState, PutMappingClusterStateUpdateRequest request) throws IOException {
+            Map<String, DocumentMapper> newMappers = new HashMap<>();
+            Map<String, DocumentMapper> existingMappers = new HashMap<>();
+            for (String index : request.indices()) {
+                IndexService indexService = indicesService.indexServiceSafe(index);
+                // try and parse it (no need to add it here) so we can bail early in case of parsing exception
+                DocumentMapper newMapper;
+                DocumentMapper existingMapper = indexService.mapperService().documentMapper(request.type());
+                if (MapperService.DEFAULT_MAPPING.equals(request.type())) {
+                    // _default_ types do not go through merging, but we do test the new settings. Also don't apply the old default
+                    newMapper = indexService.mapperService().parse(request.type(), new CompressedXContent(request.source()), false);
+                } else {
+                    newMapper = indexService.mapperService().parse(request.type(), new CompressedXContent(request.source()), existingMapper == null);
+                    if (existingMapper != null) {
+                        // first, simulate
+                        MergeResult mergeResult = existingMapper.merge(newMapper.mapping(), true, request.updateAllTypes());
+                        // if we have conflicts, throw an exception
+                        if (mergeResult.hasConflicts()) {
+                            throw new MergeMappingException(mergeResult.buildConflicts());
                         }
-                        final IndexMetaData indexMetaData = currentState.metaData().index(index);
-                        IndexService indexService = indicesService.createIndex(nodeServicesProvider, indexMetaData, Collections.EMPTY_LIST);
-                        indicesToClose.add(indexMetaData.getIndex());
-                        // make sure to add custom default mapping if exists
-                        if (indexMetaData.getMappings().containsKey(MapperService.DEFAULT_MAPPING)) {
-                            indexService.mapperService().merge(MapperService.DEFAULT_MAPPING, indexMetaData.getMappings().get(MapperService.DEFAULT_MAPPING).source(), false, request.updateAllTypes());
-                        }
-                        // only add the current relevant mapping (if exists)
-                        if (indexMetaData.getMappings().containsKey(request.type())) {
-                            indexService.mapperService().merge(request.type(), indexMetaData.getMappings().get(request.type()).source(), false, request.updateAllTypes());
-                        }
-                    }
-
-                    Map<String, DocumentMapper> newMappers = new HashMap<>();
-                    Map<String, DocumentMapper> existingMappers = new HashMap<>();
-                    for (String index : request.indices()) {
-                        IndexService indexService = indicesService.indexServiceSafe(index);
-                        // try and parse it (no need to add it here) so we can bail early in case of parsing exception
-                        DocumentMapper newMapper;
-                        DocumentMapper existingMapper = indexService.mapperService().documentMapper(request.type());
-                        if (MapperService.DEFAULT_MAPPING.equals(request.type())) {
-                            // _default_ types do not go through merging, but we do test the new settings. Also don't apply the old default
-                            newMapper = indexService.mapperService().parse(request.type(), new CompressedXContent(request.source()), false);
-                        } else {
-                            newMapper = indexService.mapperService().parse(request.type(), new CompressedXContent(request.source()), existingMapper == null);
-                            if (existingMapper != null) {
-                                // first, simulate
-                                MergeResult mergeResult = existingMapper.merge(newMapper.mapping(), true, request.updateAllTypes());
-                                // if we have conflicts, throw an exception
-                                if (mergeResult.hasConflicts()) {
-                                    throw new MergeMappingException(mergeResult.buildConflicts());
-                                }
-                            } else {
-                                // TODO: can we find a better place for this validation?
-                                // The reason this validation is here is that the mapper service doesn't learn about
-                                // new types all at once , which can create a false error.
-
-                                // For example in MapperService we can't distinguish between a create index api call
-                                // and a put mapping api call, so we don't which type did exist before.
-                                // Also the order of the mappings may be backwards.
-                                if (newMapper.parentFieldMapper().active()) {
-                                    IndexMetaData indexMetaData = currentState.metaData().index(index);
-                                    for (ObjectCursor<MappingMetaData> mapping : indexMetaData.getMappings().values()) {
-                                        if (newMapper.parentFieldMapper().type().equals(mapping.value.type())) {
-                                            throw new IllegalArgumentException("can't add a _parent field that points to an already existing type");
-                                        }
-                                    }
+                    } else {
+                        // TODO: can we find a better place for this validation?
+                        // The reason this validation is here is that the mapper service doesn't learn about
+                        // new types all at once , which can create a false error.
+
+                        // For example in MapperService we can't distinguish between a create index api call
+                        // and a put mapping api call, so we don't which type did exist before.
+                        // Also the order of the mappings may be backwards.
+                        if (newMapper.parentFieldMapper().active()) {
+                            IndexMetaData indexMetaData = currentState.metaData().index(index);
+                            for (ObjectCursor<MappingMetaData> mapping : indexMetaData.getMappings().values()) {
+                                if (newMapper.parentFieldMapper().type().equals(mapping.value.type())) {
+                                    throw new IllegalArgumentException("can't add a _parent field that points to an already existing type");
                                 }
                             }
                         }
+                    }
+                }
+                newMappers.put(index, newMapper);
+                if (existingMapper != null) {
+                    existingMappers.put(index, existingMapper);
+                }
+            }
 
+            String mappingType = request.type();
+            if (mappingType == null) {
+                mappingType = newMappers.values().iterator().next().type();
+            } else if (!mappingType.equals(newMappers.values().iterator().next().type())) {
+                throw new InvalidTypeNameException("Type name provided does not match type name within mapping definition");
+            }
+            if (!MapperService.DEFAULT_MAPPING.equals(mappingType) && !PercolatorService.TYPE_NAME.equals(mappingType) && mappingType.charAt(0) == '_') {
+                throw new InvalidTypeNameException("Document mapping type name can't start with '_'");
+            }
+            final Map<String, MappingMetaData> mappings = new HashMap<>();
+            for (Map.Entry<String, DocumentMapper> entry : newMappers.entrySet()) {
+                String index = entry.getKey();
+                // do the actual merge here on the master, and update the mapping source
+                DocumentMapper newMapper = entry.getValue();
+                IndexService indexService = indicesService.indexService(index);
+                if (indexService == null) {
+                    continue;
+                }
 
-                        newMappers.put(index, newMapper);
-                        if (existingMapper != null) {
-                            existingMappers.put(index, existingMapper);
+                CompressedXContent existingSource = null;
+                if (existingMappers.containsKey(entry.getKey())) {
+                    existingSource = existingMappers.get(entry.getKey()).mappingSource();
+                }
+                DocumentMapper mergedMapper = indexService.mapperService().merge(newMapper.type(), newMapper.mappingSource(), false, request.updateAllTypes());
+                CompressedXContent updatedSource = mergedMapper.mappingSource();
+
+                if (existingSource != null) {
+                    if (existingSource.equals(updatedSource)) {
+                        // same source, no changes, ignore it
+                    } else {
+                        // use the merged mapping source
+                        mappings.put(index, new MappingMetaData(mergedMapper));
+                        if (logger.isDebugEnabled()) {
+                            logger.debug("[{}] update_mapping [{}] with source [{}]", index, mergedMapper.type(), updatedSource);
+                        } else if (logger.isInfoEnabled()) {
+                            logger.info("[{}] update_mapping [{}]", index, mergedMapper.type());
                         }
-                    }
 
-                    String mappingType = request.type();
-                    if (mappingType == null) {
-                        mappingType = newMappers.values().iterator().next().type();
-                    } else if (!mappingType.equals(newMappers.values().iterator().next().type())) {
-                        throw new InvalidTypeNameException("Type name provided does not match type name within mapping definition");
                     }
-                    if (!MapperService.DEFAULT_MAPPING.equals(mappingType) && !PercolatorService.TYPE_NAME.equals(mappingType) && mappingType.charAt(0) == '_') {
-                        throw new InvalidTypeNameException("Document mapping type name can't start with '_'");
+                } else {
+                    mappings.put(index, new MappingMetaData(mergedMapper));
+                    if (logger.isDebugEnabled()) {
+                        logger.debug("[{}] create_mapping [{}] with source [{}]", index, newMapper.type(), updatedSource);
+                    } else if (logger.isInfoEnabled()) {
+                        logger.info("[{}] create_mapping [{}]", index, newMapper.type());
                     }
+                }
+            }
+            if (mappings.isEmpty()) {
+                // no changes, return
+                return currentState;
+            }
+            MetaData.Builder builder = MetaData.builder(currentState.metaData());
+            for (String indexName : request.indices()) {
+                IndexMetaData indexMetaData = currentState.metaData().index(indexName);
+                if (indexMetaData == null) {
+                    throw new IndexNotFoundException(indexName);
+                }
+                MappingMetaData mappingMd = mappings.get(indexName);
+                if (mappingMd != null) {
+                    builder.put(IndexMetaData.builder(indexMetaData).putMapping(mappingMd));
+                }
+            }
 
-                    final Map<String, MappingMetaData> mappings = new HashMap<>();
-                    for (Map.Entry<String, DocumentMapper> entry : newMappers.entrySet()) {
-                        String index = entry.getKey();
-                        // do the actual merge here on the master, and update the mapping source
-                        DocumentMapper newMapper = entry.getValue();
-                        IndexService indexService = indicesService.indexService(index);
-                        if (indexService == null) {
-                            continue;
-                        }
+            return ClusterState.builder(currentState).metaData(builder).build();
+        }
+    }
 
-                        CompressedXContent existingSource = null;
-                        if (existingMappers.containsKey(entry.getKey())) {
-                            existingSource = existingMappers.get(entry.getKey()).mappingSource();
-                        }
-                        DocumentMapper mergedMapper = indexService.mapperService().merge(newMapper.type(), newMapper.mappingSource(), false, request.updateAllTypes());
-                        CompressedXContent updatedSource = mergedMapper.mappingSource();
-
-                        if (existingSource != null) {
-                            if (existingSource.equals(updatedSource)) {
-                                // same source, no changes, ignore it
-                            } else {
-                                // use the merged mapping source
-                                mappings.put(index, new MappingMetaData(mergedMapper));
-                                if (logger.isDebugEnabled()) {
-                                    logger.debug("[{}] update_mapping [{}] with source [{}]", index, mergedMapper.type(), updatedSource);
-                                } else if (logger.isInfoEnabled()) {
-                                    logger.info("[{}] update_mapping [{}]", index, mergedMapper.type());
-                                }
-                            }
-                        } else {
-                            mappings.put(index, new MappingMetaData(mergedMapper));
-                            if (logger.isDebugEnabled()) {
-                                logger.debug("[{}] create_mapping [{}] with source [{}]", index, newMapper.type(), updatedSource);
-                            } else if (logger.isInfoEnabled()) {
-                                logger.info("[{}] create_mapping [{}]", index, newMapper.type());
-                            }
-                        }
+    public void putMapping(final PutMappingClusterStateUpdateRequest request, final ActionListener<ClusterStateUpdateResponse> listener) {
+        clusterService.submitStateUpdateTask("put-mapping [" + request.type() + "]",
+                request,
+                ClusterStateTaskConfig.build(Priority.HIGH, request.masterNodeTimeout()),
+                putMappingExecutor,
+                new AckedClusterStateTaskListener() {
+
+                    @Override
+                    public void onFailure(String source, Throwable t) {
+                        listener.onFailure(t);
                     }
 
-                    if (mappings.isEmpty()) {
-                        // no changes, return
-                        return currentState;
+                    @Override
+                    public boolean mustAck(DiscoveryNode discoveryNode) {
+                        return true;
                     }
 
-                    MetaData.Builder builder = MetaData.builder(currentState.metaData());
-                    for (String indexName : request.indices()) {
-                        IndexMetaData indexMetaData = currentState.metaData().index(indexName);
-                        if (indexMetaData == null) {
-                            throw new IndexNotFoundException(indexName);
-                        }
-                        MappingMetaData mappingMd = mappings.get(indexName);
-                        if (mappingMd != null) {
-                            builder.put(IndexMetaData.builder(indexMetaData).putMapping(mappingMd));
-                        }
+                    @Override
+                    public void onAllNodesAcked(@Nullable Throwable t) {
+                        listener.onResponse(new ClusterStateUpdateResponse(true));
                     }
 
-                    return ClusterState.builder(currentState).metaData(builder).build();
-                } finally {
-                    for (String index : indicesToClose) {
-                        indicesService.removeIndex(index, "created for mapping processing");
+                    @Override
+                    public void onAckTimeout() {
+                        listener.onResponse(new ClusterStateUpdateResponse(false));
                     }
-                }
-            }
-        });
+
+                    @Override
+                    public TimeValue ackTimeout() {
+                        return request.ackTimeout();
+                    }
+                });
     }
 }

+ 5 - 14
core/src/main/java/org/elasticsearch/cluster/metadata/MetaDataUpdateSettingsService.java

@@ -24,11 +24,7 @@ import org.elasticsearch.action.ActionListener;
 import org.elasticsearch.action.admin.indices.settings.put.UpdateSettingsClusterStateUpdateRequest;
 import org.elasticsearch.action.admin.indices.upgrade.post.UpgradeSettingsClusterStateUpdateRequest;
 import org.elasticsearch.action.support.IndicesOptions;
-import org.elasticsearch.cluster.AckedClusterStateUpdateTask;
-import org.elasticsearch.cluster.ClusterChangedEvent;
-import org.elasticsearch.cluster.ClusterService;
-import org.elasticsearch.cluster.ClusterState;
-import org.elasticsearch.cluster.ClusterStateListener;
+import org.elasticsearch.cluster.*;
 import org.elasticsearch.cluster.ack.ClusterStateUpdateResponse;
 import org.elasticsearch.cluster.block.ClusterBlocks;
 import org.elasticsearch.cluster.routing.RoutingTable;
@@ -44,13 +40,7 @@ import org.elasticsearch.common.settings.Settings;
 import org.elasticsearch.common.unit.TimeValue;
 import org.elasticsearch.index.settings.IndexDynamicSettings;
 
-import java.util.ArrayList;
-import java.util.HashMap;
-import java.util.HashSet;
-import java.util.List;
-import java.util.Locale;
-import java.util.Map;
-import java.util.Set;
+import java.util.*;
 
 import static org.elasticsearch.common.settings.Settings.settingsBuilder;
 
@@ -219,7 +209,8 @@ public class MetaDataUpdateSettingsService extends AbstractComponent implements
         }
         final Settings openSettings = updatedSettingsBuilder.build();
 
-        clusterService.submitStateUpdateTask("update-settings", Priority.URGENT, new AckedClusterStateUpdateTask<ClusterStateUpdateResponse>(request, listener) {
+        clusterService.submitStateUpdateTask("update-settings",
+                new AckedClusterStateUpdateTask<ClusterStateUpdateResponse>(Priority.URGENT, request, listener) {
 
             @Override
             protected ClusterStateUpdateResponse newResponse(boolean acknowledged) {
@@ -334,7 +325,7 @@ public class MetaDataUpdateSettingsService extends AbstractComponent implements
     public void upgradeIndexSettings(final UpgradeSettingsClusterStateUpdateRequest request, final ActionListener<ClusterStateUpdateResponse> listener) {
 
 
-        clusterService.submitStateUpdateTask("update-index-compatibility-versions", Priority.URGENT, new AckedClusterStateUpdateTask<ClusterStateUpdateResponse>(request, listener) {
+        clusterService.submitStateUpdateTask("update-index-compatibility-versions", new AckedClusterStateUpdateTask<ClusterStateUpdateResponse>(Priority.URGENT, request, listener) {
 
             @Override
             protected ClusterStateUpdateResponse newResponse(boolean acknowledged) {

+ 1 - 1
core/src/main/java/org/elasticsearch/cluster/routing/RoutingService.java

@@ -147,7 +147,7 @@ public class RoutingService extends AbstractLifecycleComponent<RoutingService> i
                 return;
             }
             logger.trace("rerouting {}", reason);
-            clusterService.submitStateUpdateTask(CLUSTER_UPDATE_TASK_SOURCE + "(" + reason + ")", Priority.HIGH, new ClusterStateUpdateTask() {
+            clusterService.submitStateUpdateTask(CLUSTER_UPDATE_TASK_SOURCE + "(" + reason + ")", new ClusterStateUpdateTask(Priority.HIGH) {
                 @Override
                 public ClusterState execute(ClusterState currentState) {
                     rerouting.set(false);

+ 241 - 198
core/src/main/java/org/elasticsearch/cluster/service/InternalClusterService.java

@@ -20,16 +20,8 @@
 package org.elasticsearch.cluster.service;
 
 import org.elasticsearch.Version;
-import org.elasticsearch.cluster.AckedClusterStateUpdateTask;
-import org.elasticsearch.cluster.ClusterChangedEvent;
-import org.elasticsearch.cluster.ClusterName;
-import org.elasticsearch.cluster.ClusterService;
-import org.elasticsearch.cluster.ClusterState;
+import org.elasticsearch.cluster.*;
 import org.elasticsearch.cluster.ClusterState.Builder;
-import org.elasticsearch.cluster.ClusterStateListener;
-import org.elasticsearch.cluster.ClusterStateUpdateTask;
-import org.elasticsearch.cluster.LocalNodeMasterListener;
-import org.elasticsearch.cluster.TimeoutClusterStateListener;
 import org.elasticsearch.cluster.block.ClusterBlock;
 import org.elasticsearch.cluster.block.ClusterBlocks;
 import org.elasticsearch.cluster.metadata.MetaData;
@@ -41,6 +33,7 @@ import org.elasticsearch.cluster.routing.OperationRouting;
 import org.elasticsearch.cluster.routing.RoutingTable;
 import org.elasticsearch.common.Nullable;
 import org.elasticsearch.common.Priority;
+import org.elasticsearch.common.Strings;
 import org.elasticsearch.common.component.AbstractLifecycleComponent;
 import org.elasticsearch.common.inject.Inject;
 import org.elasticsearch.common.logging.ESLogger;
@@ -49,13 +42,7 @@ import org.elasticsearch.common.settings.Settings;
 import org.elasticsearch.common.text.StringText;
 import org.elasticsearch.common.transport.TransportAddress;
 import org.elasticsearch.common.unit.TimeValue;
-import org.elasticsearch.common.util.concurrent.ConcurrentCollections;
-import org.elasticsearch.common.util.concurrent.CountDown;
-import org.elasticsearch.common.util.concurrent.EsExecutors;
-import org.elasticsearch.common.util.concurrent.EsRejectedExecutionException;
-import org.elasticsearch.common.util.concurrent.FutureUtils;
-import org.elasticsearch.common.util.concurrent.PrioritizedEsThreadPoolExecutor;
-import org.elasticsearch.common.util.concurrent.PrioritizedRunnable;
+import org.elasticsearch.common.util.concurrent.*;
 import org.elasticsearch.common.util.iterable.Iterables;
 import org.elasticsearch.discovery.Discovery;
 import org.elasticsearch.discovery.DiscoveryService;
@@ -63,18 +50,10 @@ import org.elasticsearch.node.settings.NodeSettingsService;
 import org.elasticsearch.threadpool.ThreadPool;
 import org.elasticsearch.transport.TransportService;
 
-import java.util.ArrayList;
-import java.util.Collection;
-import java.util.Iterator;
-import java.util.List;
-import java.util.Map;
-import java.util.Queue;
-import java.util.concurrent.ConcurrentMap;
-import java.util.concurrent.CopyOnWriteArrayList;
-import java.util.concurrent.Executor;
-import java.util.concurrent.Future;
-import java.util.concurrent.ScheduledFuture;
-import java.util.concurrent.TimeUnit;
+import java.util.*;
+import java.util.concurrent.*;
+import java.util.concurrent.atomic.AtomicBoolean;
+import java.util.stream.Collectors;
 
 import static org.elasticsearch.common.util.concurrent.EsExecutors.daemonThreadFactory;
 
@@ -111,6 +90,7 @@ public class InternalClusterService extends AbstractLifecycleComponent<ClusterSe
     private final Collection<ClusterStateListener> priorityClusterStateListeners = new CopyOnWriteArrayList<>();
     private final Collection<ClusterStateListener> clusterStateListeners = new CopyOnWriteArrayList<>();
     private final Collection<ClusterStateListener> lastClusterStateListeners = new CopyOnWriteArrayList<>();
+    private final Map<ClusterStateTaskExecutor, List<UpdateTask>> updateTasksPerExecutor = new HashMap<>();
     // TODO this is rather frequently changing I guess a Synced Set would be better here and a dedicated remove API
     private final Collection<ClusterStateListener> postAppliedListeners = new CopyOnWriteArrayList<>();
     private final Iterable<ClusterStateListener> preAppliedListeners = Iterables.concat(priorityClusterStateListeners, clusterStateListeners, lastClusterStateListeners);
@@ -289,30 +269,33 @@ public class InternalClusterService extends AbstractLifecycleComponent<ClusterSe
 
     @Override
     public void submitStateUpdateTask(final String source, final ClusterStateUpdateTask updateTask) {
-        submitStateUpdateTask(source, Priority.NORMAL, updateTask);
+        submitStateUpdateTask(source, updateTask, updateTask, updateTask, updateTask);
     }
 
+
     @Override
-    public void submitStateUpdateTask(final String source, Priority priority, final ClusterStateUpdateTask updateTask) {
+    public <T> void submitStateUpdateTask(final String source, final T task,
+                                          final ClusterStateTaskConfig config,
+                                          final ClusterStateTaskExecutor<T> executor,
+                                          final ClusterStateTaskListener listener
+    ) {
         if (!lifecycle.started()) {
             return;
         }
         try {
-            final UpdateTask task = new UpdateTask(source, priority, updateTask);
-            if (updateTask.timeout() != null) {
-                updateTasksExecutor.execute(task, threadPool.scheduler(), updateTask.timeout(), new Runnable() {
-                    @Override
-                    public void run() {
-                        threadPool.generic().execute(new Runnable() {
-                            @Override
-                            public void run() {
-                                updateTask.onFailure(task.source(), new ProcessClusterEventTimeoutException(updateTask.timeout(), task.source()));
-                            }
-                        });
-                    }
-                });
+            final UpdateTask<T> updateTask = new UpdateTask<>(source, task, config, executor, listener);
+
+            synchronized (updateTasksPerExecutor) {
+                updateTasksPerExecutor.computeIfAbsent(executor, k -> new ArrayList<>()).add(updateTask);
+            }
+
+            if (config.timeout() != null) {
+                updateTasksExecutor.execute(updateTask, threadPool.scheduler(), config.timeout(), () -> threadPool.generic().execute(() -> {
+                    if (updateTask.processed.getAndSet(true) == false) {
+                        listener.onFailure(source, new ProcessClusterEventTimeoutException(config.timeout(), source));
+                    }}));
             } else {
-                updateTasksExecutor.execute(task);
+                updateTasksExecutor.execute(updateTask);
             }
         } catch (EsRejectedExecutionException e) {
             // ignore cases where we are shutting down..., there is really nothing interesting
@@ -379,188 +362,238 @@ public class InternalClusterService extends AbstractLifecycleComponent<ClusterSe
         }
     }
 
-    class UpdateTask extends SourcePrioritizedRunnable {
+    <T> void runTasksForExecutor(ClusterStateTaskExecutor<T> executor) {
+        final ArrayList<UpdateTask<T>> toExecute = new ArrayList<>();
+        final ArrayList<String> sources = new ArrayList<>();
+        synchronized (updateTasksPerExecutor) {
+            List<UpdateTask> pending = updateTasksPerExecutor.remove(executor);
+            if (pending != null) {
+                for (UpdateTask<T> task : pending) {
+                    if (task.processed.getAndSet(true) == false) {
+                        logger.trace("will process [{}]", task.source);
+                        toExecute.add(task);
+                        sources.add(task.source);
+                    } else {
+                        logger.trace("skipping [{}], already processed", task.source);
+                    }
+                }
+            }
+        }
+        if (toExecute.isEmpty()) {
+            return;
+        }
+        final String source = Strings.collectionToCommaDelimitedString(sources);
+        if (!lifecycle.started()) {
+            logger.debug("processing [{}]: ignoring, cluster_service not started", source);
+            return;
+        }
+        logger.debug("processing [{}]: execute", source);
+        ClusterState previousClusterState = clusterState;
+        if (!previousClusterState.nodes().localNodeMaster() && executor.runOnlyOnMaster()) {
+            logger.debug("failing [{}]: local node is no longer master", source);
+            toExecute.stream().forEach(task -> task.listener.onNoLongerMaster(task.source));
+            return;
+        }
+        ClusterStateTaskExecutor.BatchResult<T> batchResult;
+        long startTimeNS = System.nanoTime();
+        try {
+            List<T> inputs = toExecute.stream().map(tUpdateTask -> tUpdateTask.task).collect(Collectors.toList());
+            batchResult = executor.execute(previousClusterState, inputs);
+        } catch (Throwable e) {
+            TimeValue executionTime = TimeValue.timeValueMillis(Math.max(0, TimeValue.nsecToMSec(System.nanoTime() - startTimeNS)));
+            if (logger.isTraceEnabled()) {
+                StringBuilder sb = new StringBuilder("failed to execute cluster state update in ").append(executionTime).append(", state:\nversion [").append(previousClusterState.version()).append("], source [").append(source).append("]\n");
+                sb.append(previousClusterState.nodes().prettyPrint());
+                sb.append(previousClusterState.routingTable().prettyPrint());
+                sb.append(previousClusterState.getRoutingNodes().prettyPrint());
+                logger.trace(sb.toString(), e);
+            }
+            warnAboutSlowTaskIfNeeded(executionTime, source);
+            batchResult = ClusterStateTaskExecutor.BatchResult.<T>builder().failures(toExecute.stream().map(updateTask -> updateTask.task)::iterator, e).build(previousClusterState);
+        }
 
-        public final ClusterStateUpdateTask updateTask;
+        assert batchResult.executionResults != null;
 
-        UpdateTask(String source, Priority priority, ClusterStateUpdateTask updateTask) {
-            super(priority, source);
-            this.updateTask = updateTask;
+        ClusterState newClusterState = batchResult.resultingState;
+        final ArrayList<UpdateTask<T>> proccessedListeners = new ArrayList<>();
+        // fail all tasks that have failed and extract those that are waiting for results
+        for (UpdateTask<T> updateTask : toExecute) {
+            assert batchResult.executionResults.containsKey(updateTask.task) : "missing " + updateTask.task.toString();
+            final ClusterStateTaskExecutor.TaskResult executionResult =
+                    batchResult.executionResults.get(updateTask.task);
+            executionResult.handle(() -> proccessedListeners.add(updateTask), ex -> updateTask.listener.onFailure(updateTask.source, ex));
         }
 
-        @Override
-        public void run() {
-            if (!lifecycle.started()) {
-                logger.debug("processing [{}]: ignoring, cluster_service not started", source);
-                return;
-            }
-            logger.debug("processing [{}]: execute", source);
-            ClusterState previousClusterState = clusterState;
-            if (!previousClusterState.nodes().localNodeMaster() && updateTask.runOnlyOnMaster()) {
-                logger.debug("failing [{}]: local node is no longer master", source);
-                updateTask.onNoLongerMaster(source);
-                return;
-            }
-            ClusterState newClusterState;
-            long startTimeNS = System.nanoTime();
-            try {
-                newClusterState = updateTask.execute(previousClusterState);
-            } catch (Throwable e) {
-                TimeValue executionTime = TimeValue.timeValueMillis(Math.max(0, TimeValue.nsecToMSec(System.nanoTime() - startTimeNS)));
-                if (logger.isTraceEnabled()) {
-                    StringBuilder sb = new StringBuilder("failed to execute cluster state update in ").append(executionTime).append(", state:\nversion [").append(previousClusterState.version()).append("], source [").append(source).append("]\n");
-                    sb.append(previousClusterState.nodes().prettyPrint());
-                    sb.append(previousClusterState.routingTable().prettyPrint());
-                    sb.append(previousClusterState.getRoutingNodes().prettyPrint());
-                    logger.trace(sb.toString(), e);
-                }
-                warnAboutSlowTaskIfNeeded(executionTime, source);
-                updateTask.onFailure(source, e);
-                return;
-            }
-
-            if (previousClusterState == newClusterState) {
-                if (updateTask instanceof AckedClusterStateUpdateTask) {
+        if (previousClusterState == newClusterState) {
+            for (UpdateTask<T> task : proccessedListeners) {
+                if (task.listener instanceof AckedClusterStateTaskListener) {
                     //no need to wait for ack if nothing changed, the update can be counted as acknowledged
-                    ((AckedClusterStateUpdateTask) updateTask).onAllNodesAcked(null);
+                    ((AckedClusterStateTaskListener) task.listener).onAllNodesAcked(null);
                 }
-                updateTask.clusterStateProcessed(source, previousClusterState, newClusterState);
-                TimeValue executionTime = TimeValue.timeValueMillis(Math.max(0, TimeValue.nsecToMSec(System.nanoTime() - startTimeNS)));
-                logger.debug("processing [{}]: took {} no change in cluster_state", source, executionTime);
-                warnAboutSlowTaskIfNeeded(executionTime, source);
-                return;
+                task.listener.clusterStateProcessed(task.source, previousClusterState, newClusterState);
             }
+            TimeValue executionTime = TimeValue.timeValueMillis(Math.max(0, TimeValue.nsecToMSec(System.nanoTime() - startTimeNS)));
+            logger.debug("processing [{}]: took {} no change in cluster_state", source, executionTime);
+            warnAboutSlowTaskIfNeeded(executionTime, source);
+            return;
+        }
 
-            try {
-                Discovery.AckListener ackListener = new NoOpAckListener();
-                if (newClusterState.nodes().localNodeMaster()) {
-                    // only the master controls the version numbers
-                    Builder builder = ClusterState.builder(newClusterState).incrementVersion();
-                    if (previousClusterState.routingTable() != newClusterState.routingTable()) {
-                        builder.routingTable(RoutingTable.builder(newClusterState.routingTable()).version(newClusterState.routingTable().version() + 1).build());
-                    }
-                    if (previousClusterState.metaData() != newClusterState.metaData()) {
-                        builder.metaData(MetaData.builder(newClusterState.metaData()).version(newClusterState.metaData().version() + 1));
-                    }
-                    newClusterState = builder.build();
-
-                    if (updateTask instanceof AckedClusterStateUpdateTask) {
-                        final AckedClusterStateUpdateTask ackedUpdateTask = (AckedClusterStateUpdateTask) updateTask;
-                        if (ackedUpdateTask.ackTimeout() == null || ackedUpdateTask.ackTimeout().millis() == 0) {
-                            ackedUpdateTask.onAckTimeout();
+        try {
+            ArrayList<Discovery.AckListener> ackListeners = new ArrayList<>();
+            if (newClusterState.nodes().localNodeMaster()) {
+                // only the master controls the version numbers
+                Builder builder = ClusterState.builder(newClusterState).incrementVersion();
+                if (previousClusterState.routingTable() != newClusterState.routingTable()) {
+                    builder.routingTable(RoutingTable.builder(newClusterState.routingTable()).version(newClusterState.routingTable().version() + 1).build());
+                }
+                if (previousClusterState.metaData() != newClusterState.metaData()) {
+                    builder.metaData(MetaData.builder(newClusterState.metaData()).version(newClusterState.metaData().version() + 1));
+                }
+                newClusterState = builder.build();
+                for (UpdateTask<T> task : proccessedListeners) {
+                    if (task.listener instanceof AckedClusterStateTaskListener) {
+                        final AckedClusterStateTaskListener ackedListener = (AckedClusterStateTaskListener) task.listener;
+                        if (ackedListener.ackTimeout() == null || ackedListener.ackTimeout().millis() == 0) {
+                            ackedListener.onAckTimeout();
                         } else {
                             try {
-                                ackListener = new AckCountDownListener(ackedUpdateTask, newClusterState.version(), newClusterState.nodes(), threadPool);
+                                ackListeners.add(new AckCountDownListener(ackedListener, newClusterState.version(), newClusterState.nodes(), threadPool));
                             } catch (EsRejectedExecutionException ex) {
                                 if (logger.isDebugEnabled()) {
                                     logger.debug("Couldn't schedule timeout thread - node might be shutting down", ex);
                                 }
                                 //timeout straightaway, otherwise we could wait forever as the timeout thread has not started
-                                ackedUpdateTask.onAckTimeout();
+                                ackedListener.onAckTimeout();
                             }
                         }
                     }
                 }
+            }
+            final Discovery.AckListener ackListener = new DelegetingAckListener(ackListeners);
 
-                newClusterState.status(ClusterState.ClusterStateStatus.BEING_APPLIED);
+            newClusterState.status(ClusterState.ClusterStateStatus.BEING_APPLIED);
 
-                if (logger.isTraceEnabled()) {
-                    StringBuilder sb = new StringBuilder("cluster state updated, source [").append(source).append("]\n");
-                    sb.append(newClusterState.prettyPrint());
-                    logger.trace(sb.toString());
-                } else if (logger.isDebugEnabled()) {
-                    logger.debug("cluster state updated, version [{}], source [{}]", newClusterState.version(), source);
-                }
+            if (logger.isTraceEnabled()) {
+                StringBuilder sb = new StringBuilder("cluster state updated, source [").append(source).append("]\n");
+                sb.append(newClusterState.prettyPrint());
+                logger.trace(sb.toString());
+            } else if (logger.isDebugEnabled()) {
+                logger.debug("cluster state updated, version [{}], source [{}]", newClusterState.version(), source);
+            }
 
-                ClusterChangedEvent clusterChangedEvent = new ClusterChangedEvent(source, newClusterState, previousClusterState);
-                // new cluster state, notify all listeners
-                final DiscoveryNodes.Delta nodesDelta = clusterChangedEvent.nodesDelta();
-                if (nodesDelta.hasChanges() && logger.isInfoEnabled()) {
-                    String summary = nodesDelta.shortSummary();
-                    if (summary.length() > 0) {
-                        logger.info("{}, reason: {}", summary, source);
-                    }
+            ClusterChangedEvent clusterChangedEvent = new ClusterChangedEvent(source, newClusterState, previousClusterState);
+            // new cluster state, notify all listeners
+            final DiscoveryNodes.Delta nodesDelta = clusterChangedEvent.nodesDelta();
+            if (nodesDelta.hasChanges() && logger.isInfoEnabled()) {
+                String summary = nodesDelta.shortSummary();
+                if (summary.length() > 0) {
+                    logger.info("{}, reason: {}", summary, source);
                 }
+            }
 
-                // TODO, do this in parallel (and wait)
-                for (DiscoveryNode node : nodesDelta.addedNodes()) {
-                    if (!nodeRequiresConnection(node)) {
-                        continue;
-                    }
-                    try {
-                        transportService.connectToNode(node);
-                    } catch (Throwable e) {
-                        // the fault detection will detect it as failed as well
-                        logger.warn("failed to connect to node [" + node + "]", e);
-                    }
+            // TODO, do this in parallel (and wait)
+            for (DiscoveryNode node : nodesDelta.addedNodes()) {
+                if (!nodeRequiresConnection(node)) {
+                    continue;
                 }
+                try {
+                    transportService.connectToNode(node);
+                } catch (Throwable e) {
+                    // the fault detection will detect it as failed as well
+                    logger.warn("failed to connect to node [" + node + "]", e);
+                }
+            }
 
-                // if we are the master, publish the new state to all nodes
-                // we publish here before we send a notification to all the listeners, since if it fails
-                // we don't want to notify
-                if (newClusterState.nodes().localNodeMaster()) {
-                    logger.debug("publishing cluster state version [{}]", newClusterState.version());
-                    try {
-                        discoveryService.publish(clusterChangedEvent, ackListener);
-                    } catch (Discovery.FailedToCommitClusterStateException t) {
-                        logger.warn("failing [{}]: failed to commit cluster state version [{}]", t, source, newClusterState.version());
-                        updateTask.onFailure(source, t);
-                        return;
-                    }
+            // if we are the master, publish the new state to all nodes
+            // we publish here before we send a notification to all the listeners, since if it fails
+            // we don't want to notify
+            if (newClusterState.nodes().localNodeMaster()) {
+                logger.debug("publishing cluster state version [{}]", newClusterState.version());
+                try {
+                    discoveryService.publish(clusterChangedEvent, ackListener);
+                } catch (Discovery.FailedToCommitClusterStateException t) {
+                    logger.warn("failing [{}]: failed to commit cluster state version [{}]", t, source, newClusterState.version());
+                    proccessedListeners.forEach(task -> task.listener.onFailure(task.source, t));
+                    return;
                 }
+            }
 
-                // update the current cluster state
-                clusterState = newClusterState;
-                logger.debug("set local cluster state to version {}", newClusterState.version());
-                for (ClusterStateListener listener : preAppliedListeners) {
-                    try {
-                        listener.clusterChanged(clusterChangedEvent);
-                    } catch (Exception ex) {
-                        logger.warn("failed to notify ClusterStateListener", ex);
-                    }
+            // update the current cluster state
+            clusterState = newClusterState;
+            logger.debug("set local cluster state to version {}", newClusterState.version());
+            for (ClusterStateListener listener : preAppliedListeners) {
+                try {
+                    listener.clusterChanged(clusterChangedEvent);
+                } catch (Exception ex) {
+                    logger.warn("failed to notify ClusterStateListener", ex);
                 }
+            }
 
-                for (DiscoveryNode node : nodesDelta.removedNodes()) {
-                    try {
-                        transportService.disconnectFromNode(node);
-                    } catch (Throwable e) {
-                        logger.warn("failed to disconnect to node [" + node + "]", e);
-                    }
+            for (DiscoveryNode node : nodesDelta.removedNodes()) {
+                try {
+                    transportService.disconnectFromNode(node);
+                } catch (Throwable e) {
+                    logger.warn("failed to disconnect to node [" + node + "]", e);
                 }
+            }
 
-                newClusterState.status(ClusterState.ClusterStateStatus.APPLIED);
+            newClusterState.status(ClusterState.ClusterStateStatus.APPLIED);
 
-                for (ClusterStateListener listener : postAppliedListeners) {
-                    try {
-                        listener.clusterChanged(clusterChangedEvent);
-                    } catch (Exception ex) {
-                        logger.warn("failed to notify ClusterStateListener", ex);
-                    }
+            for (ClusterStateListener listener : postAppliedListeners) {
+                try {
+                    listener.clusterChanged(clusterChangedEvent);
+                } catch (Exception ex) {
+                    logger.warn("failed to notify ClusterStateListener", ex);
                 }
+            }
 
-                //manual ack only from the master at the end of the publish
-                if (newClusterState.nodes().localNodeMaster()) {
-                    try {
-                        ackListener.onNodeAck(newClusterState.nodes().localNode(), null);
-                    } catch (Throwable t) {
-                        logger.debug("error while processing ack for master node [{}]", t, newClusterState.nodes().localNode());
-                    }
+            //manual ack only from the master at the end of the publish
+            if (newClusterState.nodes().localNodeMaster()) {
+                try {
+                    ackListener.onNodeAck(newClusterState.nodes().localNode(), null);
+                } catch (Throwable t) {
+                    logger.debug("error while processing ack for master node [{}]", t, newClusterState.nodes().localNode());
                 }
+            }
 
-                updateTask.clusterStateProcessed(source, previousClusterState, newClusterState);
-
-                TimeValue executionTime = TimeValue.timeValueMillis(Math.max(0, TimeValue.nsecToMSec(System.nanoTime() - startTimeNS)));
-                logger.debug("processing [{}]: took {} done applying updated cluster_state (version: {}, uuid: {})", source, executionTime, newClusterState.version(), newClusterState.stateUUID());
-                warnAboutSlowTaskIfNeeded(executionTime, source);
-            } catch (Throwable t) {
-                TimeValue executionTime = TimeValue.timeValueMillis(Math.max(0, TimeValue.nsecToMSec(System.nanoTime() - startTimeNS)));
-                StringBuilder sb = new StringBuilder("failed to apply updated cluster state in ").append(executionTime).append(":\nversion [").append(newClusterState.version()).append("], uuid [").append(newClusterState.stateUUID()).append("], source [").append(source).append("]\n");
-                sb.append(newClusterState.nodes().prettyPrint());
-                sb.append(newClusterState.routingTable().prettyPrint());
-                sb.append(newClusterState.getRoutingNodes().prettyPrint());
-                logger.warn(sb.toString(), t);
-                // TODO: do we want to call updateTask.onFailure here?
+            for (UpdateTask<T> task : proccessedListeners) {
+                task.listener.clusterStateProcessed(task.source, previousClusterState, newClusterState);
             }
+
+            TimeValue executionTime = TimeValue.timeValueMillis(Math.max(0, TimeValue.nsecToMSec(System.nanoTime() - startTimeNS)));
+            logger.debug("processing [{}]: took {} done applying updated cluster_state (version: {}, uuid: {})", source, executionTime, newClusterState.version(), newClusterState.stateUUID());
+            warnAboutSlowTaskIfNeeded(executionTime, source);
+        } catch (Throwable t) {
+            TimeValue executionTime = TimeValue.timeValueMillis(Math.max(0, TimeValue.nsecToMSec(System.nanoTime() - startTimeNS)));
+            StringBuilder sb = new StringBuilder("failed to apply updated cluster state in ").append(executionTime).append(":\nversion [").append(newClusterState.version()).append("], uuid [").append(newClusterState.stateUUID()).append("], source [").append(source).append("]\n");
+            sb.append(newClusterState.nodes().prettyPrint());
+            sb.append(newClusterState.routingTable().prettyPrint());
+            sb.append(newClusterState.getRoutingNodes().prettyPrint());
+            logger.warn(sb.toString(), t);
+            // TODO: do we want to call updateTask.onFailure here?
+        }
+
+    }
+
+    class UpdateTask<T> extends SourcePrioritizedRunnable {
+
+        public final T task;
+        public final ClusterStateTaskConfig config;
+        public final ClusterStateTaskExecutor<T> executor;
+        public final ClusterStateTaskListener listener;
+        public final AtomicBoolean processed = new AtomicBoolean();
+
+        UpdateTask(String source, T task, ClusterStateTaskConfig config, ClusterStateTaskExecutor<T> executor, ClusterStateTaskListener listener) {
+            super(config.priority(), source);
+            this.task = task;
+            this.config = config;
+            this.executor = executor;
+            this.listener = listener;
+        }
+
+        @Override
+        public void run() {
+            runTasksForExecutor(executor);
         }
     }
 
@@ -729,13 +762,24 @@ public class InternalClusterService extends AbstractLifecycleComponent<ClusterSe
         }
     }
 
-    private static class NoOpAckListener implements Discovery.AckListener {
+    private static class DelegetingAckListener implements Discovery.AckListener {
+
+        final private List<Discovery.AckListener> listeners;
+
+        private DelegetingAckListener(List<Discovery.AckListener> listeners) {
+            this.listeners = listeners;
+        }
+
         @Override
         public void onNodeAck(DiscoveryNode node, @Nullable Throwable t) {
+            for (Discovery.AckListener listener : listeners) {
+                listener.onNodeAck(node, t);
+            }
         }
 
         @Override
         public void onTimeout() {
+            throw new UnsupportedOperationException("no timeout delegation");
         }
     }
 
@@ -743,20 +787,20 @@ public class InternalClusterService extends AbstractLifecycleComponent<ClusterSe
 
         private static final ESLogger logger = Loggers.getLogger(AckCountDownListener.class);
 
-        private final AckedClusterStateUpdateTask ackedUpdateTask;
+        private final AckedClusterStateTaskListener ackedTaskListener;
         private final CountDown countDown;
         private final DiscoveryNodes nodes;
         private final long clusterStateVersion;
         private final Future<?> ackTimeoutCallback;
         private Throwable lastFailure;
 
-        AckCountDownListener(AckedClusterStateUpdateTask ackedUpdateTask, long clusterStateVersion, DiscoveryNodes nodes, ThreadPool threadPool) {
-            this.ackedUpdateTask = ackedUpdateTask;
+        AckCountDownListener(AckedClusterStateTaskListener ackedTaskListener, long clusterStateVersion, DiscoveryNodes nodes, ThreadPool threadPool) {
+            this.ackedTaskListener = ackedTaskListener;
             this.clusterStateVersion = clusterStateVersion;
             this.nodes = nodes;
             int countDown = 0;
             for (DiscoveryNode node : nodes) {
-                if (ackedUpdateTask.mustAck(node)) {
+                if (ackedTaskListener.mustAck(node)) {
                     countDown++;
                 }
             }
@@ -764,7 +808,7 @@ public class InternalClusterService extends AbstractLifecycleComponent<ClusterSe
             countDown = Math.max(1, countDown);
             logger.trace("expecting {} acknowledgements for cluster_state update (version: {})", countDown, clusterStateVersion);
             this.countDown = new CountDown(countDown);
-            this.ackTimeoutCallback = threadPool.schedule(ackedUpdateTask.ackTimeout(), ThreadPool.Names.GENERIC, new Runnable() {
+            this.ackTimeoutCallback = threadPool.schedule(ackedTaskListener.ackTimeout(), ThreadPool.Names.GENERIC, new Runnable() {
                 @Override
                 public void run() {
                     onTimeout();
@@ -774,7 +818,7 @@ public class InternalClusterService extends AbstractLifecycleComponent<ClusterSe
 
         @Override
         public void onNodeAck(DiscoveryNode node, @Nullable Throwable t) {
-            if (!ackedUpdateTask.mustAck(node)) {
+            if (!ackedTaskListener.mustAck(node)) {
                 //we always wait for the master ack anyway
                 if (!node.equals(nodes.masterNode())) {
                     return;
@@ -790,7 +834,7 @@ public class InternalClusterService extends AbstractLifecycleComponent<ClusterSe
             if (countDown.countDown()) {
                 logger.trace("all expected nodes acknowledged cluster_state update (version: {})", clusterStateVersion);
                 FutureUtils.cancel(ackTimeoutCallback);
-                ackedUpdateTask.onAllNodesAcked(lastFailure);
+                ackedTaskListener.onAllNodesAcked(lastFailure);
             }
         }
 
@@ -798,7 +842,7 @@ public class InternalClusterService extends AbstractLifecycleComponent<ClusterSe
         public void onTimeout() {
             if (countDown.fastForward()) {
                 logger.trace("timeout waiting for acknowledgement for cluster_state update (version: {})", clusterStateVersion);
-                ackedUpdateTask.onAckTimeout();
+                ackedTaskListener.onAckTimeout();
             }
         }
     }
@@ -810,5 +854,4 @@ public class InternalClusterService extends AbstractLifecycleComponent<ClusterSe
             InternalClusterService.this.slowTaskLoggingThreshold = slowTaskLoggingThreshold;
         }
     }
-
 }

+ 7 - 3
core/src/main/java/org/elasticsearch/discovery/zen/NodeJoinController.java

@@ -133,7 +133,7 @@ public class NodeJoinController extends AbstractComponent {
 
     /** utility method to fail the given election context under the cluster state thread */
     private void failContext(final ElectionContext context, final String reason, final Throwable throwable) {
-        clusterService.submitStateUpdateTask("zen-disco-join(failure [" + reason + "])", Priority.IMMEDIATE, new ClusterStateUpdateTask() {
+        clusterService.submitStateUpdateTask("zen-disco-join(failure [" + reason + "])", new ClusterStateUpdateTask(Priority.IMMEDIATE) {
 
             @Override
             public boolean runOnlyOnMaster() {
@@ -231,7 +231,7 @@ public class NodeJoinController extends AbstractComponent {
         }
 
         final String source = "zen-disco-join(elected_as_master, [" + pendingMasterJoins + "] joins received)";
-        clusterService.submitStateUpdateTask(source, Priority.IMMEDIATE, new ProcessJoinsTask() {
+        clusterService.submitStateUpdateTask(source, new ProcessJoinsTask(Priority.IMMEDIATE) {
             @Override
             public ClusterState execute(ClusterState currentState) {
                 // Take into account the previous known nodes, if they happen not to be available
@@ -280,7 +280,7 @@ public class NodeJoinController extends AbstractComponent {
 
     /** process all pending joins */
     private void processJoins(String reason) {
-        clusterService.submitStateUpdateTask("zen-disco-join(" + reason + ")", Priority.URGENT, new ProcessJoinsTask());
+        clusterService.submitStateUpdateTask("zen-disco-join(" + reason + ")", new ProcessJoinsTask(Priority.URGENT));
     }
 
 
@@ -356,6 +356,10 @@ public class NodeJoinController extends AbstractComponent {
         private final List<MembershipAction.JoinCallback> joinCallbacksToRespondTo = new ArrayList<>();
         private boolean nodeAdded = false;
 
+        public ProcessJoinsTask(Priority priority) {
+            super(priority);
+        }
+
         @Override
         public ClusterState execute(ClusterState currentState) {
             DiscoveryNodes.Builder nodesBuilder;

+ 8 - 8
core/src/main/java/org/elasticsearch/discovery/zen/ZenDiscovery.java

@@ -320,7 +320,7 @@ public class ZenDiscovery extends AbstractLifecycleComponent<Discovery> implemen
         } catch (FailedToCommitClusterStateException t) {
             // cluster service logs a WARN message
             logger.debug("failed to publish cluster state version [{}] (not enough nodes acknowledged, min master nodes [{}])", clusterChangedEvent.state().version(), electMaster.minimumMasterNodes());
-            clusterService.submitStateUpdateTask("zen-disco-failed-to-publish", Priority.IMMEDIATE, new ClusterStateUpdateTask() {
+            clusterService.submitStateUpdateTask("zen-disco-failed-to-publish", new ClusterStateUpdateTask(Priority.IMMEDIATE) {
                 @Override
                 public ClusterState execute(ClusterState currentState) {
                     return rejoin(currentState, "failed to publish to min_master_nodes");
@@ -498,7 +498,7 @@ public class ZenDiscovery extends AbstractLifecycleComponent<Discovery> implemen
             return;
         }
         if (localNodeMaster()) {
-            clusterService.submitStateUpdateTask("zen-disco-node_left(" + node + ")", Priority.IMMEDIATE, new ClusterStateUpdateTask() {
+            clusterService.submitStateUpdateTask("zen-disco-node_left(" + node + ")", new ClusterStateUpdateTask(Priority.IMMEDIATE) {
                 @Override
                 public ClusterState execute(ClusterState currentState) {
                     DiscoveryNodes.Builder builder = DiscoveryNodes.builder(currentState.nodes()).remove(node.id());
@@ -538,7 +538,7 @@ public class ZenDiscovery extends AbstractLifecycleComponent<Discovery> implemen
             // nothing to do here...
             return;
         }
-        clusterService.submitStateUpdateTask("zen-disco-node_failed(" + node + "), reason " + reason, Priority.IMMEDIATE, new ClusterStateUpdateTask() {
+        clusterService.submitStateUpdateTask("zen-disco-node_failed(" + node + "), reason " + reason, new ClusterStateUpdateTask(Priority.IMMEDIATE) {
             @Override
             public ClusterState execute(ClusterState currentState) {
                 if (currentState.nodes().get(node.id()) == null) {
@@ -587,7 +587,7 @@ public class ZenDiscovery extends AbstractLifecycleComponent<Discovery> implemen
             // We only set the new value. If the master doesn't see enough nodes it will revoke it's mastership.
             return;
         }
-        clusterService.submitStateUpdateTask("zen-disco-minimum_master_nodes_changed", Priority.IMMEDIATE, new ClusterStateUpdateTask() {
+        clusterService.submitStateUpdateTask("zen-disco-minimum_master_nodes_changed", new ClusterStateUpdateTask(Priority.IMMEDIATE) {
             @Override
             public ClusterState execute(ClusterState currentState) {
                 // check if we have enough master nodes, if not, we need to move into joining the cluster again
@@ -627,7 +627,7 @@ public class ZenDiscovery extends AbstractLifecycleComponent<Discovery> implemen
 
         logger.info("master_left [{}], reason [{}]", cause, masterNode, reason);
 
-        clusterService.submitStateUpdateTask("zen-disco-master_failed (" + masterNode + ")", Priority.IMMEDIATE, new ClusterStateUpdateTask() {
+        clusterService.submitStateUpdateTask("zen-disco-master_failed (" + masterNode + ")", new ClusterStateUpdateTask(Priority.IMMEDIATE) {
 
             @Override
             public boolean runOnlyOnMaster() {
@@ -694,7 +694,7 @@ public class ZenDiscovery extends AbstractLifecycleComponent<Discovery> implemen
     }
 
     void processNextPendingClusterState(String reason) {
-        clusterService.submitStateUpdateTask("zen-disco-receive(from master [" + reason + "])", Priority.URGENT, new ClusterStateUpdateTask() {
+        clusterService.submitStateUpdateTask("zen-disco-receive(from master [" + reason + "])", new ClusterStateUpdateTask(Priority.URGENT) {
             @Override
             public boolean runOnlyOnMaster() {
                 return false;
@@ -1059,7 +1059,7 @@ public class ZenDiscovery extends AbstractLifecycleComponent<Discovery> implemen
                 return;
             }
             logger.debug("got a ping from another master {}. resolving who should rejoin. current ping count: [{}]", pingRequest.masterNode(), pingsWhileMaster.get());
-            clusterService.submitStateUpdateTask("ping from another master", Priority.IMMEDIATE, new ClusterStateUpdateTask() {
+            clusterService.submitStateUpdateTask("ping from another master", new ClusterStateUpdateTask(Priority.IMMEDIATE) {
 
                 @Override
                 public ClusterState execute(ClusterState currentState) throws Exception {
@@ -1114,7 +1114,7 @@ public class ZenDiscovery extends AbstractLifecycleComponent<Discovery> implemen
     class RejoinClusterRequestHandler implements TransportRequestHandler<RejoinClusterRequest> {
         @Override
         public void messageReceived(final RejoinClusterRequest request, final TransportChannel channel) throws Exception {
-            clusterService.submitStateUpdateTask("received a request to rejoin the cluster from [" + request.fromNodeId + "]", Priority.IMMEDIATE, new ClusterStateUpdateTask() {
+            clusterService.submitStateUpdateTask("received a request to rejoin the cluster from [" + request.fromNodeId + "]", new ClusterStateUpdateTask(Priority.IMMEDIATE) {
 
                 @Override
                 public boolean runOnlyOnMaster() {

+ 134 - 23
core/src/test/java/org/elasticsearch/cluster/ClusterServiceIT.java

@@ -43,23 +43,17 @@ import org.elasticsearch.test.MockLogAppender;
 import org.elasticsearch.test.junit.annotations.TestLogging;
 import org.elasticsearch.threadpool.ThreadPool;
 
-import java.util.ArrayList;
-import java.util.Arrays;
-import java.util.Collection;
-import java.util.HashSet;
-import java.util.List;
-import java.util.Set;
+import java.util.*;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.ConcurrentMap;
 import java.util.concurrent.CountDownLatch;
 import java.util.concurrent.TimeUnit;
 import java.util.concurrent.atomic.AtomicBoolean;
+import java.util.concurrent.atomic.AtomicInteger;
 
 import static org.elasticsearch.common.settings.Settings.settingsBuilder;
 import static org.elasticsearch.test.hamcrest.ElasticsearchAssertions.assertAcked;
-import static org.hamcrest.Matchers.equalTo;
-import static org.hamcrest.Matchers.greaterThan;
-import static org.hamcrest.Matchers.greaterThanOrEqualTo;
-import static org.hamcrest.Matchers.is;
-import static org.hamcrest.Matchers.notNullValue;
+import static org.hamcrest.Matchers.*;
 
 /**
  *
@@ -711,32 +705,146 @@ public class ClusterServiceIT extends ESIntegTestCase {
                 .build();
         internalCluster().startNode(settings);
         ClusterService clusterService = internalCluster().getInstance(ClusterService.class);
-        BlockingTask block = new BlockingTask();
-        clusterService.submitStateUpdateTask("test", Priority.IMMEDIATE, block);
+        BlockingTask block = new BlockingTask(Priority.IMMEDIATE);
+        clusterService.submitStateUpdateTask("test", block);
         int taskCount = randomIntBetween(5, 20);
         Priority[] priorities = Priority.values();
 
         // will hold all the tasks in the order in which they were executed
-        List<PrioritiezedTask> tasks = new ArrayList<>(taskCount);
+        List<PrioritizedTask> tasks = new ArrayList<>(taskCount);
         CountDownLatch latch = new CountDownLatch(taskCount);
         for (int i = 0; i < taskCount; i++) {
             Priority priority = priorities[randomIntBetween(0, priorities.length - 1)];
-            clusterService.submitStateUpdateTask("test", priority, new PrioritiezedTask(priority, latch, tasks));
+            clusterService.submitStateUpdateTask("test", new PrioritizedTask(priority, latch, tasks));
         }
 
         block.release();
         latch.await();
 
         Priority prevPriority = null;
-        for (PrioritiezedTask task : tasks) {
+        for (PrioritizedTask task : tasks) {
             if (prevPriority == null) {
-                prevPriority = task.priority;
+                prevPriority = task.priority();
             } else {
-                assertThat(task.priority.sameOrAfter(prevPriority), is(true));
+                assertThat(task.priority().sameOrAfter(prevPriority), is(true));
             }
         }
     }
 
+    public void testClusterStateBatchedUpdates() throws InterruptedException {
+        Settings settings = settingsBuilder()
+                .put("discovery.type", "local")
+                .build();
+        internalCluster().startNode(settings);
+        ClusterService clusterService = internalCluster().getInstance(ClusterService.class);
+
+        AtomicInteger counter = new AtomicInteger();
+        class Task {
+            private AtomicBoolean state = new AtomicBoolean();
+
+            public void execute() {
+                if (!state.compareAndSet(false, true)) {
+                    throw new IllegalStateException();
+                } else {
+                    counter.incrementAndGet();
+                }
+            }
+        }
+
+        class TaskExecutor implements ClusterStateTaskExecutor<Task> {
+            private AtomicInteger counter = new AtomicInteger();
+
+            @Override
+            public BatchResult<Task> execute(ClusterState currentState, List<Task> tasks) throws Exception {
+                tasks.forEach(task -> task.execute());
+                counter.addAndGet(tasks.size());
+                return BatchResult.<Task>builder().successes(tasks).build(currentState);
+            }
+
+            @Override
+            public boolean runOnlyOnMaster() {
+                return false;
+            }
+        }
+        int numberOfThreads = randomIntBetween(2, 256);
+        int tasksSubmittedPerThread = randomIntBetween(1, 1024);
+
+        ConcurrentMap<String, AtomicInteger> counters = new ConcurrentHashMap<>();
+        CountDownLatch latch = new CountDownLatch(numberOfThreads * tasksSubmittedPerThread);
+        ClusterStateTaskListener listener = new ClusterStateTaskListener() {
+            @Override
+            public void onFailure(String source, Throwable t) {
+                assert false;
+            }
+
+            @Override
+            public void clusterStateProcessed(String source, ClusterState oldState, ClusterState newState) {
+                counters.computeIfAbsent(source, key -> new AtomicInteger()).incrementAndGet();
+                latch.countDown();
+            }
+        };
+
+        int numberOfExecutors = Math.max(1, numberOfThreads / 4);
+        List<TaskExecutor> executors = new ArrayList<>();
+        for (int i = 0; i < numberOfExecutors; i++) {
+            executors.add(new TaskExecutor());
+        }
+
+        // randomly assign tasks to executors
+        List<TaskExecutor> assignments = new ArrayList<>();
+        for (int i = 0; i < numberOfThreads; i++) {
+            for (int j = 0; j < tasksSubmittedPerThread; j++) {
+                assignments.add(randomFrom(executors));
+            }
+        }
+
+        Map<TaskExecutor, Integer> counts = new HashMap<>();
+        for (TaskExecutor executor : assignments) {
+            counts.merge(executor, 1, (previous, one) -> previous + one);
+        }
+
+        CountDownLatch startingGun = new CountDownLatch(1 + numberOfThreads);
+        List<Thread> threads = new ArrayList<>();
+        for (int i = 0; i < numberOfThreads; i++) {
+            final int index = i;
+            Thread thread = new Thread(() -> {
+                startingGun.countDown();
+                for (int j = 0; j < tasksSubmittedPerThread; j++) {
+                    ClusterStateTaskExecutor<Task> executor = assignments.get(index * tasksSubmittedPerThread + j);
+                    clusterService.submitStateUpdateTask(
+                            Thread.currentThread().getName(),
+                            new Task(),
+                            ClusterStateTaskConfig.build(Priority.NORMAL),
+                            executor,
+                            listener);
+                }
+            });
+            threads.add(thread);
+            thread.start();
+        }
+
+        startingGun.countDown();
+        for (Thread thread : threads) {
+            thread.join();
+        }
+
+        // wait until all the cluster state updates have been processed
+        latch.await();
+
+        // assert the number of executed tasks is correct
+        assertEquals(numberOfThreads * tasksSubmittedPerThread, counter.get());
+
+        // assert each executor executed the correct number of tasks
+        for (TaskExecutor executor : executors) {
+            assertEquals((int)counts.get(executor), executor.counter.get());
+        }
+
+        // assert the correct number of clusterStateProcessed events were triggered
+        for (Map.Entry<String, AtomicInteger> entry : counters.entrySet()) {
+            assertEquals(entry.getValue().get(), tasksSubmittedPerThread);
+        }
+    }
+
     @TestLogging("cluster:TRACE") // To ensure that we log cluster state events on TRACE level
     public void testClusterStateUpdateLogging() throws Exception {
         Settings settings = settingsBuilder()
@@ -947,6 +1055,10 @@ public class ClusterServiceIT extends ESIntegTestCase {
     private static class BlockingTask extends ClusterStateUpdateTask {
         private final CountDownLatch latch = new CountDownLatch(1);
 
+        public BlockingTask(Priority priority) {
+            super(priority);
+        }
+
         @Override
         public ClusterState execute(ClusterState currentState) throws Exception {
             latch.await();
@@ -963,14 +1075,13 @@ public class ClusterServiceIT extends ESIntegTestCase {
 
     }
 
-    private static class PrioritiezedTask extends ClusterStateUpdateTask {
+    private static class PrioritizedTask extends ClusterStateUpdateTask {
 
-        private final Priority priority;
         private final CountDownLatch latch;
-        private final List<PrioritiezedTask> tasks;
+        private final List<PrioritizedTask> tasks;
 
-        private PrioritiezedTask(Priority priority, CountDownLatch latch, List<PrioritiezedTask> tasks) {
-            this.priority = priority;
+        private PrioritizedTask(Priority priority, CountDownLatch latch, List<PrioritizedTask> tasks) {
+            super(priority);
             this.latch = latch;
             this.tasks = tasks;
         }

+ 6 - 35
core/src/test/java/org/elasticsearch/discovery/DiscoveryWithServiceDisruptionsIT.java

@@ -25,11 +25,7 @@ import org.elasticsearch.action.get.GetResponse;
 import org.elasticsearch.action.index.IndexRequestBuilder;
 import org.elasticsearch.action.index.IndexResponse;
 import org.elasticsearch.client.Client;
-import org.elasticsearch.cluster.ClusterChangedEvent;
-import org.elasticsearch.cluster.ClusterService;
-import org.elasticsearch.cluster.ClusterState;
-import org.elasticsearch.cluster.ClusterStateListener;
-import org.elasticsearch.cluster.ClusterStateUpdateTask;
+import org.elasticsearch.cluster.*;
 import org.elasticsearch.cluster.block.ClusterBlock;
 import org.elasticsearch.cluster.block.ClusterBlockLevel;
 import org.elasticsearch.cluster.metadata.IndexMetaData;
@@ -59,16 +55,7 @@ import org.elasticsearch.test.ESIntegTestCase.ClusterScope;
 import org.elasticsearch.test.ESIntegTestCase.Scope;
 import org.elasticsearch.test.InternalTestCluster;
 import org.elasticsearch.test.discovery.ClusterDiscoveryConfiguration;
-import org.elasticsearch.test.disruption.BlockClusterStateProcessing;
-import org.elasticsearch.test.disruption.IntermittentLongGCDisruption;
-import org.elasticsearch.test.disruption.LongGCDisruption;
-import org.elasticsearch.test.disruption.NetworkDelaysPartition;
-import org.elasticsearch.test.disruption.NetworkDisconnectPartition;
-import org.elasticsearch.test.disruption.NetworkPartition;
-import org.elasticsearch.test.disruption.NetworkUnresponsivePartition;
-import org.elasticsearch.test.disruption.ServiceDisruptionScheme;
-import org.elasticsearch.test.disruption.SingleNodeDisruption;
-import org.elasticsearch.test.disruption.SlowClusterStateProcessing;
+import org.elasticsearch.test.disruption.*;
 import org.elasticsearch.test.junit.annotations.TestLogging;
 import org.elasticsearch.test.transport.MockTransportService;
 import org.elasticsearch.transport.TransportException;
@@ -78,31 +65,15 @@ import org.elasticsearch.transport.TransportService;
 import org.junit.Before;
 
 import java.io.IOException;
-import java.util.ArrayList;
-import java.util.Arrays;
-import java.util.Collection;
-import java.util.Collections;
-import java.util.HashMap;
-import java.util.HashSet;
-import java.util.List;
-import java.util.Map;
-import java.util.Objects;
-import java.util.Set;
-import java.util.concurrent.ConcurrentHashMap;
-import java.util.concurrent.CountDownLatch;
-import java.util.concurrent.ExecutionException;
-import java.util.concurrent.Semaphore;
-import java.util.concurrent.TimeUnit;
+import java.util.*;
+import java.util.concurrent.*;
 import java.util.concurrent.atomic.AtomicBoolean;
 import java.util.concurrent.atomic.AtomicInteger;
 import java.util.concurrent.atomic.AtomicReference;
 
 import static org.elasticsearch.common.xcontent.XContentFactory.jsonBuilder;
 import static org.elasticsearch.test.hamcrest.ElasticsearchAssertions.assertAcked;
-import static org.hamcrest.Matchers.equalTo;
-import static org.hamcrest.Matchers.is;
-import static org.hamcrest.Matchers.not;
-import static org.hamcrest.Matchers.nullValue;
+import static org.hamcrest.Matchers.*;
 
 @ClusterScope(scope = Scope.TEST, numDataNodes = 0, transportClientRatio = 0)
 @ESIntegTestCase.SuppressLocalMode
@@ -650,7 +621,7 @@ public class DiscoveryWithServiceDisruptionsIT extends ESIntegTestCase {
         // but will be queued and once the old master node un-freezes it gets executed.
         // The old master node will send this update + the cluster state where he is flagged as master to the other
         // nodes that follow the new master. These nodes should ignore this update.
-        internalCluster().getInstance(ClusterService.class, oldMasterNode).submitStateUpdateTask("sneaky-update", Priority.IMMEDIATE, new ClusterStateUpdateTask() {
+        internalCluster().getInstance(ClusterService.class, oldMasterNode).submitStateUpdateTask("sneaky-update", new ClusterStateUpdateTask(Priority.IMMEDIATE) {
             @Override
             public ClusterState execute(ClusterState currentState) throws Exception {
                 return ClusterState.builder(currentState).build();

+ 3 - 13
core/src/test/java/org/elasticsearch/indices/store/IndicesStoreIntegrationIT.java

@@ -27,13 +27,7 @@ import org.elasticsearch.cluster.ClusterState;
 import org.elasticsearch.cluster.ClusterStateUpdateTask;
 import org.elasticsearch.cluster.metadata.IndexMetaData;
 import org.elasticsearch.cluster.node.DiscoveryNode;
-import org.elasticsearch.cluster.routing.IndexRoutingTable;
-import org.elasticsearch.cluster.routing.IndexShardRoutingTable;
-import org.elasticsearch.cluster.routing.RoutingNode;
-import org.elasticsearch.cluster.routing.RoutingTable;
-import org.elasticsearch.cluster.routing.ShardRouting;
-import org.elasticsearch.cluster.routing.ShardRoutingState;
-import org.elasticsearch.cluster.routing.TestShardRouting;
+import org.elasticsearch.cluster.routing.*;
 import org.elasticsearch.cluster.routing.allocation.command.MoveAllocationCommand;
 import org.elasticsearch.cluster.routing.allocation.decider.EnableAllocationDecider;
 import org.elasticsearch.cluster.routing.allocation.decider.FilterAllocationDecider;
@@ -56,11 +50,7 @@ import org.elasticsearch.test.disruption.BlockClusterStateProcessing;
 import org.elasticsearch.test.disruption.SingleNodeDisruption;
 import org.elasticsearch.test.junit.annotations.TestLogging;
 import org.elasticsearch.test.transport.MockTransportService;
-import org.elasticsearch.transport.ConnectTransportException;
-import org.elasticsearch.transport.TransportException;
-import org.elasticsearch.transport.TransportRequest;
-import org.elasticsearch.transport.TransportRequestOptions;
-import org.elasticsearch.transport.TransportService;
+import org.elasticsearch.transport.*;
 
 import java.io.IOException;
 import java.nio.file.Files;
@@ -407,7 +397,7 @@ public class IndicesStoreIntegrationIT extends ESIntegTestCase {
         // disable relocations when we do this, to make sure the shards are not relocated from node2
         // due to rebalancing, and delete its content
         client().admin().cluster().prepareUpdateSettings().setTransientSettings(settingsBuilder().put(EnableAllocationDecider.CLUSTER_ROUTING_REBALANCE_ENABLE, EnableAllocationDecider.Rebalance.NONE)).get();
-        internalCluster().getInstance(ClusterService.class, nonMasterNode).submitStateUpdateTask("test", Priority.IMMEDIATE, new ClusterStateUpdateTask() {
+        internalCluster().getInstance(ClusterService.class, nonMasterNode).submitStateUpdateTask("test", new ClusterStateUpdateTask(Priority.IMMEDIATE) {
             @Override
             public ClusterState execute(ClusterState currentState) throws Exception {
                 IndexRoutingTable.Builder indexRoutingTableBuilder = IndexRoutingTable.builder("test");

+ 2 - 7
core/src/test/java/org/elasticsearch/snapshots/AbstractSnapshotIntegTestCase.java

@@ -20,12 +20,7 @@ package org.elasticsearch.snapshots;
 
 import org.elasticsearch.action.admin.cluster.state.ClusterStateResponse;
 import org.elasticsearch.action.admin.cluster.tasks.PendingClusterTasksResponse;
-import org.elasticsearch.cluster.ClusterChangedEvent;
-import org.elasticsearch.cluster.ClusterService;
-import org.elasticsearch.cluster.ClusterState;
-import org.elasticsearch.cluster.ClusterStateListener;
-import org.elasticsearch.cluster.ClusterStateUpdateTask;
-import org.elasticsearch.cluster.SnapshotsInProgress;
+import org.elasticsearch.cluster.*;
 import org.elasticsearch.cluster.metadata.SnapshotId;
 import org.elasticsearch.cluster.routing.allocation.decider.EnableAllocationDecider;
 import org.elasticsearch.cluster.service.PendingClusterTask;
@@ -208,7 +203,7 @@ public abstract class AbstractSnapshotIntegTestCase extends ESIntegTestCase {
 
         private void addBlock() {
             // We should block after this task - add blocking cluster state update task
-            clusterService.submitStateUpdateTask("test_block", passThroughPriority, new ClusterStateUpdateTask() {
+            clusterService.submitStateUpdateTask("test_block", new ClusterStateUpdateTask(passThroughPriority) {
                 @Override
                 public ClusterState execute(ClusterState currentState) throws Exception {
                     while(System.currentTimeMillis() < stopWaitingAt) {

+ 2 - 3
test-framework/src/main/java/org/elasticsearch/test/cluster/NoopClusterService.java

@@ -25,7 +25,6 @@ import org.elasticsearch.cluster.node.DiscoveryNode;
 import org.elasticsearch.cluster.node.DiscoveryNodes;
 import org.elasticsearch.cluster.routing.OperationRouting;
 import org.elasticsearch.cluster.service.PendingClusterTask;
-import org.elasticsearch.common.Priority;
 import org.elasticsearch.common.component.Lifecycle;
 import org.elasticsearch.common.component.LifecycleListener;
 import org.elasticsearch.common.transport.DummyTransportAddress;
@@ -115,12 +114,12 @@ public class NoopClusterService implements ClusterService {
     }
 
     @Override
-    public void submitStateUpdateTask(String source, Priority priority, ClusterStateUpdateTask updateTask) {
+    public void submitStateUpdateTask(String source, ClusterStateUpdateTask updateTask) {
 
     }
 
     @Override
-    public void submitStateUpdateTask(String source, ClusterStateUpdateTask updateTask) {
+    public <T> void submitStateUpdateTask(String source, T task, ClusterStateTaskConfig config, ClusterStateTaskExecutor<T> executor, ClusterStateTaskListener listener) {
 
     }
 

+ 20 - 20
test-framework/src/main/java/org/elasticsearch/test/cluster/TestClusterService.java

@@ -28,7 +28,6 @@ import org.elasticsearch.cluster.routing.OperationRouting;
 import org.elasticsearch.cluster.routing.allocation.decider.AwarenessAllocationDecider;
 import org.elasticsearch.cluster.service.PendingClusterTask;
 import org.elasticsearch.common.Nullable;
-import org.elasticsearch.common.Priority;
 import org.elasticsearch.common.component.Lifecycle;
 import org.elasticsearch.common.component.LifecycleListener;
 import org.elasticsearch.common.logging.ESLogger;
@@ -40,10 +39,7 @@ import org.elasticsearch.common.util.concurrent.ConcurrentCollections;
 import org.elasticsearch.common.util.concurrent.FutureUtils;
 import org.elasticsearch.threadpool.ThreadPool;
 
-import java.util.Collection;
-import java.util.Iterator;
-import java.util.List;
-import java.util.Queue;
+import java.util.*;
 import java.util.concurrent.CopyOnWriteArrayList;
 import java.util.concurrent.ScheduledFuture;
 
@@ -183,31 +179,35 @@ public class TestClusterService implements ClusterService {
     }
 
     @Override
-    synchronized public void submitStateUpdateTask(String source, Priority priority, ClusterStateUpdateTask updateTask) {
+    public void submitStateUpdateTask(String source, ClusterStateUpdateTask updateTask) {
+        submitStateUpdateTask(source, null, updateTask, updateTask, updateTask);
+    }
+
+    @Override
+    synchronized public <T> void submitStateUpdateTask(String source, T task, ClusterStateTaskConfig config, ClusterStateTaskExecutor<T> executor, ClusterStateTaskListener listener) {
         logger.debug("processing [{}]", source);
-        if (state().nodes().localNodeMaster() == false && updateTask.runOnlyOnMaster()) {
-            updateTask.onNoLongerMaster(source);
+        if (state().nodes().localNodeMaster() == false && executor.runOnlyOnMaster()) {
+            listener.onNoLongerMaster(source);
             logger.debug("failed [{}], no longer master", source);
             return;
         }
-        ClusterState newState;
+        ClusterStateTaskExecutor.BatchResult<T> batchResult;
         ClusterState previousClusterState = state;
         try {
-            newState = updateTask.execute(previousClusterState);
+            batchResult = executor.execute(previousClusterState, Arrays.asList(task));
         } catch (Exception e) {
-            updateTask.onFailure(source, new ElasticsearchException("failed to process cluster state update task [" + source + "]", e));
-            return;
-        }
-        setStateAndNotifyListeners(newState);
-        if (updateTask instanceof ClusterStateUpdateTask) {
-            ((ClusterStateUpdateTask) updateTask).clusterStateProcessed(source, previousClusterState, newState);
+            batchResult = ClusterStateTaskExecutor.BatchResult.<T>builder().failure(task, e).build(previousClusterState);
         }
+
+        batchResult.executionResults.get(task).handle(
+                () -> {},
+                ex -> listener.onFailure(source, new ElasticsearchException("failed to process cluster state update task [" + source + "]", ex))
+        );
+
+        setStateAndNotifyListeners(batchResult.resultingState);
+        listener.clusterStateProcessed(source, previousClusterState, batchResult.resultingState);
         logger.debug("finished [{}]", source);
-    }
 
-    @Override
-    public void submitStateUpdateTask(String source, ClusterStateUpdateTask updateTask) {
-        submitStateUpdateTask(source, Priority.NORMAL, updateTask);
     }
 
     @Override

+ 1 - 1
test-framework/src/main/java/org/elasticsearch/test/disruption/BlockClusterStateProcessing.java

@@ -58,7 +58,7 @@ public class BlockClusterStateProcessing extends SingleNodeDisruption {
         boolean success = disruptionLatch.compareAndSet(null, new CountDownLatch(1));
         assert success : "startDisrupting called without waiting on stopDistrupting to complete";
         final CountDownLatch started = new CountDownLatch(1);
-        clusterService.submitStateUpdateTask("service_disruption_block", Priority.IMMEDIATE, new ClusterStateUpdateTask() {
+        clusterService.submitStateUpdateTask("service_disruption_block", new ClusterStateUpdateTask(Priority.IMMEDIATE) {
 
             @Override
             public boolean runOnlyOnMaster() {

+ 1 - 1
test-framework/src/main/java/org/elasticsearch/test/disruption/SlowClusterStateProcessing.java

@@ -102,7 +102,7 @@ public class SlowClusterStateProcessing extends SingleNodeDisruption {
             return false;
         }
         final AtomicBoolean stopped = new AtomicBoolean(false);
-        clusterService.submitStateUpdateTask("service_disruption_delay", Priority.IMMEDIATE, new ClusterStateUpdateTask() {
+        clusterService.submitStateUpdateTask("service_disruption_delay", new ClusterStateUpdateTask(Priority.IMMEDIATE) {
 
             @Override
             public boolean runOnlyOnMaster() {