Browse Source

Improve slow logging in MasterService (#45086)

Adds a tighter threshold for logging a warning about slowness in the
`MasterService` instead of relying on the cluster service's 30-second warning
threshold. This new threshold applies to the computation of the cluster state
update in isolation, so we get a warning if computing a new cluster state
update takes longer than 10 seconds even if it is subsequently applied quickly.
It also applies independently to the length of time it takes to notify the
cluster state tasks on completion of publication, in case any of these
notifications holds up the master thread for too long.

Relates #45007
David Turner 6 years ago
parent
commit
6143ebfc63

+ 10 - 3
server/src/main/java/org/elasticsearch/cluster/service/ClusterApplierService.java

@@ -39,6 +39,7 @@ import org.elasticsearch.common.StopWatch;
 import org.elasticsearch.common.component.AbstractLifecycleComponent;
 import org.elasticsearch.common.lease.Releasable;
 import org.elasticsearch.common.settings.ClusterSettings;
+import org.elasticsearch.common.settings.Setting;
 import org.elasticsearch.common.settings.Settings;
 import org.elasticsearch.common.unit.TimeValue;
 import org.elasticsearch.common.util.concurrent.ConcurrentCollections;
@@ -67,12 +68,15 @@ import java.util.function.Supplier;
 import java.util.stream.Collectors;
 import java.util.stream.Stream;
 
-import static org.elasticsearch.cluster.service.ClusterService.CLUSTER_SERVICE_SLOW_TASK_LOGGING_THRESHOLD_SETTING;
 import static org.elasticsearch.common.util.concurrent.EsExecutors.daemonThreadFactory;
 
 public class ClusterApplierService extends AbstractLifecycleComponent implements ClusterApplier {
     private static final Logger logger = LogManager.getLogger(ClusterApplierService.class);
 
+    public static final Setting<TimeValue> CLUSTER_SERVICE_SLOW_TASK_LOGGING_THRESHOLD_SETTING =
+        Setting.positiveTimeSetting("cluster.service.slow_task_logging_threshold", TimeValue.timeValueSeconds(30),
+            Setting.Property.Dynamic, Setting.Property.NodeScope);
+
     public static final String CLUSTER_UPDATE_THREAD_NAME = "clusterApplierService#updateTask";
 
     private final ClusterSettings clusterSettings;
@@ -109,12 +113,15 @@ public class ClusterApplierService extends AbstractLifecycleComponent implements
         this.clusterSettings = clusterSettings;
         this.threadPool = threadPool;
         this.state = new AtomicReference<>();
-        this.slowTaskLoggingThreshold = CLUSTER_SERVICE_SLOW_TASK_LOGGING_THRESHOLD_SETTING.get(settings);
         this.localNodeMasterListeners = new LocalNodeMasterListeners(threadPool);
         this.nodeName = nodeName;
+
+        this.slowTaskLoggingThreshold = CLUSTER_SERVICE_SLOW_TASK_LOGGING_THRESHOLD_SETTING.get(settings);
+        this.clusterSettings.addSettingsUpdateConsumer(CLUSTER_SERVICE_SLOW_TASK_LOGGING_THRESHOLD_SETTING,
+            this::setSlowTaskLoggingThreshold);
     }
 
-    public void setSlowTaskLoggingThreshold(TimeValue slowTaskLoggingThreshold) {
+    private void setSlowTaskLoggingThreshold(TimeValue slowTaskLoggingThreshold) {
         this.slowTaskLoggingThreshold = slowTaskLoggingThreshold;
     }
 

+ 1 - 13
server/src/main/java/org/elasticsearch/cluster/service/ClusterService.java

@@ -35,7 +35,6 @@ import org.elasticsearch.common.settings.ClusterSettings;
 import org.elasticsearch.common.settings.Setting;
 import org.elasticsearch.common.settings.Setting.Property;
 import org.elasticsearch.common.settings.Settings;
-import org.elasticsearch.common.unit.TimeValue;
 import org.elasticsearch.node.Node;
 import org.elasticsearch.threadpool.ThreadPool;
 
@@ -47,10 +46,6 @@ public class ClusterService extends AbstractLifecycleComponent {
 
     private final ClusterApplierService clusterApplierService;
 
-    public static final Setting<TimeValue> CLUSTER_SERVICE_SLOW_TASK_LOGGING_THRESHOLD_SETTING =
-            Setting.positiveTimeSetting("cluster.service.slow_task_logging_threshold", TimeValue.timeValueSeconds(30),
-                    Property.Dynamic, Property.NodeScope);
-
     public static final org.elasticsearch.common.settings.Setting.AffixSetting<String> USER_DEFINED_META_DATA =
         Setting.prefixKeySetting("cluster.metadata.", (key) -> Setting.simpleString(key, Property.Dynamic, Property.NodeScope));
 
@@ -68,7 +63,7 @@ public class ClusterService extends AbstractLifecycleComponent {
     private final String nodeName;
 
     public ClusterService(Settings settings, ClusterSettings clusterSettings, ThreadPool threadPool) {
-        this(settings, clusterSettings, new MasterService(Node.NODE_NAME_SETTING.get(settings), settings, threadPool),
+        this(settings, clusterSettings, new MasterService(settings, clusterSettings, threadPool),
             new ClusterApplierService(Node.NODE_NAME_SETTING.get(settings), settings, clusterSettings, threadPool));
     }
 
@@ -80,18 +75,11 @@ public class ClusterService extends AbstractLifecycleComponent {
         this.operationRouting = new OperationRouting(settings, clusterSettings);
         this.clusterSettings = clusterSettings;
         this.clusterName = ClusterName.CLUSTER_NAME_SETTING.get(settings);
-        this.clusterSettings.addSettingsUpdateConsumer(CLUSTER_SERVICE_SLOW_TASK_LOGGING_THRESHOLD_SETTING,
-            this::setSlowTaskLoggingThreshold);
         // Add a no-op update consumer so changes are logged
         this.clusterSettings.addAffixUpdateConsumer(USER_DEFINED_META_DATA, (first, second) -> {}, (first, second) -> {});
         this.clusterApplierService = clusterApplierService;
     }
 
-    private void setSlowTaskLoggingThreshold(TimeValue slowTaskLoggingThreshold) {
-        masterService.setSlowTaskLoggingThreshold(slowTaskLoggingThreshold);
-        clusterApplierService.setSlowTaskLoggingThreshold(slowTaskLoggingThreshold);
-    }
-
     public synchronized void setNodeConnectionsService(NodeConnectionsService nodeConnectionsService) {
         clusterApplierService.setNodeConnectionsService(nodeConnectionsService);
     }

+ 88 - 87
server/src/main/java/org/elasticsearch/cluster/service/MasterService.java

@@ -32,6 +32,8 @@ import org.elasticsearch.cluster.ClusterStateTaskConfig;
 import org.elasticsearch.cluster.ClusterStateTaskExecutor;
 import org.elasticsearch.cluster.ClusterStateTaskExecutor.ClusterTasksResult;
 import org.elasticsearch.cluster.ClusterStateTaskListener;
+import org.elasticsearch.cluster.coordination.ClusterStatePublisher;
+import org.elasticsearch.cluster.coordination.FailedToCommitClusterStateException;
 import org.elasticsearch.cluster.metadata.MetaData;
 import org.elasticsearch.cluster.metadata.ProcessClusterEventTimeoutException;
 import org.elasticsearch.cluster.node.DiscoveryNode;
@@ -40,6 +42,8 @@ import org.elasticsearch.cluster.routing.RoutingTable;
 import org.elasticsearch.common.Nullable;
 import org.elasticsearch.common.Priority;
 import org.elasticsearch.common.component.AbstractLifecycleComponent;
+import org.elasticsearch.common.settings.ClusterSettings;
+import org.elasticsearch.common.settings.Setting;
 import org.elasticsearch.common.settings.Settings;
 import org.elasticsearch.common.text.Text;
 import org.elasticsearch.common.unit.TimeValue;
@@ -49,9 +53,8 @@ import org.elasticsearch.common.util.concurrent.EsRejectedExecutionException;
 import org.elasticsearch.common.util.concurrent.FutureUtils;
 import org.elasticsearch.common.util.concurrent.PrioritizedEsThreadPoolExecutor;
 import org.elasticsearch.common.util.concurrent.ThreadContext;
-import org.elasticsearch.cluster.coordination.ClusterStatePublisher;
 import org.elasticsearch.discovery.Discovery;
-import org.elasticsearch.cluster.coordination.FailedToCommitClusterStateException;
+import org.elasticsearch.node.Node;
 import org.elasticsearch.threadpool.Scheduler;
 import org.elasticsearch.threadpool.ThreadPool;
 
@@ -65,15 +68,18 @@ import java.util.concurrent.TimeUnit;
 import java.util.function.Supplier;
 import java.util.stream.Collectors;
 
-import static org.elasticsearch.cluster.service.ClusterService.CLUSTER_SERVICE_SLOW_TASK_LOGGING_THRESHOLD_SETTING;
 import static org.elasticsearch.common.util.concurrent.EsExecutors.daemonThreadFactory;
 
 public class MasterService extends AbstractLifecycleComponent {
     private static final Logger logger = LogManager.getLogger(MasterService.class);
 
-    public static final String MASTER_UPDATE_THREAD_NAME = "masterService#updateTask";
+    public static final Setting<TimeValue> MASTER_SERVICE_SLOW_TASK_LOGGING_THRESHOLD_SETTING =
+        Setting.positiveTimeSetting("cluster.service.slow_master_task_logging_threshold", TimeValue.timeValueSeconds(10),
+            Setting.Property.Dynamic, Setting.Property.NodeScope);
+
+    static final String MASTER_UPDATE_THREAD_NAME = "masterService#updateTask";
 
-    protected ClusterStatePublisher clusterStatePublisher;
+    ClusterStatePublisher clusterStatePublisher;
 
     private final String nodeName;
 
@@ -86,14 +92,16 @@ public class MasterService extends AbstractLifecycleComponent {
     private volatile PrioritizedEsThreadPoolExecutor threadPoolExecutor;
     private volatile Batcher taskBatcher;
 
-    public MasterService(String nodeName, Settings settings, ThreadPool threadPool) {
-        this.nodeName = nodeName;
-        // TODO: introduce a dedicated setting for master service
-        this.slowTaskLoggingThreshold = CLUSTER_SERVICE_SLOW_TASK_LOGGING_THRESHOLD_SETTING.get(settings);
+    public MasterService(Settings settings, ClusterSettings clusterSettings, ThreadPool threadPool) {
+        this.nodeName = Objects.requireNonNull(Node.NODE_NAME_SETTING.get(settings));
+
+        this.slowTaskLoggingThreshold = MASTER_SERVICE_SLOW_TASK_LOGGING_THRESHOLD_SETTING.get(settings);
+        clusterSettings.addSettingsUpdateConsumer(MASTER_SERVICE_SLOW_TASK_LOGGING_THRESHOLD_SETTING, this::setSlowTaskLoggingThreshold);
+
         this.threadPool = threadPool;
     }
 
-    public void setSlowTaskLoggingThreshold(TimeValue slowTaskLoggingThreshold) {
+    private void setSlowTaskLoggingThreshold(TimeValue slowTaskLoggingThreshold) {
         this.slowTaskLoggingThreshold = slowTaskLoggingThreshold;
     }
 
@@ -121,6 +129,7 @@ public class MasterService extends AbstractLifecycleComponent {
                 threadPool.scheduler());
     }
 
+    @SuppressWarnings("unchecked")
     class Batcher extends TaskBatcher {
 
         Batcher(Logger logger, PrioritizedEsThreadPoolExecutor threadExecutor) {
@@ -179,25 +188,20 @@ public class MasterService extends AbstractLifecycleComponent {
         return Thread.currentThread().getName().contains(MASTER_UPDATE_THREAD_NAME);
     }
 
-    public static boolean assertMasterUpdateThread() {
-        assert isMasterUpdateThread() : "not called from the master service thread";
-        return true;
-    }
-
     public static boolean assertNotMasterUpdateThread(String reason) {
         assert isMasterUpdateThread() == false :
             "Expected current thread [" + Thread.currentThread() + "] to not be the master service thread. Reason: [" + reason + "]";
         return true;
     }
 
-    protected void runTasks(TaskInputs taskInputs) {
+    private void runTasks(TaskInputs taskInputs) {
         final String summary = taskInputs.summary;
         if (!lifecycle.started()) {
             logger.debug("processing [{}]: ignoring, master service not started", summary);
             return;
         }
 
-        logger.debug("processing [{}]: execute", summary);
+        logger.debug("executing cluster state update for [{}]", summary);
         final ClusterState previousClusterState = state();
 
         if (!previousClusterState.nodes().isLocalNodeElectedMaster() && taskInputs.runOnlyWhenMaster()) {
@@ -206,22 +210,25 @@ public class MasterService extends AbstractLifecycleComponent {
             return;
         }
 
-        long startTimeNS = currentTimeInNanos();
-        TaskOutputs taskOutputs = calculateTaskOutputs(taskInputs, previousClusterState, startTimeNS);
+        final long computationStartTime = threadPool.relativeTimeInMillis();
+        final TaskOutputs taskOutputs = calculateTaskOutputs(taskInputs, previousClusterState);
         taskOutputs.notifyFailedTasks();
+        final TimeValue computationTime = getTimeSince(computationStartTime);
+        logExecutionTime(computationTime, "compute cluster state update", summary);
 
         if (taskOutputs.clusterStateUnchanged()) {
+            final long notificationStartTime = threadPool.relativeTimeInMillis();
             taskOutputs.notifySuccessfulTasksOnUnchangedClusterState();
-            TimeValue executionTime = TimeValue.timeValueMillis(Math.max(0, TimeValue.nsecToMSec(currentTimeInNanos() - startTimeNS)));
-            logger.debug("processing [{}]: took [{}] no change in cluster state", summary, executionTime);
-            warnAboutSlowTaskIfNeeded(executionTime, summary);
+            final TimeValue executionTime = getTimeSince(notificationStartTime);
+            logExecutionTime(executionTime, "notify listeners on unchanged cluster state", summary);
         } else {
-            ClusterState newClusterState = taskOutputs.newClusterState;
+            final ClusterState newClusterState = taskOutputs.newClusterState;
             if (logger.isTraceEnabled()) {
                 logger.trace("cluster state updated, source [{}]\n{}", summary, newClusterState);
-            } else if (logger.isDebugEnabled()) {
+            } else {
                 logger.debug("cluster state updated, version [{}], source [{}]", newClusterState.version(), summary);
             }
+            final long publicationStartTime = threadPool.relativeTimeInMillis();
             try {
                 ClusterChangedEvent clusterChangedEvent = new ClusterChangedEvent(summary, newClusterState, previousClusterState);
                 // new cluster state, notify all listeners
@@ -235,14 +242,18 @@ public class MasterService extends AbstractLifecycleComponent {
                 }
 
                 logger.debug("publishing cluster state version [{}]", newClusterState.version());
-                publish(clusterChangedEvent, taskOutputs, startTimeNS);
+                publish(clusterChangedEvent, taskOutputs, publicationStartTime);
             } catch (Exception e) {
-                handleException(summary, startTimeNS, newClusterState, e);
+                handleException(summary, publicationStartTime, newClusterState, e);
             }
         }
     }
 
-    protected void publish(ClusterChangedEvent clusterChangedEvent, TaskOutputs taskOutputs, long startTimeNS) {
+    private TimeValue getTimeSince(long startTimeMillis) {
+        return TimeValue.timeValueMillis(Math.max(0, threadPool.relativeTimeInMillis() - startTimeMillis));
+    }
+
+    protected void publish(ClusterChangedEvent clusterChangedEvent, TaskOutputs taskOutputs, long startTimeMillis) {
         final PlainActionFuture<Void> fut = new PlainActionFuture<Void>() {
             @Override
             protected boolean blockingAllowed() {
@@ -254,13 +265,14 @@ public class MasterService extends AbstractLifecycleComponent {
         // indefinitely wait for publication to complete
         try {
             FutureUtils.get(fut);
-            onPublicationSuccess(clusterChangedEvent, taskOutputs, startTimeNS);
+            onPublicationSuccess(clusterChangedEvent, taskOutputs);
         } catch (Exception e) {
-            onPublicationFailed(clusterChangedEvent, taskOutputs, startTimeNS, e);
+            onPublicationFailed(clusterChangedEvent, taskOutputs, startTimeMillis, e);
         }
     }
 
-    protected void onPublicationSuccess(ClusterChangedEvent clusterChangedEvent, TaskOutputs taskOutputs, long startTimeNS) {
+    void onPublicationSuccess(ClusterChangedEvent clusterChangedEvent, TaskOutputs taskOutputs) {
+        final long notificationStartTime = threadPool.relativeTimeInMillis();
         taskOutputs.processedDifferentClusterState(clusterChangedEvent.previousState(), clusterChangedEvent.state());
 
         try {
@@ -270,33 +282,30 @@ public class MasterService extends AbstractLifecycleComponent {
                 "exception thrown while notifying executor of new cluster state publication [{}]",
                 clusterChangedEvent.source()), e);
         }
-        TimeValue executionTime = TimeValue.timeValueMillis(Math.max(0, TimeValue.nsecToMSec(currentTimeInNanos() - startTimeNS)));
-        logger.debug("processing [{}]: took [{}] done publishing updated cluster state (version: {}, uuid: {})",
-            clusterChangedEvent.source(),
-            executionTime, clusterChangedEvent.state().version(),
-            clusterChangedEvent.state().stateUUID());
-        warnAboutSlowTaskIfNeeded(executionTime, clusterChangedEvent.source());
+        final TimeValue executionTime = getTimeSince(notificationStartTime);
+        logExecutionTime(executionTime, "notify listeners on successful publication of cluster state (version: "
+            + clusterChangedEvent.state().version() + ", uuid: " + clusterChangedEvent.state().stateUUID() + ')',
+            clusterChangedEvent.source());
     }
 
-    protected void onPublicationFailed(ClusterChangedEvent clusterChangedEvent, TaskOutputs taskOutputs, long startTimeNS,
-                                       Exception exception) {
+    void onPublicationFailed(ClusterChangedEvent clusterChangedEvent, TaskOutputs taskOutputs, long startTimeMillis, Exception exception) {
         if (exception instanceof FailedToCommitClusterStateException) {
             final long version = clusterChangedEvent.state().version();
             logger.warn(() -> new ParameterizedMessage(
                 "failing [{}]: failed to commit cluster state version [{}]", clusterChangedEvent.source(), version), exception);
             taskOutputs.publishingFailed((FailedToCommitClusterStateException) exception);
         } else {
-            handleException(clusterChangedEvent.source(), startTimeNS, clusterChangedEvent.state(), exception);
+            handleException(clusterChangedEvent.source(), startTimeMillis, clusterChangedEvent.state(), exception);
         }
     }
 
-    private void handleException(String summary, long startTimeNS, ClusterState newClusterState, Exception e) {
-        TimeValue executionTime = TimeValue.timeValueMillis(Math.max(0, TimeValue.nsecToMSec(currentTimeInNanos() - startTimeNS)));
+    private void handleException(String summary, long startTimeMillis, ClusterState newClusterState, Exception e) {
+        final TimeValue executionTime = getTimeSince(startTimeMillis);
         final long version = newClusterState.version();
         final String stateUUID = newClusterState.stateUUID();
         final String fullState = newClusterState.toString();
-        logger.warn(() -> new ParameterizedMessage(
-                "failed to publish updated cluster state in [{}]:\nversion [{}], uuid [{}], source [{}]\n{}",
+        logger.warn(new ParameterizedMessage(
+                "took [{}] and then failed to publish updated cluster state (version: {}, uuid: {}) for [{}]:\n{}",
                 executionTime,
                 version,
                 stateUUID,
@@ -306,8 +315,8 @@ public class MasterService extends AbstractLifecycleComponent {
         // TODO: do we want to call updateTask.onFailure here?
     }
 
-    public TaskOutputs calculateTaskOutputs(TaskInputs taskInputs, ClusterState previousClusterState, long startTimeNS) {
-        ClusterTasksResult<Object> clusterTasksResult = executeTasks(taskInputs, startTimeNS, previousClusterState);
+    private TaskOutputs calculateTaskOutputs(TaskInputs taskInputs, ClusterState previousClusterState) {
+        ClusterTasksResult<Object> clusterTasksResult = executeTasks(taskInputs, previousClusterState);
         ClusterState newClusterState = patchVersions(previousClusterState, clusterTasksResult);
         return new TaskOutputs(taskInputs, previousClusterState, newClusterState, getNonFailedTasks(taskInputs, clusterTasksResult),
             clusterTasksResult.executionResults);
@@ -381,12 +390,12 @@ public class MasterService extends AbstractLifecycleComponent {
     /**
      * Output created by executing a set of tasks provided as TaskInputs
      */
-    protected class TaskOutputs {
-        public final TaskInputs taskInputs;
-        public final ClusterState previousClusterState;
-        public final ClusterState newClusterState;
-        public final List<Batcher.UpdateTask> nonFailedTasks;
-        public final Map<Object, ClusterStateTaskExecutor.TaskResult> executionResults;
+    class TaskOutputs {
+        final TaskInputs taskInputs;
+        final ClusterState previousClusterState;
+        final ClusterState newClusterState;
+        final List<Batcher.UpdateTask> nonFailedTasks;
+        final Map<Object, ClusterStateTaskExecutor.TaskResult> executionResults;
 
         TaskOutputs(TaskInputs taskInputs, ClusterState previousClusterState,
                            ClusterState newClusterState,
@@ -399,19 +408,19 @@ public class MasterService extends AbstractLifecycleComponent {
             this.executionResults = executionResults;
         }
 
-        public void publishingFailed(FailedToCommitClusterStateException t) {
+        void publishingFailed(FailedToCommitClusterStateException t) {
             nonFailedTasks.forEach(task -> task.listener.onFailure(task.source(), t));
         }
 
-        public void processedDifferentClusterState(ClusterState previousClusterState, ClusterState newClusterState) {
+        void processedDifferentClusterState(ClusterState previousClusterState, ClusterState newClusterState) {
             nonFailedTasks.forEach(task -> task.listener.clusterStateProcessed(task.source(), previousClusterState, newClusterState));
         }
 
-        public void clusterStatePublished(ClusterChangedEvent clusterChangedEvent) {
+        void clusterStatePublished(ClusterChangedEvent clusterChangedEvent) {
             taskInputs.executor.clusterStatePublished(clusterChangedEvent);
         }
 
-        public Discovery.AckListener createAckListener(ThreadPool threadPool, ClusterState newClusterState) {
+        Discovery.AckListener createAckListener(ThreadPool threadPool, ClusterState newClusterState) {
             return new DelegatingAckListener(nonFailedTasks.stream()
                 .filter(task -> task.listener instanceof AckedClusterStateTaskListener)
                 .map(task -> new AckCountDownListener((AckedClusterStateTaskListener) task.listener, newClusterState.version(),
@@ -419,11 +428,11 @@ public class MasterService extends AbstractLifecycleComponent {
                 .collect(Collectors.toList()));
         }
 
-        public boolean clusterStateUnchanged() {
+        boolean clusterStateUnchanged() {
             return previousClusterState == newClusterState;
         }
 
-        public void notifyFailedTasks() {
+        void notifyFailedTasks() {
             // fail all tasks that have failed
             for (Batcher.UpdateTask updateTask : taskInputs.updateTasks) {
                 assert executionResults.containsKey(updateTask.task) : "missing " + updateTask;
@@ -434,7 +443,7 @@ public class MasterService extends AbstractLifecycleComponent {
             }
         }
 
-        public void notifySuccessfulTasksOnUnchangedClusterState() {
+        void notifySuccessfulTasksOnUnchangedClusterState() {
             nonFailedTasks.forEach(task -> {
                 if (task.listener instanceof AckedClusterStateTaskListener) {
                     //no need to wait for ack if nothing changed, the update can be counted as acknowledged
@@ -567,10 +576,11 @@ public class MasterService extends AbstractLifecycleComponent {
         }
     }
 
-    protected void warnAboutSlowTaskIfNeeded(TimeValue executionTime, String source) {
+    private void logExecutionTime(TimeValue executionTime, String activity, String summary) {
         if (executionTime.getMillis() > slowTaskLoggingThreshold.getMillis()) {
-            logger.warn("cluster state update task [{}] took [{}] which is above the warn threshold of [{}]", source, executionTime,
-                slowTaskLoggingThreshold);
+            logger.warn("took [{}], which is over [{}], to {} for [{}]", executionTime, slowTaskLoggingThreshold, activity, summary);
+        } else {
+            logger.debug("took [{}] to {} for [{}]", executionTime, activity, summary);
         }
     }
 
@@ -680,7 +690,7 @@ public class MasterService extends AbstractLifecycleComponent {
         }
     }
 
-    protected ClusterTasksResult<Object> executeTasks(TaskInputs taskInputs, long startTimeNS, ClusterState previousClusterState) {
+    private ClusterTasksResult<Object> executeTasks(TaskInputs taskInputs, ClusterState previousClusterState) {
         ClusterTasksResult<Object> clusterTasksResult;
         try {
             List<Object> inputs = taskInputs.updateTasks.stream().map(tUpdateTask -> tUpdateTask.task).collect(Collectors.toList());
@@ -691,20 +701,15 @@ public class MasterService extends AbstractLifecycleComponent {
                 throw new AssertionError("update task submitted to MasterService cannot remove master");
             }
         } catch (Exception e) {
-            TimeValue executionTime = TimeValue.timeValueMillis(Math.max(0, TimeValue.nsecToMSec(currentTimeInNanos() - startTimeNS)));
-            if (logger.isTraceEnabled()) {
-                logger.trace(
-                    () -> new ParameterizedMessage(
-                        "failed to execute cluster state update in [{}], state:\nversion [{}], source [{}]\n{}{}{}",
-                        executionTime,
-                        previousClusterState.version(),
-                        taskInputs.summary,
-                        previousClusterState.nodes(),
-                        previousClusterState.routingTable(),
-                        previousClusterState.getRoutingNodes()),
-                    e);
-            }
-            warnAboutSlowTaskIfNeeded(executionTime, taskInputs.summary);
+            logger.trace(() -> new ParameterizedMessage(
+                    "failed to execute cluster state update (on version: [{}], uuid: [{}]) for [{}]\n{}{}{}",
+                    previousClusterState.version(),
+                    previousClusterState.stateUUID(),
+                    taskInputs.summary,
+                    previousClusterState.nodes(),
+                    previousClusterState.routingTable(),
+                    previousClusterState.getRoutingNodes()), // may be expensive => construct message lazily
+                e);
             clusterTasksResult = ClusterTasksResult.builder()
                 .failures(taskInputs.updateTasks.stream().map(updateTask -> updateTask.task)::iterator, e)
                 .build(previousClusterState);
@@ -725,7 +730,7 @@ public class MasterService extends AbstractLifecycleComponent {
         return clusterTasksResult;
     }
 
-    public List<Batcher.UpdateTask> getNonFailedTasks(TaskInputs taskInputs,
+    private List<Batcher.UpdateTask> getNonFailedTasks(TaskInputs taskInputs,
                                                       ClusterTasksResult<Object> clusterTasksResult) {
         return taskInputs.updateTasks.stream().filter(updateTask -> {
             assert clusterTasksResult.executionResults.containsKey(updateTask.task) : "missing " + updateTask;
@@ -738,10 +743,10 @@ public class MasterService extends AbstractLifecycleComponent {
     /**
      * Represents a set of tasks to be processed together with their executor
      */
-    protected class TaskInputs {
-        public final String summary;
-        public final List<Batcher.UpdateTask> updateTasks;
-        public final ClusterStateTaskExecutor<Object> executor;
+    private class TaskInputs {
+        final String summary;
+        final List<Batcher.UpdateTask> updateTasks;
+        final ClusterStateTaskExecutor<Object> executor;
 
         TaskInputs(ClusterStateTaskExecutor<Object> executor, List<Batcher.UpdateTask> updateTasks, String summary) {
             this.summary = summary;
@@ -749,11 +754,11 @@ public class MasterService extends AbstractLifecycleComponent {
             this.updateTasks = updateTasks;
         }
 
-        public boolean runOnlyWhenMaster() {
+        boolean runOnlyWhenMaster() {
             return executor.runOnlyOnMaster();
         }
 
-        public void onNoLongerMaster() {
+        void onNoLongerMaster() {
             updateTasks.forEach(task -> task.listener.onNoLongerMaster(task.source()));
         }
     }
@@ -795,8 +800,4 @@ public class MasterService extends AbstractLifecycleComponent {
         }
     }
 
-    // this one is overridden in tests so we can control time
-    protected long currentTimeInNanos() {
-        return System.nanoTime();
-    }
 }

+ 4 - 1
server/src/main/java/org/elasticsearch/common/settings/ClusterSettings.java

@@ -54,7 +54,9 @@ import org.elasticsearch.cluster.routing.allocation.decider.FilterAllocationDeci
 import org.elasticsearch.cluster.routing.allocation.decider.SameShardAllocationDecider;
 import org.elasticsearch.cluster.routing.allocation.decider.ShardsLimitAllocationDecider;
 import org.elasticsearch.cluster.routing.allocation.decider.ThrottlingAllocationDecider;
+import org.elasticsearch.cluster.service.ClusterApplierService;
 import org.elasticsearch.cluster.service.ClusterService;
+import org.elasticsearch.cluster.service.MasterService;
 import org.elasticsearch.common.logging.Loggers;
 import org.elasticsearch.common.network.NetworkModule;
 import org.elasticsearch.common.network.NetworkService;
@@ -271,8 +273,9 @@ public final class ClusterSettings extends AbstractScopedSettings {
             HierarchyCircuitBreakerService.ACCOUNTING_CIRCUIT_BREAKER_LIMIT_SETTING,
             HierarchyCircuitBreakerService.ACCOUNTING_CIRCUIT_BREAKER_OVERHEAD_SETTING,
             IndexModule.NODE_STORE_ALLOW_MMAP,
-            ClusterService.CLUSTER_SERVICE_SLOW_TASK_LOGGING_THRESHOLD_SETTING,
+            ClusterApplierService.CLUSTER_SERVICE_SLOW_TASK_LOGGING_THRESHOLD_SETTING,
             ClusterService.USER_DEFINED_META_DATA,
+            MasterService.MASTER_SERVICE_SLOW_TASK_LOGGING_THRESHOLD_SETTING,
             SearchService.DEFAULT_SEARCH_TIMEOUT_SETTING,
             SearchService.DEFAULT_ALLOW_PARTIAL_SEARCH_RESULTS,
             TransportSearchAction.SHARD_COUNT_LIMIT_SETTING,

+ 3 - 2
server/src/test/java/org/elasticsearch/cluster/coordination/NodeJoinTests.java

@@ -29,6 +29,7 @@ import org.elasticsearch.cluster.metadata.MetaData;
 import org.elasticsearch.cluster.node.DiscoveryNode;
 import org.elasticsearch.cluster.node.DiscoveryNodeRole;
 import org.elasticsearch.cluster.node.DiscoveryNodes;
+import org.elasticsearch.cluster.service.FakeThreadPoolMasterService;
 import org.elasticsearch.cluster.service.MasterService;
 import org.elasticsearch.cluster.service.MasterServiceTests;
 import org.elasticsearch.common.Randomness;
@@ -36,7 +37,6 @@ import org.elasticsearch.common.settings.ClusterSettings;
 import org.elasticsearch.common.settings.Settings;
 import org.elasticsearch.common.util.concurrent.BaseFuture;
 import org.elasticsearch.common.util.concurrent.FutureUtils;
-import org.elasticsearch.indices.cluster.FakeThreadPoolMasterService;
 import org.elasticsearch.node.Node;
 import org.elasticsearch.test.ClusterServiceUtils;
 import org.elasticsearch.test.ESTestCase;
@@ -132,7 +132,8 @@ public class NodeJoinTests extends ESTestCase {
     }
 
     private void setupRealMasterServiceAndCoordinator(long term, ClusterState initialState) {
-        MasterService masterService = new MasterService("test_node", Settings.EMPTY, threadPool);
+        MasterService masterService = new MasterService(Settings.builder().put(Node.NODE_NAME_SETTING.getKey(), "test_node").build(),
+            new ClusterSettings(Settings.EMPTY, ClusterSettings.BUILT_IN_CLUSTER_SETTINGS), threadPool);
         AtomicReference<ClusterState> clusterStateRef = new AtomicReference<>(initialState);
         masterService.setClusterStatePublisher((event, publishListener, ackListener) -> {
             clusterStateRef.set(event.state());

+ 394 - 287
server/src/test/java/org/elasticsearch/cluster/service/MasterServiceTests.java

@@ -22,6 +22,7 @@ package org.elasticsearch.cluster.service;
 import org.apache.logging.log4j.Level;
 import org.apache.logging.log4j.LogManager;
 import org.apache.logging.log4j.Logger;
+import org.elasticsearch.ElasticsearchException;
 import org.elasticsearch.Version;
 import org.elasticsearch.cluster.AckedClusterStateUpdateTask;
 import org.elasticsearch.cluster.ClusterChangedEvent;
@@ -41,22 +42,22 @@ import org.elasticsearch.common.Nullable;
 import org.elasticsearch.common.Priority;
 import org.elasticsearch.common.collect.Tuple;
 import org.elasticsearch.common.logging.Loggers;
+import org.elasticsearch.common.settings.ClusterSettings;
 import org.elasticsearch.common.settings.Settings;
 import org.elasticsearch.common.unit.TimeValue;
 import org.elasticsearch.common.util.concurrent.BaseFuture;
 import org.elasticsearch.common.util.concurrent.ThreadContext;
+import org.elasticsearch.node.Node;
 import org.elasticsearch.test.ESTestCase;
 import org.elasticsearch.test.MockLogAppender;
 import org.elasticsearch.test.junit.annotations.TestLogging;
 import org.elasticsearch.threadpool.TestThreadPool;
 import org.elasticsearch.threadpool.ThreadPool;
-import org.junit.After;
 import org.junit.AfterClass;
 import org.junit.Before;
 import org.junit.BeforeClass;
 
 import java.util.ArrayList;
-import java.util.Arrays;
 import java.util.Collections;
 import java.util.HashMap;
 import java.util.HashSet;
@@ -84,11 +85,16 @@ import static org.hamcrest.Matchers.hasKey;
 public class MasterServiceTests extends ESTestCase {
 
     private static ThreadPool threadPool;
-    private TimedMasterService masterService;
+    private static long relativeTimeInMillis;
 
     @BeforeClass
     public static void createThreadPool() {
-        threadPool = new TestThreadPool(MasterServiceTests.class.getName());
+        threadPool = new TestThreadPool(MasterServiceTests.class.getName()) {
+            @Override
+            public long relativeTimeInMillis() {
+                return relativeTimeInMillis;
+            }
+        };
     }
 
     @AfterClass
@@ -100,46 +106,41 @@ public class MasterServiceTests extends ESTestCase {
     }
 
     @Before
-    public void setUp() throws Exception {
-        super.setUp();
-        masterService = createTimedMasterService(true);
+    public void randomizeCurrentTime() {
+        relativeTimeInMillis = randomLongBetween(0L, 1L << 62);
     }
 
-    @After
-    public void tearDown() throws Exception {
-        masterService.close();
-        super.tearDown();
-    }
-
-    private TimedMasterService createTimedMasterService(boolean makeMaster) throws InterruptedException {
-        DiscoveryNode localNode = new DiscoveryNode("node1", buildNewFakeTransportAddress(), emptyMap(),
+    private MasterService createMasterService(boolean makeMaster) {
+        final DiscoveryNode localNode = new DiscoveryNode("node1", buildNewFakeTransportAddress(), emptyMap(),
             emptySet(), Version.CURRENT);
-        TimedMasterService timedMasterService = new TimedMasterService(Settings.builder().put("cluster.name",
-            MasterServiceTests.class.getSimpleName()).build(), threadPool);
-        ClusterState initialClusterState = ClusterState.builder(new ClusterName(MasterServiceTests.class.getSimpleName()))
+        final MasterService masterService = new MasterService(Settings.builder()
+            .put(ClusterName.CLUSTER_NAME_SETTING.getKey(), MasterServiceTests.class.getSimpleName())
+            .put(Node.NODE_NAME_SETTING.getKey(), "test_node")
+            .build(), new ClusterSettings(Settings.EMPTY, ClusterSettings.BUILT_IN_CLUSTER_SETTINGS), threadPool);
+        final ClusterState initialClusterState = ClusterState.builder(new ClusterName(MasterServiceTests.class.getSimpleName()))
             .nodes(DiscoveryNodes.builder()
                 .add(localNode)
                 .localNodeId(localNode.getId())
                 .masterNodeId(makeMaster ? localNode.getId() : null))
             .blocks(ClusterBlocks.EMPTY_CLUSTER_BLOCK).build();
-        AtomicReference<ClusterState> clusterStateRef = new AtomicReference<>(initialClusterState);
-        timedMasterService.setClusterStatePublisher((event, publishListener, ackListener) -> {
+        final AtomicReference<ClusterState> clusterStateRef = new AtomicReference<>(initialClusterState);
+        masterService.setClusterStatePublisher((event, publishListener, ackListener) -> {
             clusterStateRef.set(event.state());
             publishListener.onResponse(null);
         });
-        timedMasterService.setClusterStateSupplier(clusterStateRef::get);
-        timedMasterService.start();
-        return timedMasterService;
+        masterService.setClusterStateSupplier(clusterStateRef::get);
+        masterService.start();
+        return masterService;
     }
 
     public void testMasterAwareExecution() throws Exception {
-        TimedMasterService nonMaster = createTimedMasterService(false);
+        final MasterService nonMaster = createMasterService(false);
 
         final boolean[] taskFailed = {false};
         final CountDownLatch latch1 = new CountDownLatch(1);
         nonMaster.submitStateUpdateTask("test", new ClusterStateUpdateTask() {
             @Override
-            public ClusterState execute(ClusterState currentState) throws Exception {
+            public ClusterState execute(ClusterState currentState) {
                 latch1.countDown();
                 return currentState;
             }
@@ -154,11 +155,10 @@ public class MasterServiceTests extends ESTestCase {
         latch1.await();
         assertTrue("cluster state update task was executed on a non-master", taskFailed[0]);
 
-        taskFailed[0] = true;
         final CountDownLatch latch2 = new CountDownLatch(1);
         nonMaster.submitStateUpdateTask("test", new LocalClusterUpdateTask() {
             @Override
-            public ClusterTasksResult<LocalClusterUpdateTask> execute(ClusterState currentState) throws Exception {
+            public ClusterTasksResult<LocalClusterUpdateTask> execute(ClusterState currentState) {
                 taskFailed[0] = false;
                 latch2.countDown();
                 return unchanged();
@@ -177,13 +177,13 @@ public class MasterServiceTests extends ESTestCase {
     }
 
     public void testThreadContext() throws InterruptedException {
-        final TimedMasterService master = createTimedMasterService(true);
+        final MasterService master = createMasterService(true);
         final CountDownLatch latch = new CountDownLatch(1);
 
         try (ThreadContext.StoredContext ignored = threadPool.getThreadContext().stashContext()) {
             final Map<String, String> expectedHeaders = Collections.singletonMap("test", "test");
             final Map<String, List<String>> expectedResponseHeaders = Collections.singletonMap("testResponse",
-                Arrays.asList("testResponse"));
+                Collections.singletonList("testResponse"));
             threadPool.getThreadContext().putHeader(expectedHeaders);
 
             final TimeValue ackTimeout = randomBoolean() ? TimeValue.ZERO : TimeValue.timeValueMillis(randomInt(10000));
@@ -273,37 +273,39 @@ public class MasterServiceTests extends ESTestCase {
         final CountDownLatch latch = new CountDownLatch(1);
         AtomicBoolean published = new AtomicBoolean();
 
-        masterService.submitStateUpdateTask(
-            "testClusterStateTaskListenerThrowingExceptionIsOkay",
-            new Object(),
-            ClusterStateTaskConfig.build(Priority.NORMAL),
-            new ClusterStateTaskExecutor<Object>() {
-                @Override
-                public ClusterTasksResult<Object> execute(ClusterState currentState, List<Object> tasks) throws Exception {
-                    ClusterState newClusterState = ClusterState.builder(currentState).build();
-                    return ClusterTasksResult.builder().successes(tasks).build(newClusterState);
-                }
+        try (MasterService masterService = createMasterService(true)) {
+            masterService.submitStateUpdateTask(
+                "testClusterStateTaskListenerThrowingExceptionIsOkay",
+                new Object(),
+                ClusterStateTaskConfig.build(Priority.NORMAL),
+                new ClusterStateTaskExecutor<Object>() {
+                    @Override
+                    public ClusterTasksResult<Object> execute(ClusterState currentState, List<Object> tasks) {
+                        ClusterState newClusterState = ClusterState.builder(currentState).build();
+                        return ClusterTasksResult.builder().successes(tasks).build(newClusterState);
+                    }
 
-                @Override
-                public void clusterStatePublished(ClusterChangedEvent clusterChangedEvent) {
-                    published.set(true);
-                    latch.countDown();
-                }
-            },
-            new ClusterStateTaskListener() {
-                @Override
-                public void clusterStateProcessed(String source, ClusterState oldState, ClusterState newState) {
-                    throw new IllegalStateException(source);
-                }
+                    @Override
+                    public void clusterStatePublished(ClusterChangedEvent clusterChangedEvent) {
+                        published.set(true);
+                        latch.countDown();
+                    }
+                },
+                new ClusterStateTaskListener() {
+                    @Override
+                    public void clusterStateProcessed(String source, ClusterState oldState, ClusterState newState) {
+                        throw new IllegalStateException(source);
+                    }
 
-                @Override
-                public void onFailure(String source, Exception e) {
+                    @Override
+                    public void onFailure(String source, Exception e) {
+                    }
                 }
-            }
-        );
+            );
 
-        latch.await();
-        assertTrue(published.get());
+            latch.await();
+            assertTrue(published.get());
+        }
     }
 
     @TestLogging(value = "org.elasticsearch.cluster.service:TRACE", reason = "to ensure that we log cluster state events on TRACE level")
@@ -312,37 +314,81 @@ public class MasterServiceTests extends ESTestCase {
         mockAppender.start();
         mockAppender.addExpectation(
             new MockLogAppender.SeenEventExpectation(
-                "test1",
+                "test1 start",
                 MasterService.class.getCanonicalName(),
                 Level.DEBUG,
-                "*processing [test1]: took [1s] no change in cluster state"));
+                "executing cluster state update for [test1]"));
         mockAppender.addExpectation(
             new MockLogAppender.SeenEventExpectation(
-                "test2",
+                "test1 computation",
+                MasterService.class.getCanonicalName(),
+                Level.DEBUG,
+                "took [1s] to compute cluster state update for [test1]"));
+        mockAppender.addExpectation(
+            new MockLogAppender.SeenEventExpectation(
+                "test1 notification",
+                MasterService.class.getCanonicalName(),
+                Level.DEBUG,
+                "took [0s] to notify listeners on unchanged cluster state for [test1]"));
+
+        mockAppender.addExpectation(
+            new MockLogAppender.SeenEventExpectation(
+                "test2 start",
+                MasterService.class.getCanonicalName(),
+                Level.DEBUG,
+                "executing cluster state update for [test2]"));
+        mockAppender.addExpectation(
+            new MockLogAppender.SeenEventExpectation(
+                "test2 failure",
                 MasterService.class.getCanonicalName(),
                 Level.TRACE,
-                "*failed to execute cluster state update in [2s]*"));
+                "failed to execute cluster state update (on version: [*], uuid: [*]) for [test2]*"));
         mockAppender.addExpectation(
             new MockLogAppender.SeenEventExpectation(
-                "test3",
+                "test2 computation",
                 MasterService.class.getCanonicalName(),
                 Level.DEBUG,
-                "*processing [test3]: took [3s] done publishing updated cluster state (version: *, uuid: *)"));
+                "took [2s] to compute cluster state update for [test2]"));
+        mockAppender.addExpectation(
+            new MockLogAppender.SeenEventExpectation(
+                "test2 notification",
+                MasterService.class.getCanonicalName(),
+                Level.DEBUG,
+                "took [0s] to notify listeners on unchanged cluster state for [test2]"));
+
+        mockAppender.addExpectation(
+            new MockLogAppender.SeenEventExpectation(
+                "test3 start",
+                MasterService.class.getCanonicalName(),
+                Level.DEBUG,
+                "executing cluster state update for [test3]"));
+        mockAppender.addExpectation(
+            new MockLogAppender.SeenEventExpectation(
+                "test3 computation",
+                MasterService.class.getCanonicalName(),
+                Level.DEBUG,
+                "took [3s] to compute cluster state update for [test3]"));
+        mockAppender.addExpectation(
+            new MockLogAppender.SeenEventExpectation(
+                "test3 notification",
+                MasterService.class.getCanonicalName(),
+                Level.DEBUG,
+                "took [4s] to notify listeners on successful publication of cluster state (version: *, uuid: *) for [test3]"));
+
         mockAppender.addExpectation(
             new MockLogAppender.SeenEventExpectation(
                 "test4",
                 MasterService.class.getCanonicalName(),
                 Level.DEBUG,
-                "*processing [test4]: took [0s] no change in cluster state"));
+                "executing cluster state update for [test4]"));
 
         Logger clusterLogger = LogManager.getLogger(MasterService.class);
         Loggers.addAppender(clusterLogger, mockAppender);
-        try {
-            masterService.currentTimeOverride = System.nanoTime();
+        try (MasterService masterService = createMasterService(true)) {
             masterService.submitStateUpdateTask("test1", new ClusterStateUpdateTask() {
                 @Override
-                public ClusterState execute(ClusterState currentState) throws Exception {
-                    masterService.currentTimeOverride += TimeValue.timeValueSeconds(1).nanos();
+                public ClusterState execute(ClusterState currentState) {
+                    relativeTimeInMillis += TimeValue.timeValueSeconds(1).millis();
                     return currentState;
                 }
 
@@ -357,7 +403,7 @@ public class MasterServiceTests extends ESTestCase {
             masterService.submitStateUpdateTask("test2", new ClusterStateUpdateTask() {
                 @Override
                 public ClusterState execute(ClusterState currentState) {
-                    masterService.currentTimeOverride += TimeValue.timeValueSeconds(2).nanos();
+                    relativeTimeInMillis += TimeValue.timeValueSeconds(2).millis();
                     throw new IllegalArgumentException("Testing handling of exceptions in the cluster state task");
                 }
 
@@ -372,20 +418,20 @@ public class MasterServiceTests extends ESTestCase {
             masterService.submitStateUpdateTask("test3", new ClusterStateUpdateTask() {
                 @Override
                 public ClusterState execute(ClusterState currentState) {
-                    masterService.currentTimeOverride += TimeValue.timeValueSeconds(3).nanos();
+                    relativeTimeInMillis += TimeValue.timeValueSeconds(3).millis();
                     return ClusterState.builder(currentState).incrementVersion().build();
                 }
 
                 @Override
-                public void clusterStateProcessed(String source, ClusterState oldState, ClusterState newState) { }
+                public void clusterStateProcessed(String source, ClusterState oldState, ClusterState newState) {
+                    relativeTimeInMillis += TimeValue.timeValueSeconds(4).millis();
+                }
 
                 @Override
                 public void onFailure(String source, Exception e) {
                     fail();
                 }
             });
-            // Additional update task to make sure all previous logging made it to the loggerName
-            // We don't check logging for this on since there is no guarantee that it will occur before our check
             masterService.submitStateUpdateTask("test4", new ClusterStateUpdateTask() {
                 @Override
                 public ClusterState execute(ClusterState currentState) {
@@ -533,70 +579,72 @@ public class MasterServiceTests extends ESTestCase {
             }
         };
 
-        final ConcurrentMap<String, AtomicInteger> submittedTasksPerThread = new ConcurrentHashMap<>();
-        CyclicBarrier barrier = new CyclicBarrier(1 + numberOfThreads);
-        for (int i = 0; i < numberOfThreads; i++) {
-            final int index = i;
-            Thread thread = new Thread(() -> {
-                final String threadName = Thread.currentThread().getName();
-                try {
-                    barrier.await();
-                    for (int j = 0; j < taskSubmissionsPerThread; j++) {
-                        Tuple<TaskExecutor, Set<Task>> assignment = assignments.get(index * taskSubmissionsPerThread + j);
-                        final Set<Task> tasks = assignment.v2();
-                        submittedTasksPerThread.computeIfAbsent(threadName, key -> new AtomicInteger()).addAndGet(tasks.size());
-                        final TaskExecutor executor = assignment.v1();
-                        if (tasks.size() == 1) {
-                            masterService.submitStateUpdateTask(
-                                threadName,
-                                tasks.stream().findFirst().get(),
-                                ClusterStateTaskConfig.build(randomFrom(Priority.values())),
-                                executor,
-                                listener);
-                        } else {
-                            Map<Task, ClusterStateTaskListener> taskListeners = new HashMap<>();
-                            tasks.stream().forEach(t -> taskListeners.put(t, listener));
-                            masterService.submitStateUpdateTasks(
-                                threadName,
-                                taskListeners, ClusterStateTaskConfig.build(randomFrom(Priority.values())),
-                                executor
-                            );
+        try (MasterService masterService = createMasterService(true)) {
+            final ConcurrentMap<String, AtomicInteger> submittedTasksPerThread = new ConcurrentHashMap<>();
+            CyclicBarrier barrier = new CyclicBarrier(1 + numberOfThreads);
+            for (int i = 0; i < numberOfThreads; i++) {
+                final int index = i;
+                Thread thread = new Thread(() -> {
+                    final String threadName = Thread.currentThread().getName();
+                    try {
+                        barrier.await();
+                        for (int j = 0; j < taskSubmissionsPerThread; j++) {
+                            Tuple<TaskExecutor, Set<Task>> assignment = assignments.get(index * taskSubmissionsPerThread + j);
+                            final Set<Task> tasks = assignment.v2();
+                            submittedTasksPerThread.computeIfAbsent(threadName, key -> new AtomicInteger()).addAndGet(tasks.size());
+                            final TaskExecutor executor = assignment.v1();
+                            if (tasks.size() == 1) {
+                                masterService.submitStateUpdateTask(
+                                    threadName,
+                                    tasks.stream().findFirst().get(),
+                                    ClusterStateTaskConfig.build(randomFrom(Priority.values())),
+                                    executor,
+                                    listener);
+                            } else {
+                                Map<Task, ClusterStateTaskListener> taskListeners = new HashMap<>();
+                                tasks.forEach(t -> taskListeners.put(t, listener));
+                                masterService.submitStateUpdateTasks(
+                                    threadName,
+                                    taskListeners, ClusterStateTaskConfig.build(randomFrom(Priority.values())),
+                                    executor
+                                );
+                            }
                         }
+                        barrier.await();
+                    } catch (BrokenBarrierException | InterruptedException e) {
+                        throw new AssertionError(e);
                     }
-                    barrier.await();
-                } catch (BrokenBarrierException | InterruptedException e) {
-                    throw new AssertionError(e);
-                }
-            });
-            thread.start();
-        }
+                });
+                thread.start();
+            }
 
-        // wait for all threads to be ready
-        barrier.await();
-        // wait for all threads to finish
-        barrier.await();
+            // wait for all threads to be ready
+            barrier.await();
+            // wait for all threads to finish
+            barrier.await();
 
-        // wait until all the cluster state updates have been processed
-        updateLatch.await();
-        // and until all of the publication callbacks have completed
-        semaphore.acquire(numberOfExecutors);
+            // wait until all the cluster state updates have been processed
+            updateLatch.await();
+            // and until all of the publication callbacks have completed
+            semaphore.acquire(numberOfExecutors);
 
-        // assert the number of executed tasks is correct
-        assertEquals(totalTaskCount, counter.get());
+            // assert the number of executed tasks is correct
+            assertEquals(totalTaskCount, counter.get());
 
-        // assert each executor executed the correct number of tasks
-        for (TaskExecutor executor : executors) {
-            if (counts.containsKey(executor)) {
-                assertEquals((int) counts.get(executor), executor.counter.get());
-                assertEquals(executor.batches.get(), executor.published.get());
+            // assert each executor executed the correct number of tasks
+            for (TaskExecutor executor : executors) {
+                if (counts.containsKey(executor)) {
+                    assertEquals((int) counts.get(executor), executor.counter.get());
+                    assertEquals(executor.batches.get(), executor.published.get());
+                }
             }
-        }
 
-        // assert the correct number of clusterStateProcessed events were triggered
-        for (Map.Entry<String, AtomicInteger> entry : processedStates.entrySet()) {
-            assertThat(submittedTasksPerThread, hasKey(entry.getKey()));
-            assertEquals("not all tasks submitted by " + entry.getKey() + " received a processed event",
-                entry.getValue().get(), submittedTasksPerThread.get(entry.getKey()).get());
+            // assert the correct number of clusterStateProcessed events were triggered
+            for (Map.Entry<String, AtomicInteger> entry : processedStates.entrySet()) {
+                assertThat(submittedTasksPerThread, hasKey(entry.getKey()));
+                assertEquals("not all tasks submitted by " + entry.getKey() + " received a processed event",
+                    entry.getValue().get(), submittedTasksPerThread.get(entry.getKey()).get());
+            }
         }
     }
 
@@ -605,42 +653,44 @@ public class MasterServiceTests extends ESTestCase {
         final CountDownLatch latch = new CountDownLatch(1);
         final AtomicReference<AssertionError> assertionRef = new AtomicReference<>();
 
-        masterService.submitStateUpdateTask(
-            "testBlockingCallInClusterStateTaskListenerFails",
-            new Object(),
-            ClusterStateTaskConfig.build(Priority.NORMAL),
-            (currentState, tasks) -> {
-                ClusterState newClusterState = ClusterState.builder(currentState).build();
-                return ClusterStateTaskExecutor.ClusterTasksResult.builder().successes(tasks).build(newClusterState);
-            },
-            new ClusterStateTaskListener() {
-                @Override
-                public void clusterStateProcessed(String source, ClusterState oldState, ClusterState newState) {
-                    BaseFuture<Void> future = new BaseFuture<Void>() {};
-                    try {
-                        if (randomBoolean()) {
-                            future.get(1L, TimeUnit.SECONDS);
-                        } else {
-                            future.get();
+        try (MasterService masterService = createMasterService(true)) {
+            masterService.submitStateUpdateTask(
+                "testBlockingCallInClusterStateTaskListenerFails",
+                new Object(),
+                ClusterStateTaskConfig.build(Priority.NORMAL),
+                (currentState, tasks) -> {
+                    ClusterState newClusterState = ClusterState.builder(currentState).build();
+                    return ClusterStateTaskExecutor.ClusterTasksResult.builder().successes(tasks).build(newClusterState);
+                },
+                new ClusterStateTaskListener() {
+                    @Override
+                    public void clusterStateProcessed(String source, ClusterState oldState, ClusterState newState) {
+                        BaseFuture<Void> future = new BaseFuture<Void>() {
+                        };
+                        try {
+                            if (randomBoolean()) {
+                                future.get(1L, TimeUnit.SECONDS);
+                            } else {
+                                future.get();
+                            }
+                        } catch (Exception e) {
+                            throw new RuntimeException(e);
+                        } catch (AssertionError e) {
+                            assertionRef.set(e);
+                            latch.countDown();
                         }
-                    } catch (Exception e) {
-                        throw new RuntimeException(e);
-                    } catch (AssertionError e) {
-                        assertionRef.set(e);
-                        latch.countDown();
                     }
-                }
 
-                @Override
-                public void onFailure(String source, Exception e) {
+                    @Override
+                    public void onFailure(String source, Exception e) {
+                    }
                 }
-            }
-        );
+            );
 
-        latch.await();
-        assertNotNull(assertionRef.get());
-        assertThat(assertionRef.get().getMessage(),
-            containsString("Reason: [Blocking operation]"));
+            latch.await();
+            assertNotNull(assertionRef.get());
+            assertThat(assertionRef.get().getMessage(), containsString("Reason: [Blocking operation]"));
+        }
     }
 
     @TestLogging(value = "org.elasticsearch.cluster.service:WARN", reason = "to ensure that we log cluster state events on WARN level")
@@ -649,39 +699,77 @@ public class MasterServiceTests extends ESTestCase {
         mockAppender.start();
         mockAppender.addExpectation(
             new MockLogAppender.UnseenEventExpectation(
-                "test1 shouldn't see because setting is too low",
+                "test1 shouldn't log because it was fast enough",
                 MasterService.class.getCanonicalName(),
                 Level.WARN,
-                "*cluster state update task [test1] took [*] which is above the warn threshold of [*]"));
+                "*took*test1*"));
         mockAppender.addExpectation(
             new MockLogAppender.SeenEventExpectation(
                 "test2",
                 MasterService.class.getCanonicalName(),
                 Level.WARN,
-                "*cluster state update task [test2] took [32s] which is above the warn threshold of [*]"));
+                "*took [*], which is over [10s], to compute cluster state update for [test2]"));
         mockAppender.addExpectation(
             new MockLogAppender.SeenEventExpectation(
                 "test3",
                 MasterService.class.getCanonicalName(),
                 Level.WARN,
-                "*cluster state update task [test3] took [33s] which is above the warn threshold of [*]"));
+                "*took [*], which is over [10s], to compute cluster state update for [test3]"));
         mockAppender.addExpectation(
             new MockLogAppender.SeenEventExpectation(
                 "test4",
                 MasterService.class.getCanonicalName(),
                 Level.WARN,
-                "*cluster state update task [test4] took [34s] which is above the warn threshold of [*]"));
+                "*took [*], which is over [10s], to compute cluster state update for [test4]"));
+        mockAppender.addExpectation(
+            new MockLogAppender.UnseenEventExpectation(
+                "test5 should not log despite publishing slowly",
+                MasterService.class.getCanonicalName(),
+                Level.WARN,
+                "*took*test5*"));
+        mockAppender.addExpectation(
+            new MockLogAppender.SeenEventExpectation(
+                "test6 should log due to slow and failing publication",
+                MasterService.class.getCanonicalName(),
+                Level.WARN,
+                "took [*] and then failed to publish updated cluster state (version: *, uuid: *) for [test6]:*"));
 
         Logger clusterLogger = LogManager.getLogger(MasterService.class);
         Loggers.addAppender(clusterLogger, mockAppender);
-        try {
-            final CountDownLatch latch = new CountDownLatch(5);
+        try (MasterService masterService = new MasterService(Settings.builder()
+            .put(ClusterName.CLUSTER_NAME_SETTING.getKey(), MasterServiceTests.class.getSimpleName())
+            .put(Node.NODE_NAME_SETTING.getKey(), "test_node")
+            .build(), new ClusterSettings(Settings.EMPTY, ClusterSettings.BUILT_IN_CLUSTER_SETTINGS), threadPool)) {
+
+            final DiscoveryNode localNode = new DiscoveryNode("node1", buildNewFakeTransportAddress(), emptyMap(),
+                emptySet(), Version.CURRENT);
+            final ClusterState initialClusterState = ClusterState.builder(new ClusterName(MasterServiceTests.class.getSimpleName()))
+                .nodes(DiscoveryNodes.builder().add(localNode).localNodeId(localNode.getId()).masterNodeId(localNode.getId()))
+                .blocks(ClusterBlocks.EMPTY_CLUSTER_BLOCK).build();
+            final AtomicReference<ClusterState> clusterStateRef = new AtomicReference<>(initialClusterState);
+            masterService.setClusterStatePublisher((event, publishListener, ackListener) -> {
+                if (event.source().contains("test5")) {
+                    relativeTimeInMillis += MasterService.MASTER_SERVICE_SLOW_TASK_LOGGING_THRESHOLD_SETTING.get(Settings.EMPTY).millis()
+                        + randomLongBetween(1, 1000000);
+                }
+                if (event.source().contains("test6")) {
+                    relativeTimeInMillis += MasterService.MASTER_SERVICE_SLOW_TASK_LOGGING_THRESHOLD_SETTING.get(Settings.EMPTY).millis()
+                        + randomLongBetween(1, 1000000);
+                    throw new ElasticsearchException("simulated error during slow publication which should trigger logging");
+                }
+                clusterStateRef.set(event.state());
+                publishListener.onResponse(null);
+            });
+            masterService.setClusterStateSupplier(clusterStateRef::get);
+            masterService.start();
+
+            final CountDownLatch latch = new CountDownLatch(6);
             final CountDownLatch processedFirstTask = new CountDownLatch(1);
-            masterService.currentTimeOverride = System.nanoTime();
             masterService.submitStateUpdateTask("test1", new ClusterStateUpdateTask() {
                 @Override
-                public ClusterState execute(ClusterState currentState) throws Exception {
-                    masterService.currentTimeOverride += TimeValue.timeValueSeconds(1).nanos();
+                public ClusterState execute(ClusterState currentState) {
+                    relativeTimeInMillis += randomLongBetween(0L,
+                        MasterService.MASTER_SERVICE_SLOW_TASK_LOGGING_THRESHOLD_SETTING.get(Settings.EMPTY).millis());
                     return currentState;
                 }
 
@@ -700,8 +788,9 @@ public class MasterServiceTests extends ESTestCase {
             processedFirstTask.await();
             masterService.submitStateUpdateTask("test2", new ClusterStateUpdateTask() {
                 @Override
-                public ClusterState execute(ClusterState currentState) throws Exception {
-                    masterService.currentTimeOverride += TimeValue.timeValueSeconds(32).nanos();
+                public ClusterState execute(ClusterState currentState) {
+                    relativeTimeInMillis += MasterService.MASTER_SERVICE_SLOW_TASK_LOGGING_THRESHOLD_SETTING.get(Settings.EMPTY).millis()
+                        + randomLongBetween(1, 1000000);
                     throw new IllegalArgumentException("Testing handling of exceptions in the cluster state task");
                 }
 
@@ -717,8 +806,9 @@ public class MasterServiceTests extends ESTestCase {
             });
             masterService.submitStateUpdateTask("test3", new ClusterStateUpdateTask() {
                 @Override
-                public ClusterState execute(ClusterState currentState) throws Exception {
-                    masterService.currentTimeOverride += TimeValue.timeValueSeconds(33).nanos();
+                public ClusterState execute(ClusterState currentState) {
+                    relativeTimeInMillis += MasterService.MASTER_SERVICE_SLOW_TASK_LOGGING_THRESHOLD_SETTING.get(Settings.EMPTY).millis()
+                        + randomLongBetween(1, 1000000);
                     return ClusterState.builder(currentState).incrementVersion().build();
                 }
 
@@ -734,8 +824,9 @@ public class MasterServiceTests extends ESTestCase {
             });
             masterService.submitStateUpdateTask("test4", new ClusterStateUpdateTask() {
                 @Override
-                public ClusterState execute(ClusterState currentState) throws Exception {
-                    masterService.currentTimeOverride += TimeValue.timeValueSeconds(34).nanos();
+                public ClusterState execute(ClusterState currentState) {
+                    relativeTimeInMillis += MasterService.MASTER_SERVICE_SLOW_TASK_LOGGING_THRESHOLD_SETTING.get(Settings.EMPTY).millis()
+                        + randomLongBetween(1, 1000000);
                     return currentState;
                 }
 
@@ -749,12 +840,10 @@ public class MasterServiceTests extends ESTestCase {
                     fail();
                 }
             });
-            // Additional update task to make sure all previous logging made it to the loggerName
-            // We don't check logging for this on since there is no guarantee that it will occur before our check
             masterService.submitStateUpdateTask("test5", new ClusterStateUpdateTask() {
                 @Override
                 public ClusterState execute(ClusterState currentState) {
-                    return currentState;
+                    return ClusterState.builder(currentState).incrementVersion().build();
                 }
 
                 @Override
@@ -767,55 +856,10 @@ public class MasterServiceTests extends ESTestCase {
                     fail();
                 }
             });
-            latch.await();
-        } finally {
-            Loggers.removeAppender(clusterLogger, mockAppender);
-            mockAppender.stop();
-        }
-        mockAppender.assertAllExpectationsMatched();
-    }
-
-    public void testAcking() throws InterruptedException {
-        final DiscoveryNode node1 = new DiscoveryNode("node1", buildNewFakeTransportAddress(), emptyMap(), emptySet(), Version.CURRENT);
-        final DiscoveryNode node2 = new DiscoveryNode("node2", buildNewFakeTransportAddress(), emptyMap(), emptySet(), Version.CURRENT);
-        final DiscoveryNode node3 = new DiscoveryNode("node3", buildNewFakeTransportAddress(), emptyMap(), emptySet(), Version.CURRENT);
-        TimedMasterService timedMasterService = new TimedMasterService(Settings.builder().put("cluster.name",
-            MasterServiceTests.class.getSimpleName()).build(), threadPool);
-        ClusterState initialClusterState = ClusterState.builder(new ClusterName(MasterServiceTests.class.getSimpleName()))
-            .nodes(DiscoveryNodes.builder()
-                .add(node1)
-                .add(node2)
-                .add(node3)
-                .localNodeId(node1.getId())
-                .masterNodeId(node1.getId()))
-            .blocks(ClusterBlocks.EMPTY_CLUSTER_BLOCK).build();
-        final AtomicReference<ClusterStatePublisher> publisherRef = new AtomicReference<>();
-        timedMasterService.setClusterStatePublisher((e, pl, al) -> publisherRef.get().publish(e, pl, al));
-        timedMasterService.setClusterStateSupplier(() -> initialClusterState);
-        timedMasterService.start();
-
-        // check that we don't time out before even committing the cluster state
-        {
-            final CountDownLatch latch = new CountDownLatch(1);
-
-            publisherRef.set((clusterChangedEvent, publishListener, ackListener) -> {
-                publishListener.onFailure(new FailedToCommitClusterStateException("mock exception"));
-            });
-
-            timedMasterService.submitStateUpdateTask("test2", new AckedClusterStateUpdateTask<Void>(null, null) {
+            masterService.submitStateUpdateTask("test6", new ClusterStateUpdateTask() {
                 @Override
                 public ClusterState execute(ClusterState currentState) {
-                    return ClusterState.builder(currentState).build();
-                }
-
-                @Override
-                public TimeValue ackTimeout() {
-                    return TimeValue.ZERO;
-                }
-
-                @Override
-                public TimeValue timeout() {
-                    return null;
+                    return ClusterState.builder(currentState).incrementVersion().build();
                 }
 
                 @Override
@@ -824,97 +868,160 @@ public class MasterServiceTests extends ESTestCase {
                 }
 
                 @Override
-                protected Void newResponse(boolean acknowledged) {
-                    fail();
-                    return null;
+                public void onFailure(String source, Exception e) {
+                    fail(); // maybe we should notify here?
+                }
+            });
+            // Additional update task to make sure all previous logging made it to the loggerName
+            // We don't check logging for this on since there is no guarantee that it will occur before our check
+            masterService.submitStateUpdateTask("test7", new ClusterStateUpdateTask() {
+                @Override
+                public ClusterState execute(ClusterState currentState) {
+                    return currentState;
                 }
 
                 @Override
-                public void onFailure(String source, Exception e) {
+                public void clusterStateProcessed(String source, ClusterState oldState, ClusterState newState) {
                     latch.countDown();
                 }
 
                 @Override
-                public void onAckTimeout() {
+                public void onFailure(String source, Exception e) {
                     fail();
                 }
             });
-
             latch.await();
+        } finally {
+            Loggers.removeAppender(clusterLogger, mockAppender);
+            mockAppender.stop();
         }
+        mockAppender.assertAllExpectationsMatched();
+    }
 
-        // check that we timeout if commit took too long
-        {
-            final CountDownLatch latch = new CountDownLatch(2);
+    public void testAcking() throws InterruptedException {
+        final DiscoveryNode node1 = new DiscoveryNode("node1", buildNewFakeTransportAddress(), emptyMap(), emptySet(), Version.CURRENT);
+        final DiscoveryNode node2 = new DiscoveryNode("node2", buildNewFakeTransportAddress(), emptyMap(), emptySet(), Version.CURRENT);
+        final DiscoveryNode node3 = new DiscoveryNode("node3", buildNewFakeTransportAddress(), emptyMap(), emptySet(), Version.CURRENT);
+        try (MasterService masterService = new MasterService(Settings.builder()
+            .put(ClusterName.CLUSTER_NAME_SETTING.getKey(), MasterServiceTests.class.getSimpleName())
+            .put(Node.NODE_NAME_SETTING.getKey(), "test_node")
+            .build(), new ClusterSettings(Settings.EMPTY, ClusterSettings.BUILT_IN_CLUSTER_SETTINGS), threadPool)) {
+
+            final ClusterState initialClusterState = ClusterState.builder(new ClusterName(MasterServiceTests.class.getSimpleName()))
+                .nodes(DiscoveryNodes.builder()
+                    .add(node1)
+                    .add(node2)
+                    .add(node3)
+                    .localNodeId(node1.getId())
+                    .masterNodeId(node1.getId()))
+                .blocks(ClusterBlocks.EMPTY_CLUSTER_BLOCK).build();
+            final AtomicReference<ClusterStatePublisher> publisherRef = new AtomicReference<>();
+            masterService.setClusterStatePublisher((e, pl, al) -> publisherRef.get().publish(e, pl, al));
+            masterService.setClusterStateSupplier(() -> initialClusterState);
+            masterService.start();
+
+            // check that we don't time out before even committing the cluster state
+            {
+                final CountDownLatch latch = new CountDownLatch(1);
+
+                publisherRef.set((clusterChangedEvent, publishListener, ackListener) ->
+                    publishListener.onFailure(new FailedToCommitClusterStateException("mock exception")));
+
+                masterService.submitStateUpdateTask("test2", new AckedClusterStateUpdateTask<Void>(null, null) {
+                    @Override
+                    public ClusterState execute(ClusterState currentState) {
+                        return ClusterState.builder(currentState).build();
+                    }
 
-            final TimeValue ackTimeout = TimeValue.timeValueMillis(randomInt(100));
+                    @Override
+                    public TimeValue ackTimeout() {
+                        return TimeValue.ZERO;
+                    }
 
-            publisherRef.set((clusterChangedEvent, publishListener, ackListener) -> {
-                publishListener.onResponse(null);
-                ackListener.onCommit(TimeValue.timeValueMillis(ackTimeout.millis() + randomInt(100)));
-                ackListener.onNodeAck(node1, null);
-                ackListener.onNodeAck(node2, null);
-                ackListener.onNodeAck(node3, null);
-            });
+                    @Override
+                    public TimeValue timeout() {
+                        return null;
+                    }
 
-            timedMasterService.submitStateUpdateTask("test2", new AckedClusterStateUpdateTask<Void>(null, null) {
-                @Override
-                public ClusterState execute(ClusterState currentState) {
-                    return ClusterState.builder(currentState).build();
-                }
+                    @Override
+                    public void clusterStateProcessed(String source, ClusterState oldState, ClusterState newState) {
+                        fail();
+                    }
 
-                @Override
-                public TimeValue ackTimeout() {
-                    return ackTimeout;
-                }
+                    @Override
+                    protected Void newResponse(boolean acknowledged) {
+                        fail();
+                        return null;
+                    }
 
-                @Override
-                public TimeValue timeout() {
-                    return null;
-                }
+                    @Override
+                    public void onFailure(String source, Exception e) {
+                        latch.countDown();
+                    }
 
-                @Override
-                public void clusterStateProcessed(String source, ClusterState oldState, ClusterState newState) {
-                    latch.countDown();
-                }
+                    @Override
+                    public void onAckTimeout() {
+                        fail();
+                    }
+                });
 
-                @Override
-                protected Void newResponse(boolean acknowledged) {
-                    fail();
-                    return null;
-                }
+                latch.await();
+            }
 
-                @Override
-                public void onFailure(String source, Exception e) {
-                    fail();
-                }
+            // check that we timeout if commit took too long
+            {
+                final CountDownLatch latch = new CountDownLatch(2);
 
-                @Override
-                public void onAckTimeout() {
-                    latch.countDown();
-                }
-            });
+                final TimeValue ackTimeout = TimeValue.timeValueMillis(randomInt(100));
 
-            latch.await();
-        }
+                publisherRef.set((clusterChangedEvent, publishListener, ackListener) -> {
+                    publishListener.onResponse(null);
+                    ackListener.onCommit(TimeValue.timeValueMillis(ackTimeout.millis() + randomInt(100)));
+                    ackListener.onNodeAck(node1, null);
+                    ackListener.onNodeAck(node2, null);
+                    ackListener.onNodeAck(node3, null);
+                });
 
-        timedMasterService.close();
-    }
+                masterService.submitStateUpdateTask("test2", new AckedClusterStateUpdateTask<Void>(null, null) {
+                    @Override
+                    public ClusterState execute(ClusterState currentState) {
+                        return ClusterState.builder(currentState).build();
+                    }
 
-    static class TimedMasterService extends MasterService {
+                    @Override
+                    public TimeValue ackTimeout() {
+                        return ackTimeout;
+                    }
 
-        public volatile Long currentTimeOverride = null;
+                    @Override
+                    public TimeValue timeout() {
+                        return null;
+                    }
 
-        TimedMasterService(Settings settings, ThreadPool threadPool) {
-            super("test_node", settings, threadPool);
-        }
+                    @Override
+                    public void clusterStateProcessed(String source, ClusterState oldState, ClusterState newState) {
+                        latch.countDown();
+                    }
+
+                    @Override
+                    protected Void newResponse(boolean acknowledged) {
+                        fail();
+                        return null;
+                    }
+
+                    @Override
+                    public void onFailure(String source, Exception e) {
+                        fail();
+                    }
+
+                    @Override
+                    public void onAckTimeout() {
+                        latch.countDown();
+                    }
+                });
 
-        @Override
-        protected long currentTimeInNanos() {
-            if (currentTimeOverride != null) {
-                return currentTimeOverride;
+                latch.await();
             }
-            return super.currentTimeInNanos();
         }
     }
 

+ 2 - 2
server/src/test/java/org/elasticsearch/snapshots/SnapshotResiliencyTests.java

@@ -22,8 +22,8 @@ package org.elasticsearch.snapshots;
 import org.apache.logging.log4j.LogManager;
 import org.apache.logging.log4j.Logger;
 import org.elasticsearch.Version;
-import org.elasticsearch.action.ActionType;
 import org.elasticsearch.action.ActionListener;
+import org.elasticsearch.action.ActionType;
 import org.elasticsearch.action.RequestValidators;
 import org.elasticsearch.action.admin.cluster.repositories.put.PutRepositoryAction;
 import org.elasticsearch.action.admin.cluster.repositories.put.TransportPutRepositoryAction;
@@ -107,6 +107,7 @@ import org.elasticsearch.cluster.routing.allocation.AllocationService;
 import org.elasticsearch.cluster.routing.allocation.command.AllocateEmptyPrimaryAllocationCommand;
 import org.elasticsearch.cluster.service.ClusterApplierService;
 import org.elasticsearch.cluster.service.ClusterService;
+import org.elasticsearch.cluster.service.FakeThreadPoolMasterService;
 import org.elasticsearch.cluster.service.MasterService;
 import org.elasticsearch.common.Nullable;
 import org.elasticsearch.common.io.stream.NamedWriteableRegistry;
@@ -134,7 +135,6 @@ import org.elasticsearch.indices.IndicesModule;
 import org.elasticsearch.indices.IndicesService;
 import org.elasticsearch.indices.analysis.AnalysisModule;
 import org.elasticsearch.indices.breaker.NoneCircuitBreakerService;
-import org.elasticsearch.indices.cluster.FakeThreadPoolMasterService;
 import org.elasticsearch.indices.cluster.IndicesClusterStateService;
 import org.elasticsearch.indices.flush.SyncedFlushService;
 import org.elasticsearch.indices.mapper.MapperRegistry;

+ 0 - 0
server/src/test/java/org/elasticsearch/action/support/replication/ClusterStateCreationUtils.java → test/framework/src/main/java/org/elasticsearch/action/support/replication/ClusterStateCreationUtils.java


+ 1 - 1
test/framework/src/main/java/org/elasticsearch/cluster/coordination/AbstractCoordinatorTestCase.java

@@ -42,6 +42,7 @@ import org.elasticsearch.cluster.node.DiscoveryNodeRole;
 import org.elasticsearch.cluster.routing.allocation.AllocationService;
 import org.elasticsearch.cluster.service.ClusterApplierService;
 import org.elasticsearch.cluster.service.ClusterService;
+import org.elasticsearch.cluster.service.FakeThreadPoolMasterService;
 import org.elasticsearch.common.Nullable;
 import org.elasticsearch.common.Randomness;
 import org.elasticsearch.common.UUIDs;
@@ -64,7 +65,6 @@ import org.elasticsearch.gateway.ClusterStateUpdaters;
 import org.elasticsearch.gateway.GatewayService;
 import org.elasticsearch.gateway.MetaStateService;
 import org.elasticsearch.gateway.MockGatewayMetaState;
-import org.elasticsearch.indices.cluster.FakeThreadPoolMasterService;
 import org.elasticsearch.node.Node;
 import org.elasticsearch.test.ESTestCase;
 import org.elasticsearch.test.disruption.DisruptableMockTransport;

+ 9 - 6
test/framework/src/main/java/org/elasticsearch/indices/cluster/FakeThreadPoolMasterService.java → test/framework/src/main/java/org/elasticsearch/cluster/service/FakeThreadPoolMasterService.java

@@ -16,7 +16,7 @@
  * specific language governing permissions and limitations
  * under the License.
  */
-package org.elasticsearch.indices.cluster;
+package org.elasticsearch.cluster.service;
 
 import org.apache.logging.log4j.LogManager;
 import org.apache.logging.log4j.Logger;
@@ -24,13 +24,14 @@ import org.elasticsearch.action.ActionListener;
 import org.elasticsearch.cluster.ClusterChangedEvent;
 import org.elasticsearch.cluster.ClusterState;
 import org.elasticsearch.cluster.coordination.ClusterStatePublisher.AckListener;
-import org.elasticsearch.cluster.service.MasterService;
 import org.elasticsearch.common.UUIDs;
+import org.elasticsearch.common.settings.ClusterSettings;
 import org.elasticsearch.common.settings.Settings;
 import org.elasticsearch.common.unit.TimeValue;
 import org.elasticsearch.common.util.concurrent.EsExecutors;
 import org.elasticsearch.common.util.concurrent.PrioritizedEsThreadPoolExecutor;
 import org.elasticsearch.common.util.concurrent.ThreadContext;
+import org.elasticsearch.node.Node;
 import org.elasticsearch.threadpool.ThreadPool;
 
 import java.util.ArrayList;
@@ -54,7 +55,9 @@ public class FakeThreadPoolMasterService extends MasterService {
     private boolean waitForPublish = false;
 
     public FakeThreadPoolMasterService(String nodeName, String serviceName, Consumer<Runnable> onTaskAvailableToRun) {
-        super(nodeName, Settings.EMPTY, createMockThreadPool());
+        super(Settings.builder().put(Node.NODE_NAME_SETTING.getKey(), nodeName).build(),
+            new ClusterSettings(Settings.EMPTY, ClusterSettings.BUILT_IN_CLUSTER_SETTINGS),
+            createMockThreadPool());
         this.name = serviceName;
         this.onTaskAvailableToRun = onTaskAvailableToRun;
     }
@@ -124,7 +127,7 @@ public class FakeThreadPoolMasterService extends MasterService {
     }
 
     @Override
-    protected void publish(ClusterChangedEvent clusterChangedEvent, TaskOutputs taskOutputs, long startTimeNS) {
+    protected void publish(ClusterChangedEvent clusterChangedEvent, TaskOutputs taskOutputs, long startTimeMillis) {
         assert waitForPublish == false;
         waitForPublish = true;
         final AckListener ackListener = taskOutputs.createAckListener(threadPool, clusterChangedEvent.state());
@@ -139,7 +142,7 @@ public class FakeThreadPoolMasterService extends MasterService {
                 assert waitForPublish;
                 waitForPublish = false;
                 try {
-                    onPublicationSuccess(clusterChangedEvent, taskOutputs, startTimeNS);
+                    onPublicationSuccess(clusterChangedEvent, taskOutputs);
                 } finally {
                     taskInProgress = false;
                     scheduleNextTaskIfNecessary();
@@ -153,7 +156,7 @@ public class FakeThreadPoolMasterService extends MasterService {
                 assert waitForPublish;
                 waitForPublish = false;
                 try {
-                    onPublicationFailed(clusterChangedEvent, taskOutputs, startTimeNS, e);
+                    onPublicationFailed(clusterChangedEvent, taskOutputs, startTimeMillis, e);
                 } finally {
                     taskInProgress = false;
                     scheduleNextTaskIfNecessary();

+ 1 - 1
server/src/test/java/org/elasticsearch/indices/cluster/FakeThreadPoolMasterServiceTests.java → test/framework/src/test/java/org/elasticsearch/cluster/service/FakeThreadPoolMasterServiceTests.java

@@ -16,7 +16,7 @@
  * specific language governing permissions and limitations
  * under the License.
  */
-package org.elasticsearch.indices.cluster;
+package org.elasticsearch.cluster.service;
 
 import org.elasticsearch.Version;
 import org.elasticsearch.action.ActionListener;