Browse Source

Tidy up ClusterApplierService (#76837)

This commit cleans up some cruft left over from older versions of the
`ClusterApplierService`:

- `UpdateTask` doesn't need to implement lots of interfaces and give
  access to its internals, it can just pass appropriate arguments to
  `runTasks()`.
- No need for the `runOnApplierThread` override with a default priority,
  just have callers be explicit about the priority.
- `submitStateUpdateTask` takes a config which never has a timeout, may
  as well just pass the priority and remove the dead code
- `SafeClusterApplyListener` doesn't need to be a
  `ClusterApplyListener`, may as well just be an `ActionListener<Void>`.
- No implementations of `ClusterApplyListener` care about the source
  argument, may as well drop it.
- Adds assertions to prevent `ClusterApplyListener` implementations from
  throwing exceptions since we just swallow them.
- No need to override getting the current time in the
  `ClusterApplierService`, we can control this from the `ThreadPool`.
David Turner 4 years ago
parent
commit
ead0020497
16 changed files with 324 additions and 267 deletions
  1. 2 2
      server/src/internalClusterTest/java/org/elasticsearch/indices/store/IndicesStoreIntegrationIT.java
  2. 9 9
      server/src/main/java/org/elasticsearch/cluster/ClusterStateTaskListener.java
  3. 5 5
      server/src/main/java/org/elasticsearch/cluster/coordination/Coordinator.java
  4. 15 6
      server/src/main/java/org/elasticsearch/cluster/service/ClusterApplier.java
  5. 126 104
      server/src/main/java/org/elasticsearch/cluster/service/ClusterApplierService.java
  6. 9 4
      server/src/main/java/org/elasticsearch/indices/store/IndicesStore.java
  7. 2 2
      server/src/test/java/org/elasticsearch/cluster/InternalClusterInfoServiceSchedulingTests.java
  8. 1 1
      server/src/test/java/org/elasticsearch/cluster/coordination/NoOpClusterApplier.java
  9. 1 1
      server/src/test/java/org/elasticsearch/cluster/health/ClusterStateHealthTests.java
  10. 8 5
      server/src/test/java/org/elasticsearch/cluster/routing/BatchedRerouteServiceTests.java
  11. 124 114
      server/src/test/java/org/elasticsearch/cluster/service/ClusterApplierServiceTests.java
  12. 3 3
      server/src/test/java/org/elasticsearch/snapshots/InternalSnapshotsInfoServiceTests.java
  13. 3 1
      test/framework/src/main/java/org/elasticsearch/cluster/coordination/AbstractCoordinatorTestCase.java
  14. 4 4
      test/framework/src/main/java/org/elasticsearch/test/ClusterServiceUtils.java
  15. 6 3
      test/framework/src/main/java/org/elasticsearch/test/disruption/BlockClusterStateProcessing.java
  16. 6 3
      test/framework/src/main/java/org/elasticsearch/test/disruption/SlowClusterStateProcessing.java

+ 2 - 2
server/src/internalClusterTest/java/org/elasticsearch/indices/store/IndicesStoreIntegrationIT.java

@@ -432,12 +432,12 @@ public class IndicesStoreIntegrationIT extends ESIntegTestCase {
         CountDownLatch latch = new CountDownLatch(1);
         clusterApplierService.onNewClusterState("test", () -> newState, new ClusterApplyListener() {
             @Override
-            public void onSuccess(String source) {
+            public void onSuccess() {
                 latch.countDown();
             }
 
             @Override
-            public void onFailure(String source, Exception e) {
+            public void onFailure(Exception e) {
                 latch.countDown();
                 throw new AssertionError("Expected a proper response", e);
             }

+ 9 - 9
server/src/main/java/org/elasticsearch/cluster/ClusterStateTaskListener.java

@@ -14,18 +14,18 @@ public interface ClusterStateTaskListener {
     /**
      * A callback for when task execution fails.
      *
-     * Implementations of this callback should not throw exceptions: an exception thrown here is logged by the master service at {@code
-     * ERROR} level and otherwise ignored. If log-and-ignore is the right behaviour then implementations should do so themselves, typically
-     * using a more specific logger and at a less dramatic log level.
+     * Implementations of this callback must not throw exceptions: an exception thrown here is logged by the master service at {@code ERROR}
+     * level and otherwise ignored, except in tests where it raises an {@link AssertionError}. If log-and-ignore is the right behaviour then
+     * implementations must do so themselves, typically using a more specific logger and at a less dramatic log level.
      */
     void onFailure(String source, Exception e);
 
     /**
      * A callback for when the task was rejected because the processing node is no longer the elected master.
      *
-     * Implementations of this callback should not throw exceptions: an exception thrown here is logged by the master service at {@code
-     * ERROR} level and otherwise ignored. If log-and-ignore is the right behaviour then implementations should do so themselves, typically
-     * using a more specific logger and at a less dramatic log level.
+     * Implementations of this callback must not throw exceptions: an exception thrown here is logged by the master service at {@code ERROR}
+     * level and otherwise ignored, except in tests where it raises an {@link AssertionError}. If log-and-ignore is the right behaviour then
+     * implementations must do so themselves, typically using a more specific logger and at a less dramatic log level.
      */
     default void onNoLongerMaster(String source) {
         onFailure(source, new NotMasterException("no longer master. source: [" + source + "]"));
@@ -35,9 +35,9 @@ public interface ClusterStateTaskListener {
      * Called when the result of the {@link ClusterStateTaskExecutor#execute(ClusterState, List)} have been processed
      * properly by all listeners.
      *
-     * Implementations of this callback should not throw exceptions: an exception thrown here is logged by the master service at {@code
-     * ERROR} level and otherwise ignored. If log-and-ignore is the right behaviour then implementations should do so themselves, typically
-     * using a more specific logger and at a less dramatic log level.
+     * Implementations of this callback must not throw exceptions: an exception thrown here is logged by the master service at {@code ERROR}
+     * level and otherwise ignored, except in tests where it raises an {@link AssertionError}. If log-and-ignore is the right behaviour then
+     * implementations must do so themselves, typically using a more specific logger and at a less dramatic log level.
      */
     default void clusterStateProcessed(String source, ClusterState oldState, ClusterState newState) {
     }

+ 5 - 5
server/src/main/java/org/elasticsearch/cluster/coordination/Coordinator.java

@@ -263,12 +263,12 @@ public class Coordinator extends AbstractLifecycleComponent implements Discovery
                     new ClusterApplyListener() {
 
                         @Override
-                        public void onFailure(String source, Exception e) {
+                        public void onFailure(Exception e) {
                             applyListener.onFailure(e);
                         }
 
                         @Override
-                        public void onSuccess(String source) {
+                        public void onSuccess() {
                             applyListener.onResponse(null);
                         }
                     });
@@ -532,7 +532,7 @@ public class Coordinator extends AbstractLifecycleComponent implements Discovery
 
             if (applierState.nodes().getMasterNodeId() != null) {
                 applierState = clusterStateWithNoMasterBlock(applierState);
-                clusterApplier.onNewClusterState("becoming candidate: " + method, () -> applierState, (source, e) -> {
+                clusterApplier.onNewClusterState("becoming candidate: " + method, () -> applierState, e -> {
                 });
             }
         }
@@ -1382,7 +1382,7 @@ public class Coordinator extends AbstractLifecycleComponent implements Discovery
                     clusterApplier.onNewClusterState(CoordinatorPublication.this.toString(), () -> applierState,
                         new ClusterApplyListener() {
                             @Override
-                            public void onFailure(String source, Exception e) {
+                            public void onFailure(Exception e) {
                                 synchronized (mutex) {
                                     removePublicationAndPossiblyBecomeCandidate("clusterApplier#onNewClusterState");
                                 }
@@ -1392,7 +1392,7 @@ public class Coordinator extends AbstractLifecycleComponent implements Discovery
                             }
 
                             @Override
-                            public void onSuccess(String source) {
+                            public void onSuccess() {
                                 clusterStatePublicationEvent.setMasterApplyElapsedMillis(
                                     transportService.getThreadPool().rawRelativeTimeInMillis() - completionTimeMillis);
                                 synchronized (mutex) {

+ 15 - 6
server/src/main/java/org/elasticsearch/cluster/service/ClusterApplier.java

@@ -33,17 +33,26 @@ public interface ClusterApplier {
      */
     interface ClusterApplyListener {
         /**
-         * Called on successful cluster state application
-         * @param source information where the cluster state came from
+         * Called on successful cluster state application.
+         *
+         * Implementations of this callback must not throw exceptions: an exception thrown here is logged by the cluster applier service at
+         * {@code ERROR} level and otherwise ignored, except in tests where it raises an {@link AssertionError}. If log-and-ignore is the
+         * right behaviour then implementations must do so themselves, typically using a more specific logger and at a less dramatic log
+         * level.
          */
-        default void onSuccess(String source) {
+        default void onSuccess() {
         }
 
         /**
-         * Called on failure during cluster state application
-         * @param source information where the cluster state came from
+         * Called on failure during cluster state application.
+         *
+         * Implementations of this callback must not throw exceptions: an exception thrown here is logged by the cluster applier service at
+         * {@code ERROR} level and otherwise ignored, except in tests where it raises an {@link AssertionError}. If log-and-ignore is the
+         * right behaviour then implementations must do so themselves, typically using a more specific logger and at a less dramatic log
+         * level.
+         *
          * @param e exception that occurred
          */
-        void onFailure(String source, Exception e);
+        void onFailure(Exception e);
     }
 }

+ 126 - 104
server/src/main/java/org/elasticsearch/cluster/service/ClusterApplierService.java

@@ -11,30 +11,30 @@ package org.elasticsearch.cluster.service;
 import org.apache.logging.log4j.LogManager;
 import org.apache.logging.log4j.Logger;
 import org.apache.logging.log4j.message.ParameterizedMessage;
+import org.elasticsearch.action.ActionListener;
 import org.elasticsearch.cluster.ClusterChangedEvent;
 import org.elasticsearch.cluster.ClusterState;
 import org.elasticsearch.cluster.ClusterStateApplier;
 import org.elasticsearch.cluster.ClusterStateListener;
 import org.elasticsearch.cluster.ClusterStateObserver;
-import org.elasticsearch.cluster.ClusterStateTaskConfig;
 import org.elasticsearch.cluster.LocalNodeMasterListener;
 import org.elasticsearch.cluster.NodeConnectionsService;
 import org.elasticsearch.cluster.TimeoutClusterStateListener;
-import org.elasticsearch.cluster.metadata.ProcessClusterEventTimeoutException;
 import org.elasticsearch.cluster.node.DiscoveryNodes;
-import org.elasticsearch.core.Nullable;
 import org.elasticsearch.common.Priority;
 import org.elasticsearch.common.StopWatch;
 import org.elasticsearch.common.component.AbstractLifecycleComponent;
-import org.elasticsearch.core.Releasable;
 import org.elasticsearch.common.settings.ClusterSettings;
 import org.elasticsearch.common.settings.Setting;
 import org.elasticsearch.common.settings.Settings;
-import org.elasticsearch.core.TimeValue;
 import org.elasticsearch.common.util.concurrent.EsExecutors;
 import org.elasticsearch.common.util.concurrent.EsRejectedExecutionException;
 import org.elasticsearch.common.util.concurrent.PrioritizedEsThreadPoolExecutor;
 import org.elasticsearch.common.util.concurrent.ThreadContext;
+import org.elasticsearch.core.Nullable;
+import org.elasticsearch.core.Releasable;
+import org.elasticsearch.core.TimeValue;
+import org.elasticsearch.indices.store.IndicesStore;
 import org.elasticsearch.threadpool.Scheduler;
 import org.elasticsearch.threadpool.ThreadPool;
 
@@ -64,7 +64,7 @@ public class ClusterApplierService extends AbstractLifecycleComponent implements
     public static final String CLUSTER_UPDATE_THREAD_NAME = "clusterApplierService#updateTask";
 
     private final ClusterSettings clusterSettings;
-    protected final ThreadPool threadPool;
+    private final ThreadPool threadPool;
 
     private volatile TimeValue slowTaskLoggingThreshold;
 
@@ -131,25 +131,24 @@ public class ClusterApplierService extends AbstractLifecycleComponent implements
             PrioritizedEsThreadPoolExecutor.StarvationWatcher.NOOP_STARVATION_WATCHER);
     }
 
-    class UpdateTask extends SourcePrioritizedRunnable implements Function<ClusterState, ClusterState> {
-        final ClusterApplyListener listener;
-        final Function<ClusterState, ClusterState> updateFunction;
+    class UpdateTask extends SourcePrioritizedRunnable {
+        private final ActionListener<Void> listener;
+        private final Function<ClusterState, ClusterState> updateFunction;
 
-        UpdateTask(Priority priority, String source, ClusterApplyListener listener,
-                   Function<ClusterState, ClusterState> updateFunction) {
+        UpdateTask(
+            Priority priority,
+            String source,
+            ActionListener<Void> listener,
+            Function<ClusterState, ClusterState> updateFunction
+        ) {
             super(priority, source);
             this.listener = listener;
             this.updateFunction = updateFunction;
         }
 
-        @Override
-        public ClusterState apply(ClusterState clusterState) {
-            return updateFunction.apply(clusterState);
-        }
-
         @Override
         public void run() {
-            runTask(this);
+            runTask(source(), updateFunction, listener);
         }
     }
 
@@ -175,7 +174,7 @@ public class ClusterApplierService extends AbstractLifecycleComponent implements
      * Should be renamed to appliedClusterState
      */
     public ClusterState state() {
-        assert assertNotCalledFromClusterStateApplier("the applied cluster state is not yet available");
+        assert assertNotCalledFromClusterStateApplier();
         ClusterState clusterState = this.state.get();
         assert clusterState != null : "initial cluster state not set yet";
         return clusterState;
@@ -280,9 +279,22 @@ public class ClusterApplierService extends AbstractLifecycleComponent implements
         }
     }
 
-    public void runOnApplierThread(final String source, Consumer<ClusterState> clusterStateConsumer,
-                                   final ClusterApplyListener listener, Priority priority) {
-        submitStateUpdateTask(source, ClusterStateTaskConfig.build(priority),
+    /**
+     * Run the given clusterStateConsumer on the applier thread. Should only be used in tests and by {@link IndicesStore} when it's deleting
+     * the data behind a shard that moved away from a node.
+     *
+     * @param priority              {@link Priority#HIGH} unless in tests.
+     */
+    // TODO get rid of this, make it so that shard data can be deleted without blocking the applier thread.
+    public void runOnApplierThread(
+        String source,
+        Priority priority,
+        Consumer<ClusterState> clusterStateConsumer,
+        ClusterApplyListener listener
+    ) {
+        submitStateUpdateTask(
+            source,
+            priority,
             (clusterState) -> {
                 clusterStateConsumer.accept(clusterState);
                 return clusterState;
@@ -290,51 +302,52 @@ public class ClusterApplierService extends AbstractLifecycleComponent implements
             listener);
     }
 
-    public void runOnApplierThread(final String source, Consumer<ClusterState> clusterStateConsumer,
-                                   final ClusterApplyListener listener) {
-        runOnApplierThread(source, clusterStateConsumer, listener, Priority.HIGH);
-    }
-
     public ThreadPool threadPool() {
         return threadPool;
     }
 
     @Override
-    public void onNewClusterState(final String source, final Supplier<ClusterState> clusterStateSupplier,
-                                  final ClusterApplyListener listener) {
-        Function<ClusterState, ClusterState> applyFunction = currentState -> {
-            ClusterState nextState = clusterStateSupplier.get();
-            if (nextState != null) {
-                return nextState;
-            } else {
-                return currentState;
-            }
-        };
-        submitStateUpdateTask(source, ClusterStateTaskConfig.build(Priority.HIGH), applyFunction, listener);
+    public void onNewClusterState(
+        final String source,
+        final Supplier<ClusterState> clusterStateSupplier,
+        final ClusterApplyListener listener
+    ) {
+        submitStateUpdateTask(
+            source,
+            Priority.HIGH,
+            currentState -> {
+                ClusterState nextState = clusterStateSupplier.get();
+                if (nextState != null) {
+                    return nextState;
+                } else {
+                    return currentState;
+                }
+            }, listener);
     }
 
-    private void submitStateUpdateTask(final String source, final ClusterStateTaskConfig config,
-                                       final Function<ClusterState, ClusterState> executor,
-                                       final ClusterApplyListener listener) {
+    private void submitStateUpdateTask(
+        final String source,
+        final Priority priority,
+        final Function<ClusterState, ClusterState> clusterStateUpdate,
+        final ClusterApplyListener listener
+    ) {
         if (lifecycle.started() == false) {
             return;
         }
+
         final ThreadContext threadContext = threadPool.getThreadContext();
-        final Supplier<ThreadContext.StoredContext> supplier = threadContext.newRestorableContext(true);
+        final Supplier<ThreadContext.StoredContext> storedContextSupplier = threadContext.newRestorableContext(true);
+
         try (ThreadContext.StoredContext ignore = threadContext.stashContext()) {
             threadContext.markAsSystemContext();
-            final UpdateTask updateTask = new UpdateTask(config.priority(), source,
-                new SafeClusterApplyListener(listener, supplier, logger), executor);
-            if (config.timeout() != null) {
-                threadPoolExecutor.execute(updateTask, config.timeout(),
-                    () -> threadPool.generic().execute(
-                        () -> listener.onFailure(source, new ProcessClusterEventTimeoutException(config.timeout(), source))));
-            } else {
-                threadPoolExecutor.execute(updateTask);
-            }
+            threadPoolExecutor.execute(new UpdateTask(
+                priority,
+                source,
+                new ClusterApplyActionListener(source, listener, storedContextSupplier),
+                clusterStateUpdate));
         } catch (EsRejectedExecutionException e) {
-            // ignore cases where we are shutting down..., there is really nothing interesting
-            // to be done here...
+            assert lifecycle.stoppedOrClosed() : e;
+            // ignore cases where we are shutting down..., there is really nothing interesting to be done here...
             if (lifecycle.stoppedOrClosed() == false) {
                 throw e;
             }
@@ -349,7 +362,7 @@ public class ClusterApplierService extends AbstractLifecycleComponent implements
     }
 
     /** asserts that the current stack trace does <b>NOT</b> involve a cluster state applier */
-    private static boolean assertNotCalledFromClusterStateApplier(String reason) {
+    private static boolean assertNotCalledFromClusterStateApplier() {
         if (Thread.currentThread().getName().contains(CLUSTER_UPDATE_THREAD_NAME)) {
             for (StackTraceElement element : Thread.currentThread().getStackTrace()) {
                 final String className = element.getClassName();
@@ -359,87 +372,95 @@ public class ClusterApplierService extends AbstractLifecycleComponent implements
                     return true;
                 } else if (className.equals(ClusterApplierService.class.getName())
                     && methodName.equals("callClusterStateAppliers")) {
-                    throw new AssertionError("should not be called by a cluster state applier. reason [" + reason + "]");
+                    throw new AssertionError("should not be called by a cluster state applier: the applied state is not yet available");
                 }
             }
         }
         return true;
     }
 
-    private void runTask(UpdateTask task) {
+    private void runTask(String source, Function<ClusterState, ClusterState> updateFunction, ActionListener<Void> clusterApplyListener) {
         if (lifecycle.started() == false) {
-            logger.debug("processing [{}]: ignoring, cluster applier service not started", task.source);
+            logger.debug("processing [{}]: ignoring, cluster applier service not started", source);
             return;
         }
 
-        logger.debug("processing [{}]: execute", task.source);
+        logger.debug("processing [{}]: execute", source);
         final ClusterState previousClusterState = state.get();
 
-        long startTimeMS = currentTimeInMillis();
+        final long startTimeMillis = threadPool.relativeTimeInMillis();
         final StopWatch stopWatch = new StopWatch();
         final ClusterState newClusterState;
         try {
-            try (Releasable ignored = stopWatch.timing("running task [" + task.source + ']')) {
-                newClusterState = task.apply(previousClusterState);
+            try (Releasable ignored = stopWatch.timing("running task [" + source + ']')) {
+                newClusterState = updateFunction.apply(previousClusterState);
             }
         } catch (Exception e) {
-            TimeValue executionTime = TimeValue.timeValueMillis(Math.max(0, currentTimeInMillis() - startTimeMS));
+            TimeValue executionTime = getTimeSince(startTimeMillis);
             logger.trace(() -> new ParameterizedMessage(
                 "failed to execute cluster state applier in [{}], state:\nversion [{}], source [{}]\n{}",
-                executionTime, previousClusterState.version(), task.source, previousClusterState), e);
-            warnAboutSlowTaskIfNeeded(executionTime, task.source, stopWatch);
-            task.listener.onFailure(task.source, e);
+                executionTime, previousClusterState.version(), source, previousClusterState), e);
+            warnAboutSlowTaskIfNeeded(executionTime, source, stopWatch);
+            clusterApplyListener.onFailure(e);
             return;
         }
 
         if (previousClusterState == newClusterState) {
-            TimeValue executionTime = TimeValue.timeValueMillis(Math.max(0, currentTimeInMillis() - startTimeMS));
-            logger.debug("processing [{}]: took [{}] no change in cluster state", task.source, executionTime);
-            warnAboutSlowTaskIfNeeded(executionTime, task.source, stopWatch);
-            task.listener.onSuccess(task.source);
+            TimeValue executionTime = getTimeSince(startTimeMillis);
+            logger.debug("processing [{}]: took [{}] no change in cluster state", source, executionTime);
+            warnAboutSlowTaskIfNeeded(executionTime, source, stopWatch);
+            clusterApplyListener.onResponse(null);
         } else {
             if (logger.isTraceEnabled()) {
-                logger.debug("cluster state updated, version [{}], source [{}]\n{}", newClusterState.version(), task.source,
+                logger.debug("cluster state updated, version [{}], source [{}]\n{}", newClusterState.version(), source,
                     newClusterState);
             } else {
-                logger.debug("cluster state updated, version [{}], source [{}]", newClusterState.version(), task.source);
+                logger.debug("cluster state updated, version [{}], source [{}]", newClusterState.version(), source);
             }
             try {
-                applyChanges(task, previousClusterState, newClusterState, stopWatch);
-                TimeValue executionTime = TimeValue.timeValueMillis(Math.max(0, currentTimeInMillis() - startTimeMS));
-                logger.debug("processing [{}]: took [{}] done applying updated cluster state (version: {}, uuid: {})", task.source,
+                applyChanges(previousClusterState, newClusterState, source, stopWatch);
+                TimeValue executionTime = getTimeSince(startTimeMillis);
+                logger.debug("processing [{}]: took [{}] done applying updated cluster state (version: {}, uuid: {})", source,
                     executionTime, newClusterState.version(),
                     newClusterState.stateUUID());
-                warnAboutSlowTaskIfNeeded(executionTime, task.source, stopWatch);
-                task.listener.onSuccess(task.source);
+                warnAboutSlowTaskIfNeeded(executionTime, source, stopWatch);
+                clusterApplyListener.onResponse(null);
             } catch (Exception e) {
-                TimeValue executionTime = TimeValue.timeValueMillis(Math.max(0, currentTimeInMillis() - startTimeMS));
+                TimeValue executionTime = getTimeSince(startTimeMillis);
                 if (logger.isTraceEnabled()) {
                     logger.warn(new ParameterizedMessage(
-                            "failed to apply updated cluster state in [{}]:\nversion [{}], uuid [{}], source [{}]\n{}",
-                            executionTime, newClusterState.version(), newClusterState.stateUUID(), task.source, newClusterState), e);
+                        "failed to apply updated cluster state in [{}]:\nversion [{}], uuid [{}], source [{}]\n{}",
+                        executionTime, newClusterState.version(), newClusterState.stateUUID(), source, newClusterState), e);
                 } else {
                     logger.warn(new ParameterizedMessage(
-                            "failed to apply updated cluster state in [{}]:\nversion [{}], uuid [{}], source [{}]",
-                            executionTime, newClusterState.version(), newClusterState.stateUUID(), task.source), e);
+                        "failed to apply updated cluster state in [{}]:\nversion [{}], uuid [{}], source [{}]",
+                        executionTime, newClusterState.version(), newClusterState.stateUUID(), source), e);
                 }
                 // failing to apply a cluster state with an exception indicates a bug in validation or in one of the appliers; if we
                 // continue we will retry with the same cluster state but that might not help.
                 assert applicationMayFail();
-                task.listener.onFailure(task.source, e);
+                clusterApplyListener.onFailure(e);
             }
         }
     }
 
-    private void applyChanges(UpdateTask task, ClusterState previousClusterState, ClusterState newClusterState, StopWatch stopWatch) {
-        ClusterChangedEvent clusterChangedEvent = new ClusterChangedEvent(task.source, newClusterState, previousClusterState);
+    private TimeValue getTimeSince(long startTimeMillis) {
+        return TimeValue.timeValueMillis(Math.max(0, threadPool.relativeTimeInMillis() - startTimeMillis));
+    }
+
+    private void applyChanges(ClusterState previousClusterState, ClusterState newClusterState, String source, StopWatch stopWatch) {
+        ClusterChangedEvent clusterChangedEvent = new ClusterChangedEvent(source, newClusterState, previousClusterState);
         // new cluster state, notify all listeners
         final DiscoveryNodes.Delta nodesDelta = clusterChangedEvent.nodesDelta();
         if (nodesDelta.hasChanges() && logger.isInfoEnabled()) {
             String summary = nodesDelta.shortSummary();
             if (summary.length() > 0) {
-                logger.info("{}, term: {}, version: {}, reason: {}",
-                    summary, newClusterState.term(), newClusterState.version(), task.source);
+                logger.info(
+                    "{}, term: {}, version: {}, reason: {}",
+                    summary,
+                    newClusterState.term(),
+                    newClusterState.version(),
+                    source);
             }
         }
 
@@ -515,33 +536,39 @@ public class ClusterApplierService extends AbstractLifecycleComponent implements
         }
     }
 
-    private static class SafeClusterApplyListener implements ClusterApplyListener {
+    private static class ClusterApplyActionListener implements ActionListener<Void> {
+        private final String source;
         private final ClusterApplyListener listener;
-        protected final Supplier<ThreadContext.StoredContext> context;
-        private final Logger logger;
-
-        SafeClusterApplyListener(ClusterApplyListener listener, Supplier<ThreadContext.StoredContext> context, Logger logger) {
+        private final Supplier<ThreadContext.StoredContext> storedContextSupplier;
+
+        ClusterApplyActionListener(
+            String source,
+            ClusterApplyListener listener,
+            Supplier<ThreadContext.StoredContext> storedContextSupplier
+        ) {
+            this.source = source;
             this.listener = listener;
-            this.context = context;
-            this.logger = logger;
+            this.storedContextSupplier = storedContextSupplier;
         }
 
         @Override
-        public void onFailure(String source, Exception e) {
-            try (ThreadContext.StoredContext ignore = context.get()) {
-                listener.onFailure(source, e);
+        public void onFailure(Exception e) {
+            try (ThreadContext.StoredContext ignored = storedContextSupplier.get()) {
+                listener.onFailure(e);
             } catch (Exception inner) {
                 inner.addSuppressed(e);
+                assert false : inner;
                 logger.error(new ParameterizedMessage(
                         "exception thrown by listener notifying of failure from [{}]", source), inner);
             }
         }
 
         @Override
-        public void onSuccess(String source) {
-            try (ThreadContext.StoredContext ignore = context.get()) {
-                listener.onSuccess(source);
+        public void onResponse(Void unused) {
+            try (ThreadContext.StoredContext ignored = storedContextSupplier.get()) {
+                listener.onSuccess();
             } catch (Exception e) {
+                assert false : e;
                 logger.error(new ParameterizedMessage(
                     "exception thrown by listener while notifying of cluster state processed from [{}]", source), e);
             }
@@ -588,12 +615,7 @@ public class ClusterApplierService extends AbstractLifecycleComponent implements
         }
     }
 
-    // this one is overridden in tests so we can control time
-    protected long currentTimeInMillis() {
-        return threadPool.relativeTimeInMillis();
-    }
-
-    // overridden by tests that need to check behaviour in the event of an application failure
+    // overridden by tests that need to check behaviour in the event of an application failure without tripping assertions
     protected boolean applicationMayFail() {
         return false;
     }

+ 9 - 4
server/src/main/java/org/elasticsearch/indices/store/IndicesStore.java

@@ -24,6 +24,7 @@ import org.elasticsearch.cluster.routing.RoutingNode;
 import org.elasticsearch.cluster.routing.RoutingTable;
 import org.elasticsearch.cluster.routing.ShardRouting;
 import org.elasticsearch.cluster.service.ClusterService;
+import org.elasticsearch.common.Priority;
 import org.elasticsearch.core.Tuple;
 import org.elasticsearch.common.inject.Inject;
 import org.elasticsearch.common.io.stream.StreamInput;
@@ -263,7 +264,9 @@ public class IndicesStore implements ClusterStateListener, Closeable {
                 return;
             }
 
-            clusterService.getClusterApplierService().runOnApplierThread("indices_store ([" + shardId + "] active fully on other nodes)",
+            clusterService.getClusterApplierService().runOnApplierThread(
+                "indices_store ([" + shardId + "] active fully on other nodes)",
+                Priority.HIGH,
                 currentState -> {
                     if (clusterStateVersion != currentState.getVersion()) {
                         logger.trace("not deleting shard {}, the update task state version[{}] is not equal to cluster state before " +
@@ -276,9 +279,11 @@ public class IndicesStore implements ClusterStateListener, Closeable {
                         logger.debug(() -> new ParameterizedMessage("{} failed to delete unallocated shard, ignoring", shardId), ex);
                     }
                 },
-                (source, e) -> logger.error(() -> new ParameterizedMessage("{} unexpected error during deletion of unallocated shard",
-                    shardId), e)
-            );
+                e -> logger.error(
+                    () -> new ParameterizedMessage(
+                        "{} unexpected error during deletion of unallocated shard",
+                        shardId),
+                    e));
         }
 
     }

+ 2 - 2
server/src/test/java/org/elasticsearch/cluster/InternalClusterInfoServiceSchedulingTests.java

@@ -141,12 +141,12 @@ public class InternalClusterInfoServiceSchedulingTests extends ESTestCase {
         return new ClusterApplier.ClusterApplyListener() {
 
             @Override
-            public void onSuccess(String source) {
+            public void onSuccess() {
                 assertTrue(flag.compareAndSet(false, true));
             }
 
             @Override
-            public void onFailure(String source, Exception e) {
+            public void onFailure(Exception e) {
                 throw new AssertionError("unexpected", e);
             }
         };

+ 1 - 1
server/src/test/java/org/elasticsearch/cluster/coordination/NoOpClusterApplier.java

@@ -20,6 +20,6 @@ public class NoOpClusterApplier implements ClusterApplier {
 
     @Override
     public void onNewClusterState(String source, Supplier<ClusterState> clusterStateSupplier, ClusterApplyListener listener) {
-        listener.onSuccess(source);
+        listener.onSuccess();
     }
 }

+ 1 - 1
server/src/test/java/org/elasticsearch/cluster/health/ClusterStateHealthTests.java

@@ -127,7 +127,7 @@ public class ClusterStateHealthTests extends ESTestCase {
         clusterService.getClusterApplierService().onNewClusterState("restore master",
             () -> ClusterState.builder(currentState)
                 .nodes(DiscoveryNodes.builder(currentState.nodes()).masterNodeId(currentState.nodes().getLocalNodeId())).build(),
-            (source, e) -> {});
+            e -> {});
 
         logger.info("--> waiting for listener to be called and cluster state being blocked");
         listenerCalled.await();

+ 8 - 5
server/src/test/java/org/elasticsearch/cluster/routing/BatchedRerouteServiceTests.java

@@ -204,11 +204,14 @@ public class BatchedRerouteServiceTests extends ESTestCase {
             }
 
             if (rarely()) {
-                clusterService.getClusterApplierService().onNewClusterState("simulated", () -> {
-                    ClusterState state = clusterService.state();
-                    return ClusterState.builder(state).nodes(DiscoveryNodes.builder(state.nodes())
-                        .masterNodeId(randomBoolean() ? null : state.nodes().getLocalNodeId())).build();
-                }, (source, e) -> { });
+                clusterService.getClusterApplierService().onNewClusterState(
+                    "simulated",
+                    () -> {
+                        ClusterState state = clusterService.state();
+                        return ClusterState.builder(state).nodes(DiscoveryNodes.builder(state.nodes())
+                            .masterNodeId(randomBoolean() ? null : state.nodes().getLocalNodeId())).build();
+                    },
+                    e -> { });
             }
         }
 

+ 124 - 114
server/src/test/java/org/elasticsearch/cluster/service/ClusterApplierServiceTests.java

@@ -8,6 +8,7 @@
 
 package org.elasticsearch.cluster.service;
 
+
 import org.apache.logging.log4j.Level;
 import org.apache.logging.log4j.LogManager;
 import org.apache.logging.log4j.Logger;
@@ -23,26 +24,25 @@ import org.elasticsearch.cluster.node.DiscoveryNode;
 import org.elasticsearch.cluster.node.DiscoveryNodes;
 import org.elasticsearch.cluster.routing.allocation.decider.EnableAllocationDecider;
 import org.elasticsearch.cluster.service.ClusterApplier.ClusterApplyListener;
+import org.elasticsearch.common.Priority;
 import org.elasticsearch.common.logging.Loggers;
 import org.elasticsearch.common.settings.ClusterSettings;
 import org.elasticsearch.common.settings.Settings;
-import org.elasticsearch.core.TimeValue;
 import org.elasticsearch.common.util.concurrent.ThreadContext;
+import org.elasticsearch.core.TimeValue;
 import org.elasticsearch.test.ESTestCase;
 import org.elasticsearch.test.MockLogAppender;
 import org.elasticsearch.test.junit.annotations.TestLogging;
 import org.elasticsearch.threadpool.TestThreadPool;
 import org.elasticsearch.threadpool.ThreadPool;
 import org.junit.After;
-import org.junit.AfterClass;
 import org.junit.Before;
-import org.junit.BeforeClass;
 
 import java.util.Collections;
 import java.util.List;
 import java.util.Map;
-import java.util.Objects;
 import java.util.concurrent.CountDownLatch;
+import java.util.concurrent.TimeUnit;
 import java.util.concurrent.atomic.AtomicBoolean;
 import java.util.concurrent.atomic.AtomicReference;
 
@@ -55,49 +55,64 @@ import static org.hamcrest.Matchers.is;
 
 public class ClusterApplierServiceTests extends ESTestCase {
 
-    private static ThreadPool threadPool;
-    private TimedClusterApplierService clusterApplierService;
-
-    @BeforeClass
-    public static void createThreadPool() {
-        threadPool = new TestThreadPool(ClusterApplierServiceTests.class.getName());
-    }
-
-    @AfterClass
-    public static void stopThreadPool() {
-        if (threadPool != null) {
-            threadPool.shutdownNow();
-            threadPool = null;
-        }
-    }
+    private ThreadPool threadPool;
+    private long currentTimeMillis;
+    private boolean allowClusterStateApplicationFailure = false;
+    private ClusterApplierService clusterApplierService;
+    private ClusterSettings clusterSettings;
 
     @Before
     public void setUp() throws Exception {
         super.setUp();
-        clusterApplierService = createTimedClusterService(true);
+        threadPool = new TestThreadPool(ClusterApplierServiceTests.class.getName()) {
+            @Override
+            public long relativeTimeInMillis() {
+                assertThat(Thread.currentThread().getName(), containsString(ClusterApplierService.CLUSTER_UPDATE_THREAD_NAME));
+                return currentTimeMillis;
+            }
+        };
+        clusterSettings = new ClusterSettings(Settings.EMPTY, ClusterSettings.BUILT_IN_CLUSTER_SETTINGS);
+        allowClusterStateApplicationFailure = false;
+        clusterApplierService = createClusterApplierService(true);
     }
 
     @After
     public void tearDown() throws Exception {
         clusterApplierService.close();
+        if (threadPool != null) {
+            ThreadPool.terminate(threadPool, 30, TimeUnit.SECONDS);
+        }
         super.tearDown();
     }
 
-    private TimedClusterApplierService createTimedClusterService(boolean makeMaster) {
-        DiscoveryNode localNode = new DiscoveryNode("node1", buildNewFakeTransportAddress(), emptyMap(),
-            emptySet(), Version.CURRENT);
-        TimedClusterApplierService timedClusterApplierService = new TimedClusterApplierService(Settings.builder().put("cluster.name",
-            "ClusterApplierServiceTests").build(), new ClusterSettings(Settings.EMPTY, ClusterSettings.BUILT_IN_CLUSTER_SETTINGS),
-            threadPool);
-        timedClusterApplierService.setNodeConnectionsService(createNoOpNodeConnectionsService());
-        timedClusterApplierService.setInitialState(ClusterState.builder(new ClusterName("ClusterApplierServiceTests"))
+    private ClusterApplierService createClusterApplierService(boolean makeMaster) {
+        final DiscoveryNode localNode = new DiscoveryNode("node1", buildNewFakeTransportAddress(), emptyMap(), emptySet(), Version.CURRENT);
+        final ClusterApplierService clusterApplierService = new ClusterApplierService(
+            "test_node",
+            Settings.builder().put("cluster.name", "ClusterApplierServiceTests").build(),
+            clusterSettings,
+            threadPool
+        ) {
+            @Override
+            protected boolean applicationMayFail() {
+                return allowClusterStateApplicationFailure;
+            }
+        };
+        clusterApplierService.setNodeConnectionsService(createNoOpNodeConnectionsService());
+        clusterApplierService.setInitialState(ClusterState.builder(new ClusterName("ClusterApplierServiceTests"))
             .nodes(DiscoveryNodes.builder()
                 .add(localNode)
                 .localNodeId(localNode.getId())
                 .masterNodeId(makeMaster ? localNode.getId() : null))
             .blocks(ClusterBlocks.EMPTY_CLUSTER_BLOCK).build());
-        timedClusterApplierService.start();
-        return timedClusterApplierService;
+        clusterApplierService.start();
+        return clusterApplierService;
+    }
+
+    private void advanceTime(long millis) {
+        // time is only read/written on applier thread, so no synchronization is needed
+        assertThat(Thread.currentThread().getName(), containsString(ClusterApplierService.CLUSTER_UPDATE_THREAD_NAME));
+        currentTimeMillis += millis;
     }
 
     @TestLogging(value = "org.elasticsearch.cluster.service:TRACE", reason = "to ensure that we log cluster state events on TRACE level")
@@ -126,44 +141,54 @@ public class ClusterApplierServiceTests extends ESTestCase {
         Logger clusterLogger = LogManager.getLogger(ClusterApplierService.class);
         Loggers.addAppender(clusterLogger, mockAppender);
         try {
-            clusterApplierService.currentTimeOverride = threadPool.relativeTimeInMillis();
-            clusterApplierService.runOnApplierThread("test1",
-                currentState -> clusterApplierService.currentTimeOverride += TimeValue.timeValueSeconds(1).millis(),
+            currentTimeMillis = randomLongBetween(0L, Long.MAX_VALUE / 2);
+            clusterApplierService.runOnApplierThread(
+                "test1",
+                Priority.HIGH,
+                currentState -> advanceTime(TimeValue.timeValueSeconds(1).millis()),
                 new ClusterApplyListener() {
                     @Override
-                    public void onSuccess(String source) { }
+                    public void onSuccess() {
+                    }
 
                     @Override
-                    public void onFailure(String source, Exception e) {
+                    public void onFailure(Exception e) {
                         fail();
                     }
-            });
-            clusterApplierService.runOnApplierThread("test2",
+                }
+            );
+            clusterApplierService.runOnApplierThread(
+                "test2",
+                Priority.HIGH,
                 currentState -> {
-                    clusterApplierService.currentTimeOverride += TimeValue.timeValueSeconds(2).millis();
+                    advanceTime(TimeValue.timeValueSeconds(2).millis());
                     throw new IllegalArgumentException("Testing handling of exceptions in the cluster state task");
                 },
                 new ClusterApplyListener() {
                     @Override
-                    public void onSuccess(String source) {
+                    public void onSuccess() {
                         fail();
                     }
 
                     @Override
-                    public void onFailure(String source, Exception e) { }
-                });
+                    public void onFailure(Exception e) { }
+                }
+            );
             // Additional update task to make sure all previous logging made it to the loggerName
-            clusterApplierService.runOnApplierThread("test3",
+            clusterApplierService.runOnApplierThread(
+                "test3",
+                Priority.HIGH,
                 currentState -> {},
                 new ClusterApplyListener() {
                     @Override
-                    public void onSuccess(String source) { }
+                    public void onSuccess() { }
 
                     @Override
-                    public void onFailure(String source, Exception e) {
+                    public void onFailure(Exception e) {
                         fail();
                     }
-                });
+                }
+            );
             assertBusy(mockAppender::assertAllExpectationsMatched);
         } finally {
             Loggers.removeAppender(clusterLogger, mockAppender);
@@ -201,66 +226,78 @@ public class ClusterApplierServiceTests extends ESTestCase {
         try {
             final CountDownLatch latch = new CountDownLatch(4);
             final CountDownLatch processedFirstTask = new CountDownLatch(1);
-            clusterApplierService.currentTimeOverride = threadPool.relativeTimeInMillis();
-            clusterApplierService.runOnApplierThread("test1",
-                currentState -> clusterApplierService.currentTimeOverride += TimeValue.timeValueSeconds(1).millis(),
+            currentTimeMillis = randomLongBetween(0L, Long.MAX_VALUE / 2);
+            clusterApplierService.runOnApplierThread(
+                "test1",
+                Priority.HIGH,
+                currentState -> advanceTime(TimeValue.timeValueSeconds(1).millis()),
                 new ClusterApplyListener() {
                     @Override
-                    public void onSuccess(String source) {
+                    public void onSuccess() {
                         latch.countDown();
                         processedFirstTask.countDown();
                     }
 
                     @Override
-                    public void onFailure(String source, Exception e) {
+                    public void onFailure(Exception e) {
                         fail();
                     }
-                });
+                }
+            );
             processedFirstTask.await();
-            clusterApplierService.runOnApplierThread("test2",
+            clusterApplierService.runOnApplierThread(
+                "test2",
+                Priority.HIGH,
                 currentState -> {
-                    clusterApplierService.currentTimeOverride += TimeValue.timeValueSeconds(32).millis();
+                    advanceTime(TimeValue.timeValueSeconds(32).millis());
                     throw new IllegalArgumentException("Testing handling of exceptions in the cluster state task");
                 },
                 new ClusterApplyListener() {
                     @Override
-                    public void onSuccess(String source) {
+                    public void onSuccess() {
                         fail();
                     }
 
                     @Override
-                    public void onFailure(String source, Exception e) {
+                    public void onFailure(Exception e) {
                         latch.countDown();
                     }
-                });
-            clusterApplierService.runOnApplierThread("test3",
-                currentState -> clusterApplierService.currentTimeOverride += TimeValue.timeValueSeconds(34).millis(),
+                }
+            );
+            clusterApplierService.runOnApplierThread(
+                "test3",
+                Priority.HIGH,
+                currentState -> advanceTime(TimeValue.timeValueSeconds(34).millis()),
                 new ClusterApplyListener() {
                     @Override
-                    public void onSuccess(String source) {
+                    public void onSuccess() {
                         latch.countDown();
                     }
 
                     @Override
-                    public void onFailure(String source, Exception e) {
+                    public void onFailure(Exception e) {
                         fail();
                     }
-                });
+                }
+            );
             // Additional update task to make sure all previous logging made it to the loggerName
             // We don't check logging for this on since there is no guarantee that it will occur before our check
-            clusterApplierService.runOnApplierThread("test4",
+            clusterApplierService.runOnApplierThread(
+                "test4",
+                Priority.HIGH,
                 currentState -> {},
                 new ClusterApplyListener() {
                     @Override
-                    public void onSuccess(String source) {
+                    public void onSuccess() {
                         latch.countDown();
                     }
 
                     @Override
-                    public void onFailure(String source, Exception e) {
+                    public void onFailure(Exception e) {
                         fail();
                     }
-                });
+                }
+            );
             latch.await();
         } finally {
             Loggers.removeAppender(clusterLogger, mockAppender);
@@ -270,10 +307,10 @@ public class ClusterApplierServiceTests extends ESTestCase {
     }
 
     public void testLocalNodeMasterListenerCallbacks() {
-        TimedClusterApplierService timedClusterApplierService = createTimedClusterService(false);
+        ClusterApplierService clusterApplierService = createClusterApplierService(false);
 
         AtomicBoolean isMaster = new AtomicBoolean();
-        timedClusterApplierService.addLocalNodeMasterListener(new LocalNodeMasterListener() {
+        clusterApplierService.addLocalNodeMasterListener(new LocalNodeMasterListener() {
             @Override
             public void onMaster() {
                 isMaster.set(true);
@@ -285,25 +322,25 @@ public class ClusterApplierServiceTests extends ESTestCase {
             }
         });
 
-        ClusterState state = timedClusterApplierService.state();
+        ClusterState state = clusterApplierService.state();
         DiscoveryNodes nodes = state.nodes();
         DiscoveryNodes.Builder nodesBuilder = DiscoveryNodes.builder(nodes).masterNodeId(nodes.getLocalNodeId());
         state = ClusterState.builder(state).blocks(ClusterBlocks.EMPTY_CLUSTER_BLOCK).nodes(nodesBuilder).build();
-        setState(timedClusterApplierService, state);
+        setState(clusterApplierService, state);
         assertThat(isMaster.get(), is(true));
 
         nodes = state.nodes();
         nodesBuilder = DiscoveryNodes.builder(nodes).masterNodeId(null);
         state = ClusterState.builder(state).blocks(ClusterBlocks.builder().addGlobalBlock(NoMasterBlockService.NO_MASTER_BLOCK_WRITES))
             .nodes(nodesBuilder).build();
-        setState(timedClusterApplierService, state);
+        setState(clusterApplierService, state);
         assertThat(isMaster.get(), is(false));
         nodesBuilder = DiscoveryNodes.builder(nodes).masterNodeId(nodes.getLocalNodeId());
         state = ClusterState.builder(state).blocks(ClusterBlocks.EMPTY_CLUSTER_BLOCK).nodes(nodesBuilder).build();
-        setState(timedClusterApplierService, state);
+        setState(clusterApplierService, state);
         assertThat(isMaster.get(), is(true));
 
-        timedClusterApplierService.close();
+        clusterApplierService.close();
     }
 
     public void testClusterStateApplierCantSampleClusterState() throws InterruptedException {
@@ -326,12 +363,12 @@ public class ClusterApplierServiceTests extends ESTestCase {
             new ClusterApplyListener() {
 
                 @Override
-                public void onSuccess(String source) {
+                public void onSuccess() {
                     latch.countDown();
                 }
 
                 @Override
-                public void onFailure(String source, Exception e) {
+                public void onFailure(Exception e) {
                     error.compareAndSet(null, e);
                 }
             }
@@ -347,20 +384,20 @@ public class ClusterApplierServiceTests extends ESTestCase {
         clusterApplierService.addStateApplier(event -> {
             throw new RuntimeException("dummy exception");
         });
-        clusterApplierService.allowClusterStateApplicationFailure();
+        allowClusterStateApplicationFailure = true;
 
         CountDownLatch latch = new CountDownLatch(1);
         clusterApplierService.onNewClusterState("test", () -> ClusterState.builder(clusterApplierService.state()).build(),
             new ClusterApplyListener() {
 
                 @Override
-                public void onSuccess(String source) {
+                public void onSuccess() {
                     latch.countDown();
                     fail("should not be called");
                 }
 
                 @Override
-                public void onFailure(String source, Exception e) {
+                public void onFailure(Exception e) {
                     assertTrue(error.compareAndSet(null, e));
                     latch.countDown();
                 }
@@ -374,9 +411,8 @@ public class ClusterApplierServiceTests extends ESTestCase {
 
     public void testClusterStateApplierBubblesUpExceptionsInSettingsApplier() throws InterruptedException {
         AtomicReference<Throwable> error = new AtomicReference<>();
-        clusterApplierService.clusterSettings.addSettingsUpdateConsumer(EnableAllocationDecider.CLUSTER_ROUTING_ALLOCATION_ENABLE_SETTING,
-            v -> {});
-        clusterApplierService.allowClusterStateApplicationFailure();
+        clusterSettings.addSettingsUpdateConsumer(EnableAllocationDecider.CLUSTER_ROUTING_ALLOCATION_ENABLE_SETTING, v -> {});
+        allowClusterStateApplicationFailure = true;
 
         CountDownLatch latch = new CountDownLatch(1);
         clusterApplierService.onNewClusterState("test", () -> ClusterState.builder(clusterApplierService.state())
@@ -388,13 +424,13 @@ public class ClusterApplierServiceTests extends ESTestCase {
             new ClusterApplyListener() {
 
                 @Override
-                public void onSuccess(String source) {
+                public void onSuccess() {
                     latch.countDown();
                     fail("should not be called");
                 }
 
                 @Override
-                public void onFailure(String source, Exception e) {
+                public void onFailure(Exception e) {
                     assertTrue(error.compareAndSet(null, e));
                     latch.countDown();
                 }
@@ -419,12 +455,12 @@ public class ClusterApplierServiceTests extends ESTestCase {
             new ClusterApplyListener() {
 
                 @Override
-                public void onSuccess(String source) {
+                public void onSuccess() {
                     latch.countDown();
                 }
 
                 @Override
-                public void onFailure(String source, Exception e) {
+                public void onFailure(Exception e) {
                     error.compareAndSet(null, e);
                 }
             }
@@ -468,12 +504,12 @@ public class ClusterApplierServiceTests extends ESTestCase {
         clusterApplierService.onNewClusterState("test", () -> ClusterState.builder(clusterApplierService.state()).build(),
             new ClusterApplyListener() {
                 @Override
-                public void onSuccess(String source) {
+                public void onSuccess() {
                     latch.countDown();
                 }
 
                 @Override
-                public void onFailure(String source, Exception e) {
+                public void onFailure(Exception e) {
                     error.compareAndSet(null, e);
                 }
         });
@@ -505,7 +541,7 @@ public class ClusterApplierServiceTests extends ESTestCase {
             }, new ClusterApplyListener() {
 
                 @Override
-                public void onSuccess(String source) {
+                public void onSuccess() {
                     assertFalse(threadPool.getThreadContext().isSystemContext());
                     assertEquals(expectedHeaders, threadPool.getThreadContext().getHeaders());
                     assertEquals(expectedResponseHeaders, threadPool.getThreadContext().getResponseHeaders());
@@ -513,7 +549,7 @@ public class ClusterApplierServiceTests extends ESTestCase {
                 }
 
                 @Override
-                public void onFailure(String source, Exception e) {
+                public void onFailure(Exception e) {
                     assertFalse(threadPool.getThreadContext().isSystemContext());
                     assertEquals(expectedHeaders, threadPool.getThreadContext().getHeaders());
                     assertEquals(expectedResponseHeaders, threadPool.getThreadContext().getResponseHeaders());
@@ -525,30 +561,4 @@ public class ClusterApplierServiceTests extends ESTestCase {
         latch.await();
     }
 
-    static class TimedClusterApplierService extends ClusterApplierService {
-
-        final ClusterSettings clusterSettings;
-        volatile Long currentTimeOverride = null;
-        boolean applicationMayFail;
-
-        TimedClusterApplierService(Settings settings, ClusterSettings clusterSettings, ThreadPool threadPool) {
-            super("test_node", settings, clusterSettings, threadPool);
-            this.clusterSettings = clusterSettings;
-        }
-
-        @Override
-        protected long currentTimeInMillis() {
-            return Objects.requireNonNullElseGet(currentTimeOverride, super::currentTimeInMillis);
-        }
-
-        @Override
-        protected boolean applicationMayFail() {
-            return this.applicationMayFail;
-        }
-
-        void allowClusterStateApplicationFailure() {
-            this.applicationMayFail = true;
-        }
-    }
-
 }

+ 3 - 3
server/src/test/java/org/elasticsearch/snapshots/InternalSnapshotsInfoServiceTests.java

@@ -386,12 +386,12 @@ public class InternalSnapshotsInfoServiceTests extends ESTestCase {
             future -> clusterService.getClusterApplierService()
                 .onNewClusterState(reason, () -> applier.apply(clusterService.state()), new ClusterApplier.ClusterApplyListener() {
                     @Override
-                    public void onSuccess(String source) {
-                        future.onResponse(source);
+                    public void onSuccess() {
+                        future.onResponse(null);
                     }
 
                     @Override
-                    public void onFailure(String source, Exception e) {
+                    public void onFailure(Exception e) {
                         future.onFailure(e);
                     }
                 })

+ 3 - 1
test/framework/src/main/java/org/elasticsearch/cluster/coordination/AbstractCoordinatorTestCase.java

@@ -1322,6 +1322,7 @@ public class AbstractCoordinatorTestCase extends ESTestCase {
     static class DisruptableClusterApplierService extends ClusterApplierService {
         private final String nodeName;
         private final DeterministicTaskQueue deterministicTaskQueue;
+        private final ThreadPool threadPool;
         ClusterStateApplyResponse clusterStateApplyResponse = ClusterStateApplyResponse.SUCCEED;
         private boolean applicationMayFail;
 
@@ -1330,6 +1331,7 @@ public class AbstractCoordinatorTestCase extends ESTestCase {
             super(nodeName, settings, clusterSettings, threadPool);
             this.nodeName = nodeName;
             this.deterministicTaskQueue = deterministicTaskQueue;
+            this.threadPool = threadPool;
             addStateApplier(event -> {
                 switch (clusterStateApplyResponse) {
                     case SUCCEED:
@@ -1355,7 +1357,7 @@ public class AbstractCoordinatorTestCase extends ESTestCase {
             if (clusterStateApplyResponse == ClusterStateApplyResponse.HANG) {
                 if (randomBoolean()) {
                     // apply cluster state, but don't notify listener
-                    super.onNewClusterState(source, clusterStateSupplier, (source1, e) -> {
+                    super.onNewClusterState(source, clusterStateSupplier, e -> {
                         // ignore result
                     });
                 }

+ 4 - 4
test/framework/src/main/java/org/elasticsearch/test/ClusterServiceUtils.java

@@ -50,12 +50,12 @@ public class ClusterServiceUtils {
         executor.onNewClusterState("test setting state",
             () -> ClusterState.builder(clusterState).version(clusterState.version() + 1).build(), new ClusterApplyListener() {
                 @Override
-                public void onSuccess(String source) {
+                public void onSuccess() {
                     latch.countDown();
                 }
 
                 @Override
-                public void onFailure(String source, Exception e) {
+                public void onFailure(Exception e) {
                     exception.set(e);
                     latch.countDown();
                 }
@@ -150,12 +150,12 @@ public class ClusterServiceUtils {
                 clusterStatePublicationEvent::getNewState,
                 new ClusterApplyListener() {
                     @Override
-                    public void onSuccess(String source) {
+                    public void onSuccess() {
                         publishListener.onResponse(null);
                     }
 
                     @Override
-                    public void onFailure(String source, Exception e) {
+                    public void onFailure(Exception e) {
                         publishListener.onFailure(e);
                     }
                 });

+ 6 - 3
test/framework/src/main/java/org/elasticsearch/test/disruption/BlockClusterStateProcessing.java

@@ -41,7 +41,9 @@ public class BlockClusterStateProcessing extends SingleNodeDisruption {
         boolean success = disruptionLatch.compareAndSet(null, new CountDownLatch(1));
         assert success : "startDisrupting called without waiting on stopDisrupting to complete";
         final CountDownLatch started = new CountDownLatch(1);
-        clusterService.getClusterApplierService().runOnApplierThread("service_disruption_block",
+        clusterService.getClusterApplierService().runOnApplierThread(
+            "service_disruption_block",
+            Priority.IMMEDIATE,
             currentState -> {
                 started.countDown();
                 CountDownLatch latch = disruptionLatch.get();
@@ -52,8 +54,9 @@ public class BlockClusterStateProcessing extends SingleNodeDisruption {
                         Throwables.rethrow(e);
                     }
                 }
-            }, (source, e) -> logger.error("unexpected error during disruption", e),
-            Priority.IMMEDIATE);
+            },
+            e -> logger.error("unexpected error during disruption", e)
+        );
         try {
             started.await();
         } catch (InterruptedException e) {

+ 6 - 3
test/framework/src/main/java/org/elasticsearch/test/disruption/SlowClusterStateProcessing.java

@@ -90,7 +90,9 @@ public class SlowClusterStateProcessing extends SingleNodeDisruption {
             return false;
         }
         final AtomicBoolean stopped = new AtomicBoolean(false);
-        clusterService.getClusterApplierService().runOnApplierThread("service_disruption_delay",
+        clusterService.getClusterApplierService().runOnApplierThread(
+            "service_disruption_delay",
+            Priority.IMMEDIATE,
             currentState -> {
                 try {
                     long count = duration.millis() / 200;
@@ -105,8 +107,9 @@ public class SlowClusterStateProcessing extends SingleNodeDisruption {
                 } catch (InterruptedException e) {
                     ExceptionsHelper.reThrowIfNotNull(e);
                 }
-            }, (source, e) -> countDownLatch.countDown(),
-            Priority.IMMEDIATE);
+            },
+            e -> countDownLatch.countDown()
+        );
         try {
             countDownLatch.await();
         } catch (InterruptedException e) {