Bladeren bron

More logging for slow cluster state application (#45007)

Today the lag detector may remove nodes from the cluster if they fail to apply
a cluster state within a reasonable timeframe, but it is rather unclear from
the default logging that this has occurred and there is very little extra
information beyond the fact that the removed node was lagging. Moreover the
only forewarning that the lag detector might be invoked is a message indicating
that cluster state publication took unreasonably long, which does not contain
enough information to investigate the problem further.

This commit adds a good deal more detail to make the issues of slow nodes more
prominent:

- after 10 seconds (by default) we log an INFO message indicating that a
  publication is still waiting for responses from some nodes, including the
  identities of the problematic nodes.

- when the publication times out after 30 seconds (by default) we log a WARN
  message identifying the nodes that are still pending.

- the lag detector logs a more detailed warning when a fatally-lagging node is
  detected.

- if applying a cluster state takes too long then the cluster applier service
  logs a breakdown of all the tasks it ran as part of that process.
David Turner 6 jaren geleden
bovenliggende
commit
7776f755ee

+ 6 - 0
docs/reference/modules/discovery/discovery-settings.asciidoc

@@ -176,6 +176,12 @@ or may become unstable or intolerant of certain failures.
     time. The default value is `10`. See
     <<modules-discovery-adding-removing-nodes>>.
 
+`cluster.publish.info_timeout`::
+
+    Sets how long the master node waits for each cluster state update to be
+    completely published to all nodes before logging a message indicating that
+    some nodes are responding slowly. The default value is `10s`.
+
 `cluster.publish.timeout`::
 
     Sets how long the master node waits for each cluster state update to be

+ 33 - 5
server/src/main/java/org/elasticsearch/cluster/coordination/Coordinator.java

@@ -18,6 +18,7 @@
  */
 package org.elasticsearch.cluster.coordination;
 
+import org.apache.logging.log4j.Level;
 import org.apache.logging.log4j.LogManager;
 import org.apache.logging.log4j.Logger;
 import org.apache.logging.log4j.message.ParameterizedMessage;
@@ -94,6 +95,11 @@ public class Coordinator extends AbstractLifecycleComponent implements Discovery
 
     private static final Logger logger = LogManager.getLogger(Coordinator.class);
 
+    // the timeout before emitting an info log about a slow-running publication
+    public static final Setting<TimeValue> PUBLISH_INFO_TIMEOUT_SETTING =
+        Setting.timeSetting("cluster.publish.info_timeout",
+            TimeValue.timeValueMillis(10000), TimeValue.timeValueMillis(1), Setting.Property.NodeScope);
+
     // the timeout for the publication of each value
     public static final Setting<TimeValue> PUBLISH_TIMEOUT_SETTING =
         Setting.timeSetting("cluster.publish.timeout",
@@ -121,6 +127,7 @@ public class Coordinator extends AbstractLifecycleComponent implements Discovery
     private final ElectionSchedulerFactory electionSchedulerFactory;
     private final SeedHostsResolver configuredHostsResolver;
     private final TimeValue publishTimeout;
+    private final TimeValue publishInfoTimeout;
     private final PublicationTransportHandler publicationHandler;
     private final LeaderChecker leaderChecker;
     private final FollowersChecker followersChecker;
@@ -167,6 +174,7 @@ public class Coordinator extends AbstractLifecycleComponent implements Discovery
         this.lastJoin = Optional.empty();
         this.joinAccumulator = new InitialJoinAccumulator();
         this.publishTimeout = PUBLISH_TIMEOUT_SETTING.get(settings);
+        this.publishInfoTimeout = PUBLISH_INFO_TIMEOUT_SETTING.get(settings);
         this.random = random;
         this.electionSchedulerFactory = new ElectionSchedulerFactory(settings, random, transportService.getThreadPool());
         this.preVoteCollector = new PreVoteCollector(transportService, this::startElection, this::updateMaxTermSeen, electionStrategy);
@@ -1208,7 +1216,8 @@ public class Coordinator extends AbstractLifecycleComponent implements Discovery
         private final AckListener ackListener;
         private final ActionListener<Void> publishListener;
         private final PublicationTransportHandler.PublicationContext publicationContext;
-        private final Scheduler.ScheduledCancellable scheduledCancellable;
+        private final Scheduler.ScheduledCancellable timeoutHandler;
+        private final Scheduler.Cancellable infoTimeoutHandler;
 
         // We may not have accepted our own state before receiving a join from another node, causing its join to be rejected (we cannot
         // safely accept a join whose last-accepted term/version is ahead of ours), so store them up and process them at the end.
@@ -1249,7 +1258,8 @@ public class Coordinator extends AbstractLifecycleComponent implements Discovery
             this.localNodeAckEvent = localNodeAckEvent;
             this.ackListener = ackListener;
             this.publishListener = publishListener;
-            this.scheduledCancellable = transportService.getThreadPool().schedule(new Runnable() {
+
+            this.timeoutHandler = transportService.getThreadPool().schedule(new Runnable() {
                 @Override
                 public void run() {
                     synchronized (mutex) {
@@ -1262,6 +1272,20 @@ public class Coordinator extends AbstractLifecycleComponent implements Discovery
                     return "scheduled timeout for " + CoordinatorPublication.this;
                 }
             }, publishTimeout, Names.GENERIC);
+
+            this.infoTimeoutHandler = transportService.getThreadPool().schedule(new Runnable() {
+                @Override
+                public void run() {
+                    synchronized (mutex) {
+                        logIncompleteNodes(Level.INFO);
+                    }
+                }
+
+                @Override
+                public String toString() {
+                    return "scheduled timeout for reporting on " + CoordinatorPublication.this;
+                }
+            }, publishInfoTimeout, Names.GENERIC);
         }
 
         private void removePublicationAndPossiblyBecomeCandidate(String reason) {
@@ -1303,7 +1327,8 @@ public class Coordinator extends AbstractLifecycleComponent implements Discovery
                                 synchronized (mutex) {
                                     removePublicationAndPossiblyBecomeCandidate("clusterApplier#onNewClusterState");
                                 }
-                                scheduledCancellable.cancel();
+                                timeoutHandler.cancel();
+                                infoTimeoutHandler.cancel();
                                 ackListener.onNodeAck(getLocalNode(), e);
                                 publishListener.onFailure(e);
                             }
@@ -1348,8 +1373,10 @@ public class Coordinator extends AbstractLifecycleComponent implements Discovery
                                         }
                                     }
                                     lagDetector.startLagDetector(publishRequest.getAcceptedState().version());
+                                    logIncompleteNodes(Level.WARN);
                                 }
-                                scheduledCancellable.cancel();
+                                timeoutHandler.cancel();
+                                infoTimeoutHandler.cancel();
                                 ackListener.onNodeAck(getLocalNode(), null);
                                 publishListener.onResponse(null);
                             }
@@ -1360,7 +1387,8 @@ public class Coordinator extends AbstractLifecycleComponent implements Discovery
                 public void onFailure(Exception e) {
                     assert Thread.holdsLock(mutex) : "Coordinator mutex not held";
                     removePublicationAndPossiblyBecomeCandidate("Publication.onCompletion(false)");
-                    scheduledCancellable.cancel();
+                    timeoutHandler.cancel();
+                    infoTimeoutHandler.cancel();
 
                     final FailedToCommitClusterStateException exception = new FailedToCommitClusterStateException("publication failed", e);
                     ackListener.onNodeAck(getLocalNode(), exception); // other nodes have acked, but not the master.

+ 3 - 1
server/src/main/java/org/elasticsearch/cluster/coordination/LagDetector.java

@@ -162,7 +162,9 @@ public class LagDetector {
                 return;
             }
 
-            logger.debug("{}, detected lag at version {}, node has only applied version {}", this, version, appliedVersion);
+            logger.warn(
+                "node [{}] is lagging at cluster state version [{}], although publication of cluster state version [{}] completed [{}] ago",
+                discoveryNode, appliedVersion, version, clusterStateApplicationTimeout);
             onLagDetected.accept(discoveryNode);
         }
     }

+ 15 - 0
server/src/main/java/org/elasticsearch/cluster/coordination/Publication.java

@@ -19,6 +19,7 @@
 
 package org.elasticsearch.cluster.coordination;
 
+import org.apache.logging.log4j.Level;
 import org.apache.logging.log4j.LogManager;
 import org.apache.logging.log4j.Logger;
 import org.apache.logging.log4j.message.ParameterizedMessage;
@@ -195,6 +196,16 @@ public abstract class Publication {
             ", version=" + publishRequest.getAcceptedState().version() + '}';
     }
 
+    void logIncompleteNodes(Level level) {
+        final String message = publicationTargets.stream().filter(PublicationTarget::isActive).map(publicationTarget ->
+            publicationTarget.getDiscoveryNode() + " [" + publicationTarget.getState() + "]").collect(Collectors.joining(", "));
+        if (message.isEmpty() == false) {
+            final TimeValue elapsedTime = TimeValue.timeValueMillis(currentTimeSupplier.getAsLong() - startTime);
+            logger.log(level, "after [{}] publication of cluster state version [{}] is still waiting for {}", elapsedTime,
+                publishRequest.getAcceptedState().version(), message);
+        }
+    }
+
     enum PublicationTargetState {
         NOT_STARTED,
         FAILED,
@@ -213,6 +224,10 @@ public abstract class Publication {
             this.discoveryNode = discoveryNode;
         }
 
+        PublicationTargetState getState() {
+            return state;
+        }
+
         @Override
         public String toString() {
             return "PublicationTarget{" +

+ 33 - 17
server/src/main/java/org/elasticsearch/cluster/service/ClusterApplierService.java

@@ -35,7 +35,9 @@ import org.elasticsearch.cluster.metadata.ProcessClusterEventTimeoutException;
 import org.elasticsearch.cluster.node.DiscoveryNodes;
 import org.elasticsearch.common.Nullable;
 import org.elasticsearch.common.Priority;
+import org.elasticsearch.common.StopWatch;
 import org.elasticsearch.common.component.AbstractLifecycleComponent;
+import org.elasticsearch.common.lease.Releasable;
 import org.elasticsearch.common.settings.ClusterSettings;
 import org.elasticsearch.common.settings.Settings;
 import org.elasticsearch.common.unit.TimeValue;
@@ -47,6 +49,7 @@ import org.elasticsearch.common.util.iterable.Iterables;
 import org.elasticsearch.threadpool.Scheduler;
 import org.elasticsearch.threadpool.ThreadPool;
 
+import java.util.Arrays;
 import java.util.Collection;
 import java.util.Collections;
 import java.util.Iterator;
@@ -61,6 +64,7 @@ import java.util.concurrent.atomic.AtomicReference;
 import java.util.function.Consumer;
 import java.util.function.Function;
 import java.util.function.Supplier;
+import java.util.stream.Collectors;
 import java.util.stream.Stream;
 
 import static org.elasticsearch.cluster.service.ClusterService.CLUSTER_SERVICE_SLOW_TASK_LOGGING_THRESHOLD_SETTING;
@@ -389,15 +393,18 @@ public class ClusterApplierService extends AbstractLifecycleComponent implements
         final ClusterState previousClusterState = state.get();
 
         long startTimeMS = currentTimeInMillis();
+        final StopWatch stopWatch = new StopWatch();
         final ClusterState newClusterState;
         try {
-            newClusterState = task.apply(previousClusterState);
+            try (Releasable ignored = stopWatch.timing("running task [" + task.source + ']')) {
+                newClusterState = task.apply(previousClusterState);
+            }
         } catch (Exception e) {
             TimeValue executionTime = TimeValue.timeValueMillis(Math.max(0, currentTimeInMillis() - startTimeMS));
             logger.trace(() -> new ParameterizedMessage(
                 "failed to execute cluster state applier in [{}], state:\nversion [{}], source [{}]\n{}",
                 executionTime, previousClusterState.version(), task.source, previousClusterState), e);
-            warnAboutSlowTaskIfNeeded(executionTime, task.source);
+            warnAboutSlowTaskIfNeeded(executionTime, task.source, stopWatch);
             task.listener.onFailure(task.source, e);
             return;
         }
@@ -405,7 +412,7 @@ public class ClusterApplierService extends AbstractLifecycleComponent implements
         if (previousClusterState == newClusterState) {
             TimeValue executionTime = TimeValue.timeValueMillis(Math.max(0, currentTimeInMillis() - startTimeMS));
             logger.debug("processing [{}]: took [{}] no change in cluster state", task.source, executionTime);
-            warnAboutSlowTaskIfNeeded(executionTime, task.source);
+            warnAboutSlowTaskIfNeeded(executionTime, task.source, stopWatch);
             task.listener.onSuccess(task.source);
         } else {
             if (logger.isTraceEnabled()) {
@@ -415,12 +422,12 @@ public class ClusterApplierService extends AbstractLifecycleComponent implements
                 logger.debug("cluster state updated, version [{}], source [{}]", newClusterState.version(), task.source);
             }
             try {
-                applyChanges(task, previousClusterState, newClusterState);
+                applyChanges(task, previousClusterState, newClusterState, stopWatch);
                 TimeValue executionTime = TimeValue.timeValueMillis(Math.max(0, currentTimeInMillis() - startTimeMS));
                 logger.debug("processing [{}]: took [{}] done applying updated cluster state (version: {}, uuid: {})", task.source,
                     executionTime, newClusterState.version(),
                     newClusterState.stateUUID());
-                warnAboutSlowTaskIfNeeded(executionTime, task.source);
+                warnAboutSlowTaskIfNeeded(executionTime, task.source, stopWatch);
                 task.listener.onSuccess(task.source);
             } catch (Exception e) {
                 TimeValue executionTime = TimeValue.timeValueMillis(Math.max(0, currentTimeInMillis() - startTimeMS));
@@ -438,7 +445,7 @@ public class ClusterApplierService extends AbstractLifecycleComponent implements
         }
     }
 
-    private void applyChanges(UpdateTask task, ClusterState previousClusterState, ClusterState newClusterState) {
+    private void applyChanges(UpdateTask task, ClusterState previousClusterState, ClusterState newClusterState, StopWatch stopWatch) {
         ClusterChangedEvent clusterChangedEvent = new ClusterChangedEvent(task.source, newClusterState, previousClusterState);
         // new cluster state, notify all listeners
         final DiscoveryNodes.Delta nodesDelta = clusterChangedEvent.nodesDelta();
@@ -451,24 +458,28 @@ public class ClusterApplierService extends AbstractLifecycleComponent implements
         }
 
         logger.trace("connecting to nodes of cluster state with version {}", newClusterState.version());
-        connectToNodesAndWait(newClusterState);
+        try (Releasable ignored = stopWatch.timing("connecting to new nodes")) {
+            connectToNodesAndWait(newClusterState);
+        }
 
         // nothing to do until we actually recover from the gateway or any other block indicates we need to disable persistency
         if (clusterChangedEvent.state().blocks().disableStatePersistence() == false && clusterChangedEvent.metaDataChanged()) {
             logger.debug("applying settings from cluster state with version {}", newClusterState.version());
             final Settings incomingSettings = clusterChangedEvent.state().metaData().settings();
-            clusterSettings.applySettings(incomingSettings);
+            try (Releasable ignored = stopWatch.timing("applying settings")) {
+                clusterSettings.applySettings(incomingSettings);
+            }
         }
 
         logger.debug("apply cluster state with version {}", newClusterState.version());
-        callClusterStateAppliers(clusterChangedEvent);
+        callClusterStateAppliers(clusterChangedEvent, stopWatch);
 
         nodeConnectionsService.disconnectFromNodesExcept(newClusterState.nodes());
 
         logger.debug("set locally applied cluster state to version {}", newClusterState.version());
         state.set(newClusterState);
 
-        callClusterStateListeners(clusterChangedEvent);
+        callClusterStateListeners(clusterChangedEvent, stopWatch);
     }
 
     protected void connectToNodesAndWait(ClusterState newClusterState) {
@@ -483,18 +494,22 @@ public class ClusterApplierService extends AbstractLifecycleComponent implements
         }
     }
 
-    private void callClusterStateAppliers(ClusterChangedEvent clusterChangedEvent) {
+    private void callClusterStateAppliers(ClusterChangedEvent clusterChangedEvent, StopWatch stopWatch) {
         clusterStateAppliers.forEach(applier -> {
             logger.trace("calling [{}] with change to version [{}]", applier, clusterChangedEvent.state().version());
-            applier.applyClusterState(clusterChangedEvent);
+            try (Releasable ignored = stopWatch.timing("running applier [" + applier + "]")) {
+                applier.applyClusterState(clusterChangedEvent);
+            }
         });
     }
 
-    private void callClusterStateListeners(ClusterChangedEvent clusterChangedEvent) {
+    private void callClusterStateListeners(ClusterChangedEvent clusterChangedEvent, StopWatch stopWatch) {
         Stream.concat(clusterStateListeners.stream(), timeoutClusterStateListeners.stream()).forEach(listener -> {
             try {
                 logger.trace("calling [{}] with change to version [{}]", listener, clusterChangedEvent.state().version());
-                listener.clusterChanged(clusterChangedEvent);
+                try (Releasable ignored = stopWatch.timing("notifying listener [" + listener + "]")) {
+                    listener.clusterChanged(clusterChangedEvent);
+                }
             } catch (Exception ex) {
                 logger.warn("failed to notify ClusterStateListener", ex);
             }
@@ -532,10 +547,11 @@ public class ClusterApplierService extends AbstractLifecycleComponent implements
         }
     }
 
-    protected void warnAboutSlowTaskIfNeeded(TimeValue executionTime, String source) {
+    private void warnAboutSlowTaskIfNeeded(TimeValue executionTime, String source, StopWatch stopWatch) {
         if (executionTime.getMillis() > slowTaskLoggingThreshold.getMillis()) {
-            logger.warn("cluster state applier task [{}] took [{}] which is above the warn threshold of {}", source, executionTime,
-                slowTaskLoggingThreshold);
+            logger.warn("cluster state applier task [{}] took [{}] which is above the warn threshold of [{}]: {}", source, executionTime,
+                slowTaskLoggingThreshold, Arrays.stream(stopWatch.taskInfo())
+                    .map(ti -> '[' + ti.getTaskName() + "] took [" + ti.getTime().millis() + "ms]").collect(Collectors.joining(", ")));
         }
     }
 

+ 1 - 1
server/src/main/java/org/elasticsearch/cluster/service/MasterService.java

@@ -569,7 +569,7 @@ public class MasterService extends AbstractLifecycleComponent {
 
     protected void warnAboutSlowTaskIfNeeded(TimeValue executionTime, String source) {
         if (executionTime.getMillis() > slowTaskLoggingThreshold.getMillis()) {
-            logger.warn("cluster state update task [{}] took [{}] which is above the warn threshold of {}", source, executionTime,
+            logger.warn("cluster state update task [{}] took [{}] which is above the warn threshold of [{}]", source, executionTime,
                 slowTaskLoggingThreshold);
         }
     }

+ 26 - 65
server/src/main/java/org/elasticsearch/common/StopWatch.java

@@ -19,6 +19,7 @@
 
 package org.elasticsearch.common;
 
+import org.elasticsearch.common.lease.Releasable;
 import org.elasticsearch.common.unit.TimeValue;
 
 import java.text.NumberFormat;
@@ -51,8 +52,6 @@ public class StopWatch {
      */
     private final String id;
 
-    private boolean keepTaskList = true;
-
     private final List<TaskInfo> taskList = new LinkedList<>();
 
     /**
@@ -72,8 +71,6 @@ public class StopWatch {
 
     private TaskInfo lastTaskInfo;
 
-    private int taskCount;
-
     /**
      * Total running time
      */
@@ -98,16 +95,6 @@ public class StopWatch {
         this.id = id;
     }
 
-    /**
-     * Determine whether the TaskInfo array is built over time. Set this to
-     * "false" when using a StopWatch for millions of intervals, or the task
-     * info structure will consume excessive memory. Default is "true".
-     */
-    public StopWatch keepTaskList(boolean keepTaskList) {
-        this.keepTaskList = keepTaskList;
-        return this;
-    }
-
     /**
      * Start an unnamed task. The results are undefined if {@link #stop()}
      * or timing methods are called without invoking this method.
@@ -138,7 +125,7 @@ public class StopWatch {
     /**
      * Stop the current task. The results are undefined if timing
      * methods are called without invoking at least one pair
-     * {@link #start()} / {@link #stop()} methods.
+     * {@link #start()} / {@code #stop()} methods.
      *
      * @see #start()
      */
@@ -149,15 +136,17 @@ public class StopWatch {
         long lastTimeNS = System.nanoTime() - this.startTimeNS;
         this.totalTimeNS += lastTimeNS;
         this.lastTaskInfo = new TaskInfo(this.currentTaskName, TimeValue.nsecToMSec(lastTimeNS));
-        if (this.keepTaskList) {
-            this.taskList.add(lastTaskInfo);
-        }
-        ++this.taskCount;
+        this.taskList.add(lastTaskInfo);
         this.running = false;
         this.currentTaskName = null;
         return this;
     }
 
+    public Releasable timing(String taskName) {
+        start(taskName);
+        return this::stop;
+    }
+
     /**
      * Return whether the stop watch is currently running.
      */
@@ -175,16 +164,6 @@ public class StopWatch {
         return this.lastTaskInfo.getTime();
     }
 
-    /**
-     * Return the name of the last task.
-     */
-    public String lastTaskName() throws IllegalStateException {
-        if (this.lastTaskInfo == null) {
-            throw new IllegalStateException("No tests run: can't get last interval");
-        }
-        return this.lastTaskInfo.getTaskName();
-    }
-
     /**
      * Return the total time for all tasks.
      */
@@ -192,21 +171,11 @@ public class StopWatch {
         return new TimeValue(totalTimeNS, TimeUnit.NANOSECONDS);
     }
 
-    /**
-     * Return the number of tasks timed.
-     */
-    public int taskCount() {
-        return taskCount;
-    }
-
     /**
      * Return an array of the data for tasks performed.
      */
     public TaskInfo[] taskInfo() {
-        if (!this.keepTaskList) {
-            throw new UnsupportedOperationException("Task info is not being kept!");
-        }
-        return this.taskList.toArray(new TaskInfo[this.taskList.size()]);
+        return this.taskList.toArray(new TaskInfo[0]);
     }
 
     /**
@@ -223,23 +192,19 @@ public class StopWatch {
     public String prettyPrint() {
         StringBuilder sb = new StringBuilder(shortSummary());
         sb.append('\n');
-        if (!this.keepTaskList) {
-            sb.append("No task info kept");
-        } else {
-            sb.append("-----------------------------------------\n");
-            sb.append("ms     %     Task name\n");
-            sb.append("-----------------------------------------\n");
-            NumberFormat nf = NumberFormat.getNumberInstance(Locale.ROOT);
-            nf.setMinimumIntegerDigits(5);
-            nf.setGroupingUsed(false);
-            NumberFormat pf = NumberFormat.getPercentInstance(Locale.ROOT);
-            pf.setMinimumIntegerDigits(3);
-            pf.setGroupingUsed(false);
-            for (TaskInfo task : taskInfo()) {
-                sb.append(nf.format(task.getTime().millis())).append("  ");
-                sb.append(pf.format(task.getTime().secondsFrac() / totalTime().secondsFrac())).append("  ");
-                sb.append(task.getTaskName()).append("\n");
-            }
+        sb.append("-----------------------------------------\n");
+        sb.append("ms     %     Task name\n");
+        sb.append("-----------------------------------------\n");
+        NumberFormat nf = NumberFormat.getNumberInstance(Locale.ROOT);
+        nf.setMinimumIntegerDigits(5);
+        nf.setGroupingUsed(false);
+        NumberFormat pf = NumberFormat.getPercentInstance(Locale.ROOT);
+        pf.setMinimumIntegerDigits(3);
+        pf.setGroupingUsed(false);
+        for (TaskInfo task : taskInfo()) {
+            sb.append(nf.format(task.getTime().millis())).append("  ");
+            sb.append(pf.format(task.getTime().secondsFrac() / totalTime().secondsFrac())).append("  ");
+            sb.append(task.getTaskName()).append("\n");
         }
         return sb.toString();
     }
@@ -251,14 +216,10 @@ public class StopWatch {
     @Override
     public String toString() {
         StringBuilder sb = new StringBuilder(shortSummary());
-        if (this.keepTaskList) {
-            for (TaskInfo task : taskInfo()) {
-                sb.append("; [").append(task.getTaskName()).append("] took ").append(task.getTime());
-                long percent = Math.round((100.0f * task.getTime().millis()) / totalTime().millis());
-                sb.append(" = ").append(percent).append("%");
-            }
-        } else {
-            sb.append("; no task info kept");
+        for (TaskInfo task : taskInfo()) {
+            sb.append("; [").append(task.getTaskName()).append("] took ").append(task.getTime());
+            long percent = Math.round((100.0f * task.getTime().millis()) / totalTime().millis());
+            sb.append(" = ").append(percent).append("%");
         }
         return sb.toString();
     }

+ 1 - 0
server/src/main/java/org/elasticsearch/common/settings/ClusterSettings.java

@@ -426,6 +426,7 @@ public final class ClusterSettings extends AbstractScopedSettings {
             ElectionSchedulerFactory.ELECTION_MAX_TIMEOUT_SETTING,
             ElectionSchedulerFactory.ELECTION_DURATION_SETTING,
             Coordinator.PUBLISH_TIMEOUT_SETTING,
+            Coordinator.PUBLISH_INFO_TIMEOUT_SETTING,
             JoinHelper.JOIN_TIMEOUT_SETTING,
             FollowersChecker.FOLLOWER_CHECK_TIMEOUT_SETTING,
             FollowersChecker.FOLLOWER_CHECK_INTERVAL_SETTING,

+ 8 - 0
server/src/main/java/org/elasticsearch/threadpool/Scheduler.java

@@ -248,6 +248,14 @@ public interface Scheduler {
                 }
             }
         }
+
+        @Override
+        public String toString() {
+            return "ReschedulingRunnable{" +
+                "runnable=" + runnable +
+                ", interval=" + interval +
+                '}';
+        }
     }
 
     /**

+ 61 - 10
server/src/test/java/org/elasticsearch/cluster/coordination/CoordinatorTests.java

@@ -1191,7 +1191,7 @@ public class CoordinatorTests extends AbstractCoordinatorTestCase {
         }
     }
 
-    public void testLogsWarningPeriodicallyIfClusterNotFormed() {
+    public void testLogsWarningPeriodicallyIfClusterNotFormed() throws IllegalAccessException {
         final long warningDelayMillis;
         final Settings settings;
         if (randomBoolean()) {
@@ -1219,16 +1219,10 @@ public class CoordinatorTests extends AbstractCoordinatorTestCase {
                 "waiting for leader failure");
 
             for (int i = scaledRandomIntBetween(1, 10); i >= 0; i--) {
-                final MockLogAppender mockLogAppender;
+                final MockLogAppender mockLogAppender = new MockLogAppender();
                 try {
-                    mockLogAppender = new MockLogAppender();
-                } catch (IllegalAccessException e) {
-                    throw new AssertionError(e);
-                }
-
-                try {
-                    Loggers.addAppender(LogManager.getLogger(ClusterFormationFailureHelper.class), mockLogAppender);
                     mockLogAppender.start();
+                    Loggers.addAppender(LogManager.getLogger(ClusterFormationFailureHelper.class), mockLogAppender);
                     mockLogAppender.addExpectation(new MockLogAppender.LoggingExpectation() {
                         final Set<DiscoveryNode> nodesLogged = new HashSet<>();
 
@@ -1259,13 +1253,70 @@ public class CoordinatorTests extends AbstractCoordinatorTestCase {
                     cluster.runFor(warningDelayMillis + DEFAULT_DELAY_VARIABILITY, "waiting for warning to be emitted");
                     mockLogAppender.assertAllExpectationsMatched();
                 } finally {
-                    mockLogAppender.stop();
                     Loggers.removeAppender(LogManager.getLogger(ClusterFormationFailureHelper.class), mockLogAppender);
+                    mockLogAppender.stop();
                 }
             }
         }
     }
 
+    public void testLogsMessagesIfPublicationDelayed() throws IllegalAccessException {
+        try (Cluster cluster = new Cluster(between(3, 5))) {
+            cluster.runRandomly();
+            cluster.stabilise();
+            final ClusterNode brokenNode = cluster.getAnyNodeExcept(cluster.getAnyLeader());
+
+            final MockLogAppender mockLogAppender = new MockLogAppender();
+            try {
+                mockLogAppender.start();
+                Loggers.addAppender(LogManager.getLogger(Coordinator.CoordinatorPublication.class), mockLogAppender);
+                Loggers.addAppender(LogManager.getLogger(LagDetector.class), mockLogAppender);
+
+                mockLogAppender.addExpectation(new MockLogAppender.SeenEventExpectation("publication info message",
+                    Coordinator.CoordinatorPublication.class.getCanonicalName(), Level.INFO,
+                    "after [*] publication of cluster state version [*] is still waiting for " + brokenNode.getLocalNode() + " ["
+                        + Publication.PublicationTargetState.SENT_PUBLISH_REQUEST + ']'));
+
+                mockLogAppender.addExpectation(new MockLogAppender.SeenEventExpectation("publication warning",
+                    Coordinator.CoordinatorPublication.class.getCanonicalName(), Level.WARN,
+                    "after [*] publication of cluster state version [*] is still waiting for " + brokenNode.getLocalNode() + " ["
+                        + Publication.PublicationTargetState.SENT_PUBLISH_REQUEST + ']'));
+
+                mockLogAppender.addExpectation(new MockLogAppender.SeenEventExpectation("lag warning",
+                    LagDetector.class.getCanonicalName(), Level.WARN,
+                    "node [" + brokenNode + "] is lagging at cluster state version [*], " +
+                        "although publication of cluster state version [*] completed [*] ago"));
+
+                // drop the publication messages to one node, but then restore connectivity so it remains in the cluster and does not fail
+                // health checks
+                brokenNode.blackhole();
+                cluster.deterministicTaskQueue.scheduleAt(
+                    cluster.deterministicTaskQueue.getCurrentTimeMillis() + DEFAULT_CLUSTER_STATE_UPDATE_DELAY,
+                    new Runnable() {
+                        @Override
+                        public void run() {
+                            brokenNode.heal();
+                        }
+
+                        @Override
+                        public String toString() {
+                            return "healing " + brokenNode;
+                        }
+                    });
+                cluster.getAnyLeader().submitValue(randomLong());
+                cluster.runFor(defaultMillis(PUBLISH_TIMEOUT_SETTING) + 2 * DEFAULT_DELAY_VARIABILITY
+                        + defaultMillis(LagDetector.CLUSTER_FOLLOWER_LAG_TIMEOUT_SETTING),
+                    "waiting for messages to be emitted");
+
+                mockLogAppender.assertAllExpectationsMatched();
+            } finally {
+                Loggers.removeAppender(LogManager.getLogger(Coordinator.CoordinatorPublication.class), mockLogAppender);
+                Loggers.removeAppender(LogManager.getLogger(LagDetector.class), mockLogAppender);
+                mockLogAppender.stop();
+            }
+        }
+    }
+
     public void testReconfiguresToExcludeMasterIneligibleNodesInVotingConfig() {
         try (Cluster cluster = new Cluster(3)) {
             cluster.runRandomly();

+ 10 - 10
server/src/test/java/org/elasticsearch/cluster/service/ClusterApplierServiceTests.java

@@ -48,6 +48,7 @@ import org.junit.AfterClass;
 import org.junit.Before;
 import org.junit.BeforeClass;
 
+import java.util.Objects;
 import java.util.concurrent.CountDownLatch;
 import java.util.concurrent.atomic.AtomicBoolean;
 import java.util.concurrent.atomic.AtomicReference;
@@ -61,8 +62,8 @@ import static org.hamcrest.Matchers.is;
 
 public class ClusterApplierServiceTests extends ESTestCase {
 
-    protected static ThreadPool threadPool;
-    protected TimedClusterApplierService clusterApplierService;
+    private static ThreadPool threadPool;
+    private TimedClusterApplierService clusterApplierService;
 
     @BeforeClass
     public static void createThreadPool() {
@@ -192,13 +193,15 @@ public class ClusterApplierServiceTests extends ESTestCase {
                         "test2",
                         ClusterApplierService.class.getCanonicalName(),
                         Level.WARN,
-                        "*cluster state applier task [test2] took [32s] which is above the warn threshold of *"));
+                        "*cluster state applier task [test2] took [32s] which is above the warn threshold of [*]: " +
+                            "[running task [test2]] took [*"));
         mockAppender.addExpectation(
                 new MockLogAppender.SeenEventExpectation(
                         "test4",
                         ClusterApplierService.class.getCanonicalName(),
                         Level.WARN,
-                        "*cluster state applier task [test3] took [34s] which is above the warn threshold of *"));
+                        "*cluster state applier task [test3] took [34s] which is above the warn threshold of [*]: " +
+                            "[running task [test3]] took [*"));
 
         Logger clusterLogger = LogManager.getLogger(ClusterApplierService.class);
         Loggers.addAppender(clusterLogger, mockAppender);
@@ -273,7 +276,7 @@ public class ClusterApplierServiceTests extends ESTestCase {
         mockAppender.assertAllExpectationsMatched();
     }
 
-    public void testLocalNodeMasterListenerCallbacks() throws Exception {
+    public void testLocalNodeMasterListenerCallbacks() {
         TimedClusterApplierService timedClusterApplierService = createTimedClusterService(false);
 
         AtomicBoolean isMaster = new AtomicBoolean();
@@ -493,7 +496,7 @@ public class ClusterApplierServiceTests extends ESTestCase {
     static class TimedClusterApplierService extends ClusterApplierService {
 
         final ClusterSettings clusterSettings;
-        public volatile Long currentTimeOverride = null;
+        volatile Long currentTimeOverride = null;
 
         TimedClusterApplierService(Settings settings, ClusterSettings clusterSettings, ThreadPool threadPool) {
             super("test_node", settings, clusterSettings, threadPool);
@@ -502,10 +505,7 @@ public class ClusterApplierServiceTests extends ESTestCase {
 
         @Override
         protected long currentTimeInMillis() {
-            if (currentTimeOverride != null) {
-                return currentTimeOverride;
-            }
-            return super.currentTimeInMillis();
+            return Objects.requireNonNullElseGet(currentTimeOverride, super::currentTimeInMillis);
         }
     }
 

+ 4 - 4
server/src/test/java/org/elasticsearch/cluster/service/MasterServiceTests.java

@@ -652,25 +652,25 @@ public class MasterServiceTests extends ESTestCase {
                 "test1 shouldn't see because setting is too low",
                 MasterService.class.getCanonicalName(),
                 Level.WARN,
-                "*cluster state update task [test1] took [*] which is above the warn threshold of *"));
+                "*cluster state update task [test1] took [*] which is above the warn threshold of [*]"));
         mockAppender.addExpectation(
             new MockLogAppender.SeenEventExpectation(
                 "test2",
                 MasterService.class.getCanonicalName(),
                 Level.WARN,
-                "*cluster state update task [test2] took [32s] which is above the warn threshold of *"));
+                "*cluster state update task [test2] took [32s] which is above the warn threshold of [*]"));
         mockAppender.addExpectation(
             new MockLogAppender.SeenEventExpectation(
                 "test3",
                 MasterService.class.getCanonicalName(),
                 Level.WARN,
-                "*cluster state update task [test3] took [33s] which is above the warn threshold of *"));
+                "*cluster state update task [test3] took [33s] which is above the warn threshold of [*]"));
         mockAppender.addExpectation(
             new MockLogAppender.SeenEventExpectation(
                 "test4",
                 MasterService.class.getCanonicalName(),
                 Level.WARN,
-                "*cluster state update task [test4] took [34s] which is above the warn threshold of *"));
+                "*cluster state update task [test4] took [34s] which is above the warn threshold of [*]"));
 
         Logger clusterLogger = LogManager.getLogger(MasterService.class);
         Loggers.addAppender(clusterLogger, mockAppender);