Browse Source

Add timing stats to publication process (#76771)

This commit introduces into the node stats API various statistics to
track the time that the elected master spends in various phases of the
cluster state publication process.

Relates #76625
David Turner 4 years ago
parent
commit
4a17847b85
20 changed files with 1773 additions and 43 deletions
  1. 161 0
      docs/reference/cluster/nodes-stats.asciidoc
  2. 96 0
      rest-api-spec/src/yamlRestTest/resources/rest-api-spec/test/nodes.stats/30_discovery.yml
  3. 126 1
      server/src/main/java/org/elasticsearch/cluster/ClusterStatePublicationEvent.java
  4. 31 6
      server/src/main/java/org/elasticsearch/cluster/coordination/Coordinator.java
  5. 4 0
      server/src/main/java/org/elasticsearch/cluster/coordination/Publication.java
  6. 329 0
      server/src/main/java/org/elasticsearch/cluster/service/ClusterStateUpdateStats.java
  7. 124 18
      server/src/main/java/org/elasticsearch/cluster/service/MasterService.java
  8. 24 1
      server/src/main/java/org/elasticsearch/discovery/DiscoveryStats.java
  9. 14 0
      server/src/main/java/org/elasticsearch/threadpool/ThreadPool.java
  10. 109 5
      server/src/test/java/org/elasticsearch/action/admin/cluster/node/stats/NodeStatsTests.java
  11. 245 1
      server/src/test/java/org/elasticsearch/cluster/coordination/CoordinatorTests.java
  12. 2 0
      server/src/test/java/org/elasticsearch/cluster/coordination/NodeJoinTests.java
  13. 1 1
      server/src/test/java/org/elasticsearch/cluster/coordination/PublicationTransportHandlerTests.java
  14. 457 0
      server/src/test/java/org/elasticsearch/cluster/service/ClusterStateUpdateStatsWireSerializationTests.java
  15. 13 2
      server/src/test/java/org/elasticsearch/cluster/service/MasterServiceTests.java
  16. 15 3
      test/framework/src/main/java/org/elasticsearch/cluster/coordination/AbstractCoordinatorTestCase.java
  17. 2 2
      test/framework/src/main/java/org/elasticsearch/cluster/service/FakeThreadPoolMasterService.java
  18. 5 0
      test/framework/src/main/java/org/elasticsearch/common/util/concurrent/DeterministicTaskQueue.java
  19. 13 3
      test/framework/src/main/java/org/elasticsearch/test/ClusterServiceUtils.java
  20. 2 0
      test/framework/src/test/java/org/elasticsearch/cluster/service/FakeThreadPoolMasterServiceTests.java

+ 161 - 0
docs/reference/cluster/nodes-stats.asciidoc

@@ -2077,6 +2077,167 @@ Number of incompatible differences between published cluster states.
 (integer)
 Number of compatible differences between published cluster states.
 =======
+
+`cluster_state_update`::
+(object)
+Contains low-level statistics about how long various activities took during
+cluster state updates while the node was the elected master. Omitted if the
+node is not master-eligible. Every field whose name ends in `_time` within this
+object is also represented as a raw number of milliseconds in a field whose
+name ends in `_time_millis`. The human-readable fields with a `_time` suffix
+are only returned if requested with the `?human=true` query parameter.
++
+.Properties of `cluster_state_update`
+[%collapsible]
+=======
+`unchanged`::
+(object)
+Contains statistics about cluster state update attempts that did not change the
+cluster state.
++
+.Properties of `unchanged`
+[%collapsible]
+========
+`count`::
+(long)
+The number of cluster state update attempts that did not change the cluster
+state since the node started.
+
+`computation_time`::
+(<<time-units,time value>>)
+The cumulative amount of time spent computing no-op cluster state updates since
+the node started.
+
+`notification_time`::
+(<<time-units,time value>>)
+The cumulative amount of time spent notifying listeners of a no-op cluster
+state update since the node started.
+
+========
+
+`success`::
+(object)
+Contains statistics about cluster state update attempts that successfully
+changed the cluster state.
++
+.Properties of `success`
+[%collapsible]
+========
+`count`::
+(long)
+The number of cluster state update attempts that successfully changed the
+cluster state since the node started.
+
+`computation_time`::
+(<<time-units,time value>>)
+The cumulative amount of time spent computing cluster state updates that were
+ultimately successful since the node started.
+
+`publication_time`::
+(<<time-units,time value>>)
+The cumulative amount of time spent publishing cluster state updates which
+ultimately succeeded, which includes everything from the start of the
+publication (i.e. just after the computation of the new cluster state) until
+the publication has finished and the master node is ready to start processing
+the next state update. This includes the time measured by
+`context_construction_time`, `commit_time`, `completion_time` and
+`master_apply_time`.
+
+`context_construction_time`::
+(<<time-units,time value>>)
+The cumulative amount of time spent constructing a _publication context_ since
+the node started for publications that ultimately succeeded. This statistic
+includes the time spent computing the difference between the current and new
+cluster state and preparing a serialized representation of this difference.
+
+`commit_time`::
+(<<time-units,time value>>)
+The cumulative amount of time spent waiting for a successful cluster state
+update to _commit_, which measures the time from the start of each publication
+until a majority of the master-eligible nodes have written the state to disk
+and confirmed the write to the elected master.
+
+`completion_time`::
+(<<time-units,time value>>)
+The cumulative amount of time spent waiting for a successful cluster state
+update to _complete_, which measures the time from the start of each
+publication until all the other nodes have notified the elected master that
+they have applied the cluster state.
+
+`master_apply_time`::
+(<<time-units,time value>>)
+The cumulative amount of time spent successfully applying cluster state updates
+on the elected master since the node started.
+
+`notification_time`::
+(<<time-units,time value>>)
+The cumulative amount of time spent notifying listeners of a successful cluster
+state update since the node started.
+
+========
+
+`failure`::
+(object)
+Contains statistics about cluster state update attempts that did not
+successfully change the cluster state, typically because a new master node was
+elected before completion.
++
+.Properties of `failure`
+[%collapsible]
+========
+`count`::
+(long)
+The number of cluster state update attempts that failed to change the cluster
+state since the node started.
+
+`computation_time`::
+(<<time-units,time value>>)
+The cumulative amount of time spent computing cluster state updates that were
+ultimately unsuccessful since the node started.
+
+`publication_time`::
+(<<time-units,time value>>)
+The cumulative amount of time spent publishing cluster state updates which
+ultimately failed, which includes everything from the start of the
+publication (i.e. just after the computation of the new cluster state) until
+the publication has finished and the master node is ready to start processing
+the next state update. This includes the time measured by
+`context_construction_time`, `commit_time`, `completion_time` and
+`master_apply_time`.
+
+`context_construction_time`::
+(<<time-units,time value>>)
+The cumulative amount of time spent constructing a _publication context_ since
+the node started for publications that ultimately failed. This statistic
+includes the time spent computing the difference between the current and new
+cluster state and preparing a serialized representation of this difference.
+
+`commit_time`::
+(<<time-units,time value>>)
+The cumulative amount of time spent waiting for an unsuccessful cluster state
+update to _commit_, which measures the time from the start of each publication
+until a majority of the master-eligible nodes have written the state to disk
+and confirmed the write to the elected master.
+
+`completion_time`::
+(<<time-units,time value>>)
+The cumulative amount of time spent waiting for an unsuccessful cluster state
+update to _complete_, which measures the time from the start of each
+publication until all the other nodes have notified the elected master that
+they have applied the cluster state.
+
+`master_apply_time`::
+(<<time-units,time value>>)
+The cumulative amount of time spent unsuccessfully applying cluster state
+updates on the elected master since the node started.
+
+`notification_time`::
+(<<time-units,time value>>)
+The cumulative amount of time spent notifying listeners of a failed cluster
+state update since the node started.
+
+========
+=======
 ======
 
 [[cluster-nodes-stats-api-response-body-ingest]]

+ 96 - 0
rest-api-spec/src/yamlRestTest/resources/rest-api-spec/test/nodes.stats/30_discovery.yml

@@ -40,3 +40,99 @@
   - gte: { nodes.$master.discovery.published_cluster_states.incompatible_diffs: 0 }
   - gte: { nodes.$master.discovery.published_cluster_states.compatible_diffs: 0 }
   - is_false:  nodes.$master.roles
+
+---
+"Master timing stats":
+  - skip:
+      features: [arbitrary_key]
+      version: "- 7.99.99"
+      reason: "master timing stats added in 8.0.0"
+
+  - do:
+      nodes.info:
+        node_id: _master
+  - set:
+      nodes._arbitrary_key_: master
+
+  - do:
+      nodes.stats:
+        metric: [ discovery ]
+
+  - gte: { nodes.$master.discovery.cluster_state_update.unchanged.count: 0 }
+  - gte: { nodes.$master.discovery.cluster_state_update.unchanged.computation_time_millis: 0 }
+  - gte: { nodes.$master.discovery.cluster_state_update.unchanged.notification_time_millis: 0 }
+  - gte: { nodes.$master.discovery.cluster_state_update.success.count: 0 }
+  - gte: { nodes.$master.discovery.cluster_state_update.success.computation_time_millis: 0 }
+  - gte: { nodes.$master.discovery.cluster_state_update.success.publication_time_millis: 0 }
+  - gte: { nodes.$master.discovery.cluster_state_update.success.context_construction_time_millis: 0 }
+  - gte: { nodes.$master.discovery.cluster_state_update.success.commit_time_millis: 0 }
+  - gte: { nodes.$master.discovery.cluster_state_update.success.completion_time_millis: 0 }
+  - gte: { nodes.$master.discovery.cluster_state_update.success.master_apply_time_millis: 0 }
+  - gte: { nodes.$master.discovery.cluster_state_update.success.notification_time_millis: 0 }
+  - gte: { nodes.$master.discovery.cluster_state_update.failure.count: 0 }
+  - gte: { nodes.$master.discovery.cluster_state_update.failure.computation_time_millis: 0 }
+  - gte: { nodes.$master.discovery.cluster_state_update.failure.publication_time_millis: 0 }
+  - gte: { nodes.$master.discovery.cluster_state_update.failure.context_construction_time_millis: 0 }
+  - gte: { nodes.$master.discovery.cluster_state_update.failure.commit_time_millis: 0 }
+  - gte: { nodes.$master.discovery.cluster_state_update.failure.completion_time_millis: 0 }
+  - gte: { nodes.$master.discovery.cluster_state_update.failure.master_apply_time_millis: 0 }
+  - gte: { nodes.$master.discovery.cluster_state_update.failure.notification_time_millis: 0 }
+
+  - is_false: nodes.$master.discovery.cluster_state_update.unchanged.computation_time
+  - is_false: nodes.$master.discovery.cluster_state_update.unchanged.notification_time
+  - is_false: nodes.$master.discovery.cluster_state_update.success.computation_time
+  - is_false: nodes.$master.discovery.cluster_state_update.success.publication_time
+  - is_false: nodes.$master.discovery.cluster_state_update.success.context_construction_time
+  - is_false: nodes.$master.discovery.cluster_state_update.success.commit_time
+  - is_false: nodes.$master.discovery.cluster_state_update.success.completion_time
+  - is_false: nodes.$master.discovery.cluster_state_update.success.master_apply_time
+  - is_false: nodes.$master.discovery.cluster_state_update.success.notification_time
+  - is_false: nodes.$master.discovery.cluster_state_update.failure.computation_time
+  - is_false: nodes.$master.discovery.cluster_state_update.failure.publication_time
+  - is_false: nodes.$master.discovery.cluster_state_update.failure.context_construction_time
+  - is_false: nodes.$master.discovery.cluster_state_update.failure.commit_time
+  - is_false: nodes.$master.discovery.cluster_state_update.failure.completion_time
+  - is_false: nodes.$master.discovery.cluster_state_update.failure.master_apply_time
+  - is_false: nodes.$master.discovery.cluster_state_update.failure.notification_time
+
+  - do:
+      nodes.stats:
+        metric: [ discovery ]
+        human: true
+
+  - gte: { nodes.$master.discovery.cluster_state_update.unchanged.count: 0 }
+  - gte: { nodes.$master.discovery.cluster_state_update.unchanged.computation_time_millis: 0 }
+  - gte: { nodes.$master.discovery.cluster_state_update.unchanged.notification_time_millis: 0 }
+  - gte: { nodes.$master.discovery.cluster_state_update.success.count: 0 }
+  - gte: { nodes.$master.discovery.cluster_state_update.success.computation_time_millis: 0 }
+  - gte: { nodes.$master.discovery.cluster_state_update.success.publication_time_millis: 0 }
+  - gte: { nodes.$master.discovery.cluster_state_update.success.context_construction_time_millis: 0 }
+  - gte: { nodes.$master.discovery.cluster_state_update.success.commit_time_millis: 0 }
+  - gte: { nodes.$master.discovery.cluster_state_update.success.completion_time_millis: 0 }
+  - gte: { nodes.$master.discovery.cluster_state_update.success.master_apply_time_millis: 0 }
+  - gte: { nodes.$master.discovery.cluster_state_update.success.notification_time_millis: 0 }
+  - gte: { nodes.$master.discovery.cluster_state_update.failure.count: 0 }
+  - gte: { nodes.$master.discovery.cluster_state_update.failure.computation_time_millis: 0 }
+  - gte: { nodes.$master.discovery.cluster_state_update.failure.publication_time_millis: 0 }
+  - gte: { nodes.$master.discovery.cluster_state_update.failure.context_construction_time_millis: 0 }
+  - gte: { nodes.$master.discovery.cluster_state_update.failure.commit_time_millis: 0 }
+  - gte: { nodes.$master.discovery.cluster_state_update.failure.completion_time_millis: 0 }
+  - gte: { nodes.$master.discovery.cluster_state_update.failure.master_apply_time_millis: 0 }
+  - gte: { nodes.$master.discovery.cluster_state_update.failure.notification_time_millis: 0 }
+
+  - is_true: nodes.$master.discovery.cluster_state_update.unchanged.computation_time
+  - is_true: nodes.$master.discovery.cluster_state_update.unchanged.notification_time
+  - is_true: nodes.$master.discovery.cluster_state_update.success.computation_time
+  - is_true: nodes.$master.discovery.cluster_state_update.success.publication_time
+  - is_true: nodes.$master.discovery.cluster_state_update.success.context_construction_time
+  - is_true: nodes.$master.discovery.cluster_state_update.success.commit_time
+  - is_true: nodes.$master.discovery.cluster_state_update.success.completion_time
+  - is_true: nodes.$master.discovery.cluster_state_update.success.master_apply_time
+  - is_true: nodes.$master.discovery.cluster_state_update.success.notification_time
+  - is_true: nodes.$master.discovery.cluster_state_update.failure.computation_time
+  - is_true: nodes.$master.discovery.cluster_state_update.failure.publication_time
+  - is_true: nodes.$master.discovery.cluster_state_update.failure.context_construction_time
+  - is_true: nodes.$master.discovery.cluster_state_update.failure.commit_time
+  - is_true: nodes.$master.discovery.cluster_state_update.failure.completion_time
+  - is_true: nodes.$master.discovery.cluster_state_update.failure.master_apply_time
+  - is_true: nodes.$master.discovery.cluster_state_update.failure.notification_time

+ 126 - 1
server/src/main/java/org/elasticsearch/cluster/ClusterStatePublicationEvent.java

@@ -14,14 +14,33 @@ package org.elasticsearch.cluster;
  */
 public class ClusterStatePublicationEvent {
 
+    /**
+     * Sentinel value so that we can assert each field is set once and only once on each successful event, and at most once on each failure.
+     */
+    private static final long NOT_SET = -1L;
+
     private final String summary;
     private final ClusterState oldState;
     private final ClusterState newState;
+    private final long computationTimeMillis;
+    private final long publicationStartTimeMillis;
+    private volatile long publicationContextConstructionElapsedMillis = NOT_SET;
+    private volatile long publicationCommitElapsedMillis = NOT_SET;
+    private volatile long publicationCompletionElapsedMillis = NOT_SET;
+    private volatile long masterApplyElapsedMillis = NOT_SET;
 
-    public ClusterStatePublicationEvent(String summary, ClusterState oldState, ClusterState newState) {
+    public ClusterStatePublicationEvent(
+        String summary,
+        ClusterState oldState,
+        ClusterState newState,
+        long computationTimeMillis,
+        long publicationStartTimeMillis
+    ) {
         this.summary = summary;
         this.oldState = oldState;
         this.newState = newState;
+        this.computationTimeMillis = computationTimeMillis;
+        this.publicationStartTimeMillis = publicationStartTimeMillis;
     }
 
     public String getSummary() {
@@ -35,4 +54,110 @@ public class ClusterStatePublicationEvent {
     public ClusterState getNewState() {
         return newState;
     }
+
+    public long getComputationTimeMillis() {
+        return computationTimeMillis;
+    }
+
+    public long getPublicationStartTimeMillis() {
+        return publicationStartTimeMillis;
+    }
+
+    public void setPublicationContextConstructionElapsedMillis(long millis) {
+        assert millis >= 0;
+        assert publicationContextConstructionElapsedMillis == NOT_SET;
+        publicationContextConstructionElapsedMillis = millis;
+    }
+
+    public void setPublicationCommitElapsedMillis(long millis) {
+        assert millis >= 0;
+        assert publicationCommitElapsedMillis == NOT_SET;
+        publicationCommitElapsedMillis = millis;
+    }
+
+    public void setPublicationCompletionElapsedMillis(long millis) {
+        assert millis >= 0;
+        assert publicationCompletionElapsedMillis == NOT_SET;
+        publicationCompletionElapsedMillis = millis;
+    }
+
+    public void setMasterApplyElapsedMillis(long millis) {
+        assert millis >= 0;
+        assert masterApplyElapsedMillis == NOT_SET;
+        masterApplyElapsedMillis = millis;
+    }
+
+    /**
+     * @return how long in milliseconds it took to construct the publication context, which includes computing a cluster state diff and
+     *         serializing the cluster states for future transmission.
+     */
+    public long getPublicationContextConstructionElapsedMillis() {
+        assert publicationContextConstructionElapsedMillis != NOT_SET;
+        return publicationContextConstructionElapsedMillis;
+    }
+
+    /**
+     * @return how long in milliseconds it took to commit the publication, i.e. the elapsed time from the publication start until the master
+     *         receives publish responses from a majority of master-eligible nodes confirming the state was received and persisted there.
+     */
+    public long getPublicationCommitElapsedMillis() {
+        assert publicationCommitElapsedMillis != NOT_SET;
+        return publicationCommitElapsedMillis;
+    }
+
+    /**
+     * @return how long in milliseconds it took to complete the publication, i.e. the elapsed time from the publication start until all
+     *         nodes except the master have applied the cluster state.
+     */
+    public long getPublicationCompletionElapsedMillis() {
+        assert publicationCompletionElapsedMillis != NOT_SET;
+        return publicationCompletionElapsedMillis;
+    }
+
+    /**
+     * @return how long in milliseconds it took for the master to apply the cluster state, which happens after publication completion.
+     */
+    public long getMasterApplyElapsedMillis() {
+        assert masterApplyElapsedMillis != NOT_SET;
+        return masterApplyElapsedMillis;
+    }
+
+    /**
+     * @return how long in milliseconds it took to construct the publication context, which includes computing a cluster state diff and
+     *         serializing the cluster states for future transmission, or zero if not set.
+     */
+    public long maybeGetPublicationContextConstructionElapsedMillis() {
+        return ifSet(publicationContextConstructionElapsedMillis);
+    }
+
+    /**
+     * @return how long in milliseconds it took to commit the publication, i.e. the elapsed time from the publication start until the master
+     *         receives publish responses from a majority of master-eligible nodes confirming the state was received and persisted there,
+     *         or zero if not set.
+     */
+    public long maybeGetPublicationCommitElapsedMillis() {
+        return ifSet(publicationCommitElapsedMillis);
+    }
+
+    /**
+     * @return how long in milliseconds it took to complete the publication, i.e. the elapsed time from the publication start until all
+     *         nodes except the master have applied the cluster state, or zero if not set.
+     */
+    public long maybeGetPublicationCompletionElapsedMillis() {
+        return ifSet(publicationCompletionElapsedMillis);
+    }
+
+    /**
+     * @return how long in milliseconds it took for the master to apply the cluster state, which happens after publication completion, or
+     *         zero if not set.
+     */
+    public long maybeGetMasterApplyElapsedMillis() {
+        return ifSet(masterApplyElapsedMillis);
+    }
+
+    private static long ifSet(long millis) {
+        assert millis == NOT_SET || millis >= 0;
+        return millis == NOT_SET ? 0 : millis;
+    }
+
 }

+ 31 - 6
server/src/main/java/org/elasticsearch/cluster/coordination/Coordinator.java

@@ -683,7 +683,11 @@ public class Coordinator extends AbstractLifecycleComponent implements Discovery
 
     @Override
     public DiscoveryStats stats() {
-        return new DiscoveryStats(new PendingClusterStateStats(0, 0, 0), publicationHandler.stats());
+        return new DiscoveryStats(
+            new PendingClusterStateStats(0, 0, 0),
+            publicationHandler.stats(),
+            getLocalNode().isMasterNode() ? masterService.getClusterStateUpdateStats() : null
+        );
     }
 
     @Override
@@ -1062,12 +1066,20 @@ public class Coordinator extends AbstractLifecycleComponent implements Discovery
                 assert getLocalNode().equals(clusterState.getNodes().get(getLocalNode().getId())) :
                     getLocalNode() + " should be in published " + clusterState;
 
+                final long publicationContextConstructionStartMillis = transportService.getThreadPool().rawRelativeTimeInMillis();
                 final PublicationTransportHandler.PublicationContext publicationContext =
                     publicationHandler.newPublicationContext(clusterStatePublicationEvent);
+                clusterStatePublicationEvent.setPublicationContextConstructionElapsedMillis(
+                    transportService.getThreadPool().rawRelativeTimeInMillis() - publicationContextConstructionStartMillis);
 
                 final PublishRequest publishRequest = coordinationState.get().handleClientValue(clusterState);
-                final CoordinatorPublication publication = new CoordinatorPublication(publishRequest, publicationContext,
-                    new ListenableFuture<>(), ackListener, publishListener);
+                final CoordinatorPublication publication = new CoordinatorPublication(
+                    clusterStatePublicationEvent,
+                    publishRequest,
+                    publicationContext,
+                    new ListenableFuture<>(),
+                    ackListener,
+                    publishListener);
                 currentPublication = Optional.of(publication);
 
                 final DiscoveryNodes publishNodes = publishRequest.getAcceptedState().nodes();
@@ -1245,6 +1257,7 @@ public class Coordinator extends AbstractLifecycleComponent implements Discovery
 
     class CoordinatorPublication extends Publication {
 
+        private final ClusterStatePublicationEvent clusterStatePublicationEvent;
         private final PublishRequest publishRequest;
         private final ListenableFuture<Void> localNodeAckEvent;
         private final AckListener ackListener;
@@ -1260,12 +1273,19 @@ public class Coordinator extends AbstractLifecycleComponent implements Discovery
         private final List<Join> receivedJoins = new ArrayList<>();
         private boolean receivedJoinsProcessed;
 
-        CoordinatorPublication(PublishRequest publishRequest, PublicationTransportHandler.PublicationContext publicationContext,
-                               ListenableFuture<Void> localNodeAckEvent, AckListener ackListener, ActionListener<Void> publishListener) {
+        CoordinatorPublication(
+            ClusterStatePublicationEvent clusterStatePublicationEvent,
+            PublishRequest publishRequest,
+            PublicationTransportHandler.PublicationContext publicationContext,
+            ListenableFuture<Void> localNodeAckEvent,
+            AckListener ackListener,
+            ActionListener<Void> publishListener
+        ) {
             super(publishRequest,
                 new AckListener() {
                     @Override
                     public void onCommit(TimeValue commitTime) {
+                        clusterStatePublicationEvent.setPublicationCommitElapsedMillis(commitTime.millis());
                         ackListener.onCommit(commitTime);
                     }
 
@@ -1288,7 +1308,8 @@ public class Coordinator extends AbstractLifecycleComponent implements Discovery
                         }
                     }
                 },
-                transportService.getThreadPool()::relativeTimeInMillis);
+                transportService.getThreadPool()::rawRelativeTimeInMillis);
+            this.clusterStatePublicationEvent = clusterStatePublicationEvent;
             this.publishRequest = publishRequest;
             this.publicationContext = publicationContext;
             this.localNodeAckEvent = localNodeAckEvent;
@@ -1345,6 +1366,8 @@ public class Coordinator extends AbstractLifecycleComponent implements Discovery
         @Override
         protected void onCompletion(boolean committed) {
             assert Thread.holdsLock(mutex) : "Coordinator mutex not held";
+            final long completionTimeMillis = transportService.getThreadPool().rawRelativeTimeInMillis();
+            clusterStatePublicationEvent.setPublicationCompletionElapsedMillis(completionTimeMillis - getStartTime());
 
             localNodeAckEvent.addListener(new ActionListener<Void>() {
                 @Override
@@ -1370,6 +1393,8 @@ public class Coordinator extends AbstractLifecycleComponent implements Discovery
 
                             @Override
                             public void onSuccess(String source) {
+                                clusterStatePublicationEvent.setMasterApplyElapsedMillis(
+                                    transportService.getThreadPool().rawRelativeTimeInMillis() - completionTimeMillis);
                                 synchronized (mutex) {
                                     assert currentPublication.get() == CoordinatorPublication.this;
                                     currentPublication = Optional.empty();

+ 4 - 0
server/src/main/java/org/elasticsearch/cluster/coordination/Publication.java

@@ -163,6 +163,10 @@ public abstract class Publication {
         }
     }
 
+    protected final long getStartTime() {
+        return startTime;
+    }
+
     protected abstract void onCompletion(boolean committed);
 
     protected abstract boolean isPublishQuorum(CoordinationState.VoteCollection votes);

+ 329 - 0
server/src/main/java/org/elasticsearch/cluster/service/ClusterStateUpdateStats.java

@@ -0,0 +1,329 @@
+/*
+ * Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
+ * or more contributor license agreements. Licensed under the Elastic License
+ * 2.0 and the Server Side Public License, v 1; you may not use this file except
+ * in compliance with, at your election, the Elastic License 2.0 or the Server
+ * Side Public License, v 1.
+ */
+
+package org.elasticsearch.cluster.service;
+
+import org.elasticsearch.Version;
+import org.elasticsearch.common.io.stream.StreamInput;
+import org.elasticsearch.common.io.stream.StreamOutput;
+import org.elasticsearch.common.io.stream.Writeable;
+import org.elasticsearch.common.xcontent.ToXContentFragment;
+import org.elasticsearch.common.xcontent.XContentBuilder;
+import org.elasticsearch.core.TimeValue;
+
+import java.io.IOException;
+import java.util.Objects;
+
+/**
+ * Various statistics (timing information etc) about cluster state updates coordinated by this node.
+ * <p>
+ * Instances are immutable: every field is {@code final} and set once, either from explicit values or from a stream. The statistics are
+ * split into three groups: updates that left the cluster state unchanged, publications that succeeded, and publications that failed.
+ */
+public class ClusterStateUpdateStats implements Writeable, ToXContentFragment {
+
+    private final long unchangedTaskCount;
+    private final long publicationSuccessCount;
+    private final long publicationFailureCount;
+
+    private final long unchangedComputationElapsedMillis;
+    private final long unchangedNotificationElapsedMillis;
+    private final long successfulComputationElapsedMillis;
+    private final long successfulPublicationElapsedMillis;
+    private final long successfulContextConstructionElapsedMillis;
+    private final long successfulCommitElapsedMillis;
+    private final long successfulCompletionElapsedMillis;
+    private final long successfulMasterApplyElapsedMillis;
+    private final long successfulNotificationElapsedMillis;
+
+    private final long failedComputationElapsedMillis;
+    private final long failedPublicationElapsedMillis;
+    private final long failedContextConstructionElapsedMillis;
+    private final long failedCommitElapsedMillis;
+    private final long failedCompletionElapsedMillis;
+    private final long failedMasterApplyElapsedMillis;
+    private final long failedNotificationElapsedMillis;
+
+    /**
+     * Creates a statistics object from explicit values: the three counters first, then the elapsed times (in milliseconds) for unchanged
+     * updates, successful publications and failed publications. Every argument is expected to be non-negative; this is verified by
+     * assertion only, so it is not enforced when assertions are disabled.
+     */
+    public ClusterStateUpdateStats(
+        long unchangedTaskCount,
+        long publicationSuccessCount,
+        long publicationFailureCount,
+        long unchangedComputationElapsedMillis,
+        long unchangedNotificationElapsedMillis,
+        long successfulComputationElapsedMillis,
+        long successfulPublicationElapsedMillis,
+        long successfulContextConstructionElapsedMillis,
+        long successfulCommitElapsedMillis,
+        long successfulCompletionElapsedMillis,
+        long successfulMasterApplyElapsedMillis,
+        long successfulNotificationElapsedMillis,
+        long failedComputationElapsedMillis,
+        long failedPublicationElapsedMillis,
+        long failedContextConstructionElapsedMillis,
+        long failedCommitElapsedMillis,
+        long failedCompletionElapsedMillis,
+        long failedMasterApplyElapsedMillis,
+        long failedNotificationElapsedMillis
+    ) {
+        this.unchangedTaskCount = nonNegative(unchangedTaskCount);
+        this.publicationSuccessCount = nonNegative(publicationSuccessCount);
+        this.publicationFailureCount = nonNegative(publicationFailureCount);
+        this.unchangedComputationElapsedMillis = nonNegative(unchangedComputationElapsedMillis);
+        this.unchangedNotificationElapsedMillis = nonNegative(unchangedNotificationElapsedMillis);
+        this.successfulComputationElapsedMillis = nonNegative(successfulComputationElapsedMillis);
+        this.successfulPublicationElapsedMillis = nonNegative(successfulPublicationElapsedMillis);
+        this.successfulContextConstructionElapsedMillis = nonNegative(successfulContextConstructionElapsedMillis);
+        this.successfulCommitElapsedMillis = nonNegative(successfulCommitElapsedMillis);
+        this.successfulCompletionElapsedMillis = nonNegative(successfulCompletionElapsedMillis);
+        this.successfulMasterApplyElapsedMillis = nonNegative(successfulMasterApplyElapsedMillis);
+        this.successfulNotificationElapsedMillis = nonNegative(successfulNotificationElapsedMillis);
+        this.failedComputationElapsedMillis = nonNegative(failedComputationElapsedMillis);
+        this.failedPublicationElapsedMillis = nonNegative(failedPublicationElapsedMillis);
+        this.failedContextConstructionElapsedMillis = nonNegative(failedContextConstructionElapsedMillis);
+        this.failedCommitElapsedMillis = nonNegative(failedCommitElapsedMillis);
+        this.failedCompletionElapsedMillis = nonNegative(failedCompletionElapsedMillis);
+        this.failedMasterApplyElapsedMillis = nonNegative(failedMasterApplyElapsedMillis);
+        this.failedNotificationElapsedMillis = nonNegative(failedNotificationElapsedMillis);
+    }
+
+    /**
+     * Returns {@code v} unchanged after asserting that it is not negative. The check only runs when assertions are enabled.
+     */
+    private static long nonNegative(long v) {
+        assert v >= 0 : v;
+        return v;
+    }
+
+    /**
+     * Reads a statistics object from the given stream. The fields are read as variable-length longs, in the same order in which
+     * {@link #writeTo} writes them.
+     */
+    public ClusterStateUpdateStats(StreamInput in) throws IOException {
+        this.unchangedTaskCount = in.readVLong();
+        this.publicationSuccessCount = in.readVLong();
+        this.publicationFailureCount = in.readVLong();
+        this.unchangedComputationElapsedMillis = in.readVLong();
+        this.unchangedNotificationElapsedMillis = in.readVLong();
+        this.successfulComputationElapsedMillis = in.readVLong();
+        this.successfulPublicationElapsedMillis = in.readVLong();
+        this.successfulContextConstructionElapsedMillis = in.readVLong();
+        this.successfulCommitElapsedMillis = in.readVLong();
+        this.successfulCompletionElapsedMillis = in.readVLong();
+        this.successfulMasterApplyElapsedMillis = in.readVLong();
+        this.successfulNotificationElapsedMillis = in.readVLong();
+        this.failedComputationElapsedMillis = in.readVLong();
+        this.failedPublicationElapsedMillis = in.readVLong();
+        this.failedContextConstructionElapsedMillis = in.readVLong();
+        this.failedCommitElapsedMillis = in.readVLong();
+        this.failedCompletionElapsedMillis = in.readVLong();
+        this.failedMasterApplyElapsedMillis = in.readVLong();
+        this.failedNotificationElapsedMillis = in.readVLong();
+    }
+
+    /**
+     * Writes every field to the stream as a variable-length long, in declaration order. The target stream is asserted to be on version
+     * 8.0.0 or later, so callers are expected not to send this object to older versions.
+     */
+    @Override
+    public void writeTo(StreamOutput out) throws IOException {
+        assert out.getVersion().onOrAfter(Version.V_8_0_0) : out.getVersion();
+        out.writeVLong(unchangedTaskCount);
+        out.writeVLong(publicationSuccessCount);
+        out.writeVLong(publicationFailureCount);
+        out.writeVLong(unchangedComputationElapsedMillis);
+        out.writeVLong(unchangedNotificationElapsedMillis);
+        out.writeVLong(successfulComputationElapsedMillis);
+        out.writeVLong(successfulPublicationElapsedMillis);
+        out.writeVLong(successfulContextConstructionElapsedMillis);
+        out.writeVLong(successfulCommitElapsedMillis);
+        out.writeVLong(successfulCompletionElapsedMillis);
+        out.writeVLong(successfulMasterApplyElapsedMillis);
+        out.writeVLong(successfulNotificationElapsedMillis);
+        out.writeVLong(failedComputationElapsedMillis);
+        out.writeVLong(failedPublicationElapsedMillis);
+        out.writeVLong(failedContextConstructionElapsedMillis);
+        out.writeVLong(failedCommitElapsedMillis);
+        out.writeVLong(failedCompletionElapsedMillis);
+        out.writeVLong(failedMasterApplyElapsedMillis);
+        out.writeVLong(failedNotificationElapsedMillis);
+    }
+
+    /**
+     * Shared instance in which every counter and elapsed time is zero. Declared {@code final} so that the shared instance cannot be
+     * replaced by accident.
+     */
+    public static final ClusterStateUpdateStats EMPTY = new ClusterStateUpdateStats(
+        0L,
+        0L,
+        0L,
+        0L,
+        0L,
+        0L,
+        0L,
+        0L,
+        0L,
+        0L,
+        0L,
+        0L,
+        0L,
+        0L,
+        0L,
+        0L,
+        0L,
+        0L,
+        0L);
+
+
+    public long getUnchangedTaskCount() {
+        return unchangedTaskCount;
+    }
+
+    public long getPublicationSuccessCount() {
+        return publicationSuccessCount;
+    }
+
+    public long getPublicationFailureCount() {
+        return publicationFailureCount;
+    }
+
+    public long getUnchangedComputationElapsedMillis() {
+        return unchangedComputationElapsedMillis;
+    }
+
+    public long getUnchangedNotificationElapsedMillis() {
+        return unchangedNotificationElapsedMillis;
+    }
+
+    public long getSuccessfulComputationElapsedMillis() {
+        return successfulComputationElapsedMillis;
+    }
+
+    public long getSuccessfulPublicationElapsedMillis() {
+        return successfulPublicationElapsedMillis;
+    }
+
+    public long getSuccessfulContextConstructionElapsedMillis() {
+        return successfulContextConstructionElapsedMillis;
+    }
+
+    public long getSuccessfulCommitElapsedMillis() {
+        return successfulCommitElapsedMillis;
+    }
+
+    public long getSuccessfulCompletionElapsedMillis() {
+        return successfulCompletionElapsedMillis;
+    }
+
+    public long getSuccessfulMasterApplyElapsedMillis() {
+        return successfulMasterApplyElapsedMillis;
+    }
+
+    public long getSuccessfulNotificationElapsedMillis() {
+        return successfulNotificationElapsedMillis;
+    }
+
+    public long getFailedComputationElapsedMillis() {
+        return failedComputationElapsedMillis;
+    }
+
+    public long getFailedPublicationElapsedMillis() {
+        return failedPublicationElapsedMillis;
+    }
+
+    public long getFailedContextConstructionElapsedMillis() {
+        return failedContextConstructionElapsedMillis;
+    }
+
+    public long getFailedCommitElapsedMillis() {
+        return failedCommitElapsedMillis;
+    }
+
+    public long getFailedCompletionElapsedMillis() {
+        return failedCompletionElapsedMillis;
+    }
+
+    public long getFailedMasterApplyElapsedMillis() {
+        return failedMasterApplyElapsedMillis;
+    }
+
+    public long getFailedNotificationElapsedMillis() {
+        return failedNotificationElapsedMillis;
+    }
+
+    /**
+     * Renders these statistics as a {@code cluster_state_update} object containing three nested objects, {@code unchanged},
+     * {@code success} and {@code failure}, each holding a {@code count} followed by its timing fields.
+     */
+    @Override
+    public XContentBuilder toXContent(XContentBuilder builder, Params params) throws IOException {
+        builder.startObject("cluster_state_update");
+
+        builder.startObject("unchanged");
+        builder.field("count", unchangedTaskCount);
+        msField(builder, "computation", unchangedComputationElapsedMillis);
+        msField(builder, "notification", unchangedNotificationElapsedMillis);
+        builder.endObject();
+
+        builder.startObject("success");
+        builder.field("count", publicationSuccessCount);
+        msField(builder, "computation", successfulComputationElapsedMillis);
+        msField(builder, "publication", successfulPublicationElapsedMillis);
+        msField(builder, "context_construction", successfulContextConstructionElapsedMillis);
+        msField(builder, "commit", successfulCommitElapsedMillis);
+        msField(builder, "completion", successfulCompletionElapsedMillis);
+        msField(builder, "master_apply", successfulMasterApplyElapsedMillis);
+        msField(builder, "notification", successfulNotificationElapsedMillis);
+        builder.endObject();
+
+        builder.startObject("failure");
+        builder.field("count", publicationFailureCount);
+        msField(builder, "computation", failedComputationElapsedMillis);
+        msField(builder, "publication", failedPublicationElapsedMillis);
+        msField(builder, "context_construction", failedContextConstructionElapsedMillis);
+        msField(builder, "commit", failedCommitElapsedMillis);
+        msField(builder, "completion", failedCompletionElapsedMillis);
+        msField(builder, "master_apply", failedMasterApplyElapsedMillis);
+        msField(builder, "notification", failedNotificationElapsedMillis);
+        builder.endObject();
+
+        builder.endObject();
+        return builder;
+    }
+
+    /**
+     * Emits a duration through {@code humanReadableField}, using {@code <name>_time_millis} as the raw field name and
+     * {@code <name>_time} as the human-readable field name.
+     */
+    private static void msField(XContentBuilder builder, String name, long millis) throws IOException {
+        builder.humanReadableField(name + "_time_millis", name + "_time", TimeValue.timeValueMillis(millis));
+    }
+
+    @Override
+    public boolean equals(Object o) {
+        if (this == o) return true;
+        if (o == null || getClass() != o.getClass()) return false;
+        ClusterStateUpdateStats that = (ClusterStateUpdateStats) o;
+        return unchangedTaskCount == that.unchangedTaskCount
+            && publicationSuccessCount == that.publicationSuccessCount
+            && publicationFailureCount == that.publicationFailureCount
+            && unchangedComputationElapsedMillis == that.unchangedComputationElapsedMillis
+            && unchangedNotificationElapsedMillis == that.unchangedNotificationElapsedMillis
+            && successfulComputationElapsedMillis == that.successfulComputationElapsedMillis
+            && successfulPublicationElapsedMillis == that.successfulPublicationElapsedMillis
+            && successfulContextConstructionElapsedMillis == that.successfulContextConstructionElapsedMillis
+            && successfulCommitElapsedMillis == that.successfulCommitElapsedMillis
+            && successfulCompletionElapsedMillis == that.successfulCompletionElapsedMillis
+            && successfulMasterApplyElapsedMillis == that.successfulMasterApplyElapsedMillis
+            && successfulNotificationElapsedMillis == that.successfulNotificationElapsedMillis
+            && failedComputationElapsedMillis == that.failedComputationElapsedMillis
+            && failedPublicationElapsedMillis == that.failedPublicationElapsedMillis
+            && failedContextConstructionElapsedMillis == that.failedContextConstructionElapsedMillis
+            && failedCommitElapsedMillis == that.failedCommitElapsedMillis
+            && failedCompletionElapsedMillis == that.failedCompletionElapsedMillis
+            && failedMasterApplyElapsedMillis == that.failedMasterApplyElapsedMillis
+            && failedNotificationElapsedMillis == that.failedNotificationElapsedMillis;
+    }
+
+    @Override
+    public int hashCode() {
+        return Objects.hash(
+            unchangedTaskCount,
+            publicationSuccessCount,
+            publicationFailureCount,
+            unchangedComputationElapsedMillis,
+            unchangedNotificationElapsedMillis,
+            successfulComputationElapsedMillis,
+            successfulPublicationElapsedMillis,
+            successfulContextConstructionElapsedMillis,
+            successfulCommitElapsedMillis,
+            successfulCompletionElapsedMillis,
+            successfulMasterApplyElapsedMillis,
+            successfulNotificationElapsedMillis,
+            failedComputationElapsedMillis,
+            failedPublicationElapsedMillis,
+            failedContextConstructionElapsedMillis,
+            failedCommitElapsedMillis,
+            failedCompletionElapsedMillis,
+            failedMasterApplyElapsedMillis,
+            failedNotificationElapsedMillis);
+    }
+}

+ 124 - 18
server/src/main/java/org/elasticsearch/cluster/service/MasterService.java

@@ -14,9 +14,9 @@ import org.apache.logging.log4j.message.ParameterizedMessage;
 import org.elasticsearch.Assertions;
 import org.elasticsearch.action.support.PlainActionFuture;
 import org.elasticsearch.cluster.AckedClusterStateTaskListener;
-import org.elasticsearch.cluster.ClusterStatePublicationEvent;
 import org.elasticsearch.cluster.ClusterState;
 import org.elasticsearch.cluster.ClusterState.Builder;
+import org.elasticsearch.cluster.ClusterStatePublicationEvent;
 import org.elasticsearch.cluster.ClusterStateTaskConfig;
 import org.elasticsearch.cluster.ClusterStateTaskExecutor;
 import org.elasticsearch.cluster.ClusterStateTaskExecutor.ClusterTasksResult;
@@ -90,6 +90,8 @@ public class MasterService extends AbstractLifecycleComponent {
     private volatile PrioritizedEsThreadPoolExecutor threadPoolExecutor;
     private volatile Batcher taskBatcher;
 
+    private final ClusterStateUpdateStatsTracker clusterStateUpdateStatsTracker = new ClusterStateUpdateStatsTracker();
+
     public MasterService(Settings settings, ClusterSettings clusterSettings, ThreadPool threadPool) {
         this.nodeName = Objects.requireNonNull(Node.NODE_NAME_SETTING.get(settings));
 
@@ -133,6 +135,10 @@ public class MasterService extends AbstractLifecycleComponent {
                 () -> threadPoolExecutor));
     }
 
+    /**
+     * Returns the cluster state update statistics currently held by this service's {@code clusterStateUpdateStatsTracker}.
+     */
+    public ClusterStateUpdateStats getClusterStateUpdateStats() {
+        return clusterStateUpdateStatsTracker.getStatistics();
+    }
+
     @SuppressWarnings("unchecked")
     class Batcher extends TaskBatcher {
 
@@ -214,17 +220,18 @@ public class MasterService extends AbstractLifecycleComponent {
             return;
         }
 
-        final long computationStartTime = threadPool.relativeTimeInMillis();
+        final long computationStartTime = threadPool.rawRelativeTimeInMillis();
         final TaskOutputs taskOutputs = calculateTaskOutputs(taskInputs, previousClusterState);
         taskOutputs.notifyFailedTasks();
         final TimeValue computationTime = getTimeSince(computationStartTime);
         logExecutionTime(computationTime, "compute cluster state update", summary);
 
         if (taskOutputs.clusterStateUnchanged()) {
-            final long notificationStartTime = threadPool.relativeTimeInMillis();
+            final long notificationStartTime = threadPool.rawRelativeTimeInMillis();
             taskOutputs.notifySuccessfulTasksOnUnchangedClusterState();
             final TimeValue executionTime = getTimeSince(notificationStartTime);
             logExecutionTime(executionTime, "notify listeners on unchanged cluster state", summary);
+            clusterStateUpdateStatsTracker.onUnchangedClusterState(computationTime.millis(), executionTime.millis());
         } else {
             final ClusterState newClusterState = taskOutputs.newClusterState;
             if (logger.isTraceEnabled()) {
@@ -232,10 +239,15 @@ public class MasterService extends AbstractLifecycleComponent {
             } else {
                 logger.debug("cluster state updated, version [{}], source [{}]", newClusterState.version(), summary);
             }
-            final long publicationStartTime = threadPool.relativeTimeInMillis();
+            final long publicationStartTime = threadPool.rawRelativeTimeInMillis();
             try {
-                ClusterStatePublicationEvent clusterStatePublicationEvent =
-                    new ClusterStatePublicationEvent(summary, previousClusterState, newClusterState);
+                final ClusterStatePublicationEvent clusterStatePublicationEvent = new ClusterStatePublicationEvent(
+                    summary,
+                    previousClusterState,
+                    newClusterState,
+                    computationTime.millis(),
+                    publicationStartTime);
+
                 // new cluster state, notify all listeners
                 final DiscoveryNodes.Delta nodesDelta = newClusterState.nodes().delta(previousClusterState.nodes());
                 if (nodesDelta.hasChanges() && logger.isInfoEnabled()) {
@@ -247,7 +259,7 @@ public class MasterService extends AbstractLifecycleComponent {
                 }
 
                 logger.debug("publishing cluster state version [{}]", newClusterState.version());
-                publish(clusterStatePublicationEvent, taskOutputs, publicationStartTime);
+                publish(clusterStatePublicationEvent, taskOutputs);
             } catch (Exception e) {
                 handleException(summary, publicationStartTime, newClusterState, e);
             }
@@ -255,10 +267,10 @@ public class MasterService extends AbstractLifecycleComponent {
     }
 
     private TimeValue getTimeSince(long startTimeMillis) {
-        return TimeValue.timeValueMillis(Math.max(0, threadPool.relativeTimeInMillis() - startTimeMillis));
+        return TimeValue.timeValueMillis(Math.max(0, threadPool.rawRelativeTimeInMillis() - startTimeMillis));
     }
 
-    protected void publish(ClusterStatePublicationEvent clusterStatePublicationEvent, TaskOutputs taskOutputs, long startTimeMillis) {
+    protected void publish(ClusterStatePublicationEvent clusterStatePublicationEvent, TaskOutputs taskOutputs) {
         final PlainActionFuture<Void> fut = new PlainActionFuture<Void>() {
             @Override
             protected boolean blockingAllowed() {
@@ -275,12 +287,12 @@ public class MasterService extends AbstractLifecycleComponent {
             FutureUtils.get(fut);
             onPublicationSuccess(clusterStatePublicationEvent, taskOutputs);
         } catch (Exception e) {
-            onPublicationFailed(clusterStatePublicationEvent, taskOutputs, startTimeMillis, e);
+            onPublicationFailed(clusterStatePublicationEvent, taskOutputs, e);
         }
     }
 
     void onPublicationSuccess(ClusterStatePublicationEvent clusterStatePublicationEvent, TaskOutputs taskOutputs) {
-        final long notificationStartTime = threadPool.relativeTimeInMillis();
+        final long notificationStartTime = threadPool.rawRelativeTimeInMillis();
         taskOutputs.processedDifferentClusterState(clusterStatePublicationEvent.getOldState(), clusterStatePublicationEvent.getNewState());
 
         try {
@@ -296,15 +308,15 @@ public class MasterService extends AbstractLifecycleComponent {
             "notify listeners on successful publication of cluster state (version: " + clusterStatePublicationEvent.getNewState().version()
                 + ", uuid: " + clusterStatePublicationEvent.getNewState().stateUUID() + ')',
             clusterStatePublicationEvent.getSummary());
+        clusterStateUpdateStatsTracker.onPublicationSuccess(
+            threadPool.rawRelativeTimeInMillis(),
+            clusterStatePublicationEvent,
+            executionTime.millis());
     }
 
-    void onPublicationFailed(
-        ClusterStatePublicationEvent clusterStatePublicationEvent,
-        TaskOutputs taskOutputs,
-        long startTimeMillis,
-        Exception exception
-    ) {
+    void onPublicationFailed(ClusterStatePublicationEvent clusterStatePublicationEvent, TaskOutputs taskOutputs, Exception exception) {
         if (exception instanceof FailedToCommitClusterStateException) {
+            final long notificationStartTime = threadPool.rawRelativeTimeInMillis();
             final long version = clusterStatePublicationEvent.getNewState().version();
             logger.warn(
                 () -> new ParameterizedMessage(
@@ -313,10 +325,17 @@ public class MasterService extends AbstractLifecycleComponent {
                     version),
                 exception);
             taskOutputs.publishingFailed((FailedToCommitClusterStateException) exception);
+            final long notificationMillis = threadPool.rawRelativeTimeInMillis() - notificationStartTime;
+            clusterStateUpdateStatsTracker.onPublicationFailure(
+                threadPool.rawRelativeTimeInMillis(),
+                clusterStatePublicationEvent,
+                notificationMillis);
         } else {
+            assert false : exception;
+            clusterStateUpdateStatsTracker.onPublicationFailure(threadPool.rawRelativeTimeInMillis(), clusterStatePublicationEvent, 0L);
             handleException(
                 clusterStatePublicationEvent.getSummary(),
-                startTimeMillis,
+                clusterStatePublicationEvent.getPublicationStartTimeMillis(),
                 clusterStatePublicationEvent.getNewState(),
                 exception);
         }
@@ -887,4 +906,91 @@ public class MasterService extends AbstractLifecycleComponent {
         }
     }
 
+    /**
+     * Running totals for cluster state update statistics. Each method is {@code synchronized} on the tracker, adds its arguments to
+     * the matching totals, and {@link #getStatistics()} copies the current totals into a new {@link ClusterStateUpdateStats}.
+     */
+    private static class ClusterStateUpdateStatsTracker {
+
+        // counters
+        private long unchangedTaskCount;
+        private long publicationSuccessCount;
+        private long publicationFailureCount;
+
+        // totals for updates that left the cluster state unchanged
+        private long unchangedComputationElapsedMillis;
+        private long unchangedNotificationElapsedMillis;
+
+        // totals for successful publications
+        private long successfulComputationElapsedMillis;
+        private long successfulPublicationElapsedMillis;
+        private long successfulContextConstructionElapsedMillis;
+        private long successfulCommitElapsedMillis;
+        private long successfulCompletionElapsedMillis;
+        private long successfulMasterApplyElapsedMillis;
+        private long successfulNotificationElapsedMillis;
+
+        // totals for failed publications
+        private long failedComputationElapsedMillis;
+        private long failedPublicationElapsedMillis;
+        private long failedContextConstructionElapsedMillis;
+        private long failedCommitElapsedMillis;
+        private long failedCompletionElapsedMillis;
+        private long failedMasterApplyElapsedMillis;
+        private long failedNotificationElapsedMillis;
+
+        /**
+         * Records one update that did not change the cluster state, adding its computation and notification times to the totals.
+         */
+        synchronized void onUnchangedClusterState(long computationMillis, long notificationMillis) {
+            unchangedTaskCount++;
+            unchangedComputationElapsedMillis += computationMillis;
+            unchangedNotificationElapsedMillis += notificationMillis;
+        }
+
+        /**
+         * Records one successful publication. The overall publication time is the difference between {@code nowMillis} and the
+         * publication start time stored on the event; the per-phase times are read from the event.
+         */
+        synchronized void onPublicationSuccess(long nowMillis, ClusterStatePublicationEvent event, long notificationMillis) {
+            publicationSuccessCount++;
+            successfulComputationElapsedMillis += event.getComputationTimeMillis();
+            final long publicationMillis = nowMillis - event.getPublicationStartTimeMillis();
+            successfulPublicationElapsedMillis += publicationMillis;
+            successfulContextConstructionElapsedMillis += event.getPublicationContextConstructionElapsedMillis();
+            successfulCommitElapsedMillis += event.getPublicationCommitElapsedMillis();
+            successfulCompletionElapsedMillis += event.getPublicationCompletionElapsedMillis();
+            successfulMasterApplyElapsedMillis += event.getMasterApplyElapsedMillis();
+            successfulNotificationElapsedMillis += notificationMillis;
+        }
+
+        /**
+         * Records one failed publication. Same accounting as for a success, except that the per-phase times are read through the
+         * event's {@code maybeGet...} accessors.
+         */
+        synchronized void onPublicationFailure(long nowMillis, ClusterStatePublicationEvent event, long notificationElapsedMillis) {
+            publicationFailureCount++;
+            failedComputationElapsedMillis += event.getComputationTimeMillis();
+            final long publicationMillis = nowMillis - event.getPublicationStartTimeMillis();
+            failedPublicationElapsedMillis += publicationMillis;
+            failedContextConstructionElapsedMillis += event.maybeGetPublicationContextConstructionElapsedMillis();
+            failedCommitElapsedMillis += event.maybeGetPublicationCommitElapsedMillis();
+            failedCompletionElapsedMillis += event.maybeGetPublicationCompletionElapsedMillis();
+            failedMasterApplyElapsedMillis += event.maybeGetMasterApplyElapsedMillis();
+            failedNotificationElapsedMillis += notificationElapsedMillis;
+        }
+
+        /**
+         * Copies the current totals into a new {@link ClusterStateUpdateStats}.
+         */
+        synchronized ClusterStateUpdateStats getStatistics() {
+            return new ClusterStateUpdateStats(
+                unchangedTaskCount,
+                publicationSuccessCount,
+                publicationFailureCount,
+                unchangedComputationElapsedMillis,
+                unchangedNotificationElapsedMillis,
+                successfulComputationElapsedMillis,
+                successfulPublicationElapsedMillis,
+                successfulContextConstructionElapsedMillis,
+                successfulCommitElapsedMillis,
+                successfulCompletionElapsedMillis,
+                successfulMasterApplyElapsedMillis,
+                successfulNotificationElapsedMillis,
+                failedComputationElapsedMillis,
+                failedPublicationElapsedMillis,
+                failedContextConstructionElapsedMillis,
+                failedCommitElapsedMillis,
+                failedCompletionElapsedMillis,
+                failedMasterApplyElapsedMillis,
+                failedNotificationElapsedMillis
+            );
+        }
+    }
+
+
 }

+ 24 - 1
server/src/main/java/org/elasticsearch/discovery/DiscoveryStats.java

@@ -8,6 +8,8 @@
 
 package org.elasticsearch.discovery;
 
+import org.elasticsearch.Version;
+import org.elasticsearch.cluster.service.ClusterStateUpdateStats;
 import org.elasticsearch.common.io.stream.StreamInput;
 import org.elasticsearch.common.io.stream.StreamOutput;
 import org.elasticsearch.common.io.stream.Writeable;
@@ -22,21 +24,35 @@ public class DiscoveryStats implements Writeable, ToXContentFragment {
 
     private final PendingClusterStateStats queueStats;
     private final PublishClusterStateStats publishStats;
+    private final ClusterStateUpdateStats clusterStateUpdateStats;
 
-    public DiscoveryStats(PendingClusterStateStats queueStats, PublishClusterStateStats publishStats) {
+    public DiscoveryStats(
+        PendingClusterStateStats queueStats,
+        PublishClusterStateStats publishStats,
+        ClusterStateUpdateStats clusterStateUpdateStats
+    ) {
         this.queueStats = queueStats;
         this.publishStats = publishStats;
+        this.clusterStateUpdateStats = clusterStateUpdateStats;
     }
 
     public DiscoveryStats(StreamInput in) throws IOException {
         queueStats = in.readOptionalWriteable(PendingClusterStateStats::new);
         publishStats = in.readOptionalWriteable(PublishClusterStateStats::new);
+        if (in.getVersion().onOrAfter(Version.V_8_0_0)) {
+            clusterStateUpdateStats = in.readOptionalWriteable(ClusterStateUpdateStats::new);
+        } else {
+            clusterStateUpdateStats = null;
+        }
     }
 
     @Override
     public void writeTo(StreamOutput out) throws IOException {
         out.writeOptionalWriteable(queueStats);
         out.writeOptionalWriteable(publishStats);
+        if (out.getVersion().onOrAfter(Version.V_8_0_0)) {
+            out.writeOptionalWriteable(clusterStateUpdateStats);
+        }
     }
 
     @Override
@@ -48,10 +64,17 @@ public class DiscoveryStats implements Writeable, ToXContentFragment {
         if (publishStats != null) {
             publishStats.toXContent(builder, params);
         }
+        if (clusterStateUpdateStats != null) {
+            clusterStateUpdateStats.toXContent(builder, params);
+        }
         builder.endObject();
         return builder;
     }
 
+    /**
+     * Returns the cluster state update statistics carried by this object. May be {@code null}: the stream constructor sets the field to
+     * {@code null} when reading from a version before 8.0.0, and the public constructor stores whatever value it is given.
+     */
+    public ClusterStateUpdateStats getClusterStateUpdateStats() {
+        return clusterStateUpdateStats;
+    }
+
     static final class Fields {
         static final String DISCOVERY = "discovery";
     }

+ 14 - 0
server/src/main/java/org/elasticsearch/threadpool/ThreadPool.java

@@ -270,6 +270,20 @@ public class ThreadPool implements ReportingService<ThreadPoolInfo>, Scheduler {
         return cachedTimeThread.relativeTimeInNanos();
     }
 
+    /**
+     * Returns a number of milliseconds suitable for relative time calculations, derived from a fresh {@link System#nanoTime} reading on
+     * every call. Unlike {@link #relativeTimeInMillis()} the result is not cached, which makes this method more accurate but also more
+     * expensive; prefer {@link #relativeTimeInMillis()} unless the extra accuracy is worth that cost.
+     *
+     * When measuring an interval, take both endpoints from the same source: either both from {@link #relativeTimeInMillis()} or both
+     * from this method. Subtracting a cached value from a raw one (or vice versa) is not really meaningful, even though in practice the
+     * result will be approximately sensible.
+     */
+    public long rawRelativeTimeInMillis() {
+        final long nowNanos = System.nanoTime();
+        return TimeValue.nsecToMSec(nowNanos);
+    }
+
     /**
      * Returns the value of milliseconds since UNIX epoch.
      *

+ 109 - 5
server/src/test/java/org/elasticsearch/action/admin/cluster/node/stats/NodeStatsTests.java

@@ -11,6 +11,7 @@ package org.elasticsearch.action.admin.cluster.node.stats;
 import org.elasticsearch.cluster.coordination.PendingClusterStateStats;
 import org.elasticsearch.cluster.coordination.PublishClusterStateStats;
 import org.elasticsearch.cluster.node.DiscoveryNode;
+import org.elasticsearch.cluster.service.ClusterStateUpdateStats;
 import org.elasticsearch.common.io.stream.BytesStreamOutput;
 import org.elasticsearch.common.io.stream.StreamInput;
 import org.elasticsearch.discovery.DiscoveryStats;
@@ -277,6 +278,87 @@ public class NodeStatsTests extends ESTestCase {
                         assertEquals(queueStats.getTotal(), deserializedDiscoveryStats.getQueueStats().getTotal());
                         assertEquals(queueStats.getPending(), deserializedDiscoveryStats.getQueueStats().getPending());
                     }
+
+                    final PublishClusterStateStats publishStats = discoveryStats.getPublishStats();
+                    if (publishStats == null) {
+                        assertNull(deserializedDiscoveryStats.getPublishStats());
+                    } else {
+                        final PublishClusterStateStats deserializedPublishStats = deserializedDiscoveryStats.getPublishStats();
+                        assertEquals(
+                            publishStats.getFullClusterStateReceivedCount(),
+                            deserializedPublishStats.getFullClusterStateReceivedCount());
+                        assertEquals(
+                            publishStats.getCompatibleClusterStateDiffReceivedCount(),
+                            deserializedPublishStats.getCompatibleClusterStateDiffReceivedCount());
+                        assertEquals(
+                            publishStats.getIncompatibleClusterStateDiffReceivedCount(),
+                            deserializedPublishStats.getIncompatibleClusterStateDiffReceivedCount());
+                    }
+
+                    final ClusterStateUpdateStats clusterStateUpdateStats = discoveryStats.getClusterStateUpdateStats();
+                    if (clusterStateUpdateStats == null) {
+                        assertNull(deserializedDiscoveryStats.getClusterStateUpdateStats());
+                    } else {
+                        final ClusterStateUpdateStats deserializedClusterStateUpdateStats
+                            = deserializedDiscoveryStats.getClusterStateUpdateStats();
+                        assertEquals(
+                            clusterStateUpdateStats.getUnchangedTaskCount(),
+                            deserializedClusterStateUpdateStats.getUnchangedTaskCount());
+                        assertEquals(
+                            clusterStateUpdateStats.getPublicationSuccessCount(),
+                            deserializedClusterStateUpdateStats.getPublicationSuccessCount());
+                        assertEquals(
+                            clusterStateUpdateStats.getPublicationFailureCount(),
+                            deserializedClusterStateUpdateStats.getPublicationFailureCount());
+                        assertEquals(
+                            clusterStateUpdateStats.getUnchangedComputationElapsedMillis(),
+                            deserializedClusterStateUpdateStats.getUnchangedComputationElapsedMillis());
+                        assertEquals(
+                            clusterStateUpdateStats.getUnchangedNotificationElapsedMillis(),
+                            deserializedClusterStateUpdateStats.getUnchangedNotificationElapsedMillis());
+                        assertEquals(
+                            clusterStateUpdateStats.getSuccessfulComputationElapsedMillis(),
+                            deserializedClusterStateUpdateStats.getSuccessfulComputationElapsedMillis());
+                        assertEquals(
+                            clusterStateUpdateStats.getSuccessfulPublicationElapsedMillis(),
+                            deserializedClusterStateUpdateStats.getSuccessfulPublicationElapsedMillis());
+                        assertEquals(
+                            clusterStateUpdateStats.getSuccessfulContextConstructionElapsedMillis(),
+                            deserializedClusterStateUpdateStats.getSuccessfulContextConstructionElapsedMillis());
+                        assertEquals(
+                            clusterStateUpdateStats.getSuccessfulCommitElapsedMillis(),
+                            deserializedClusterStateUpdateStats.getSuccessfulCommitElapsedMillis());
+                        assertEquals(
+                            clusterStateUpdateStats.getSuccessfulCompletionElapsedMillis(),
+                            deserializedClusterStateUpdateStats.getSuccessfulCompletionElapsedMillis());
+                        assertEquals(
+                            clusterStateUpdateStats.getSuccessfulMasterApplyElapsedMillis(),
+                            deserializedClusterStateUpdateStats.getSuccessfulMasterApplyElapsedMillis());
+                        assertEquals(
+                            clusterStateUpdateStats.getSuccessfulNotificationElapsedMillis(),
+                            deserializedClusterStateUpdateStats.getSuccessfulNotificationElapsedMillis());
+                        assertEquals(
+                            clusterStateUpdateStats.getFailedComputationElapsedMillis(),
+                            deserializedClusterStateUpdateStats.getFailedComputationElapsedMillis());
+                        assertEquals(
+                            clusterStateUpdateStats.getFailedPublicationElapsedMillis(),
+                            deserializedClusterStateUpdateStats.getFailedPublicationElapsedMillis());
+                        assertEquals(
+                            clusterStateUpdateStats.getFailedContextConstructionElapsedMillis(),
+                            deserializedClusterStateUpdateStats.getFailedContextConstructionElapsedMillis());
+                        assertEquals(
+                            clusterStateUpdateStats.getFailedCommitElapsedMillis(),
+                            deserializedClusterStateUpdateStats.getFailedCommitElapsedMillis());
+                        assertEquals(
+                            clusterStateUpdateStats.getFailedCompletionElapsedMillis(),
+                            deserializedClusterStateUpdateStats.getFailedCompletionElapsedMillis());
+                        assertEquals(
+                            clusterStateUpdateStats.getFailedMasterApplyElapsedMillis(),
+                            deserializedClusterStateUpdateStats.getFailedMasterApplyElapsedMillis());
+                        assertEquals(
+                            clusterStateUpdateStats.getFailedNotificationElapsedMillis(),
+                            deserializedClusterStateUpdateStats.getFailedNotificationElapsedMillis());
+                    }
                 }
                 IngestStats ingestStats = nodeStats.getIngestStats();
                 IngestStats deserializedIngestStats = deserializedNodeStats.getIngestStats();
@@ -483,14 +565,36 @@ public class NodeStatsTests extends ESTestCase {
         }
         DiscoveryStats discoveryStats = frequently()
             ? new DiscoveryStats(
-                randomBoolean()
+            randomBoolean()
                 ? new PendingClusterStateStats(randomInt(), randomInt(), randomInt())
                 : null,
-                randomBoolean()
+            randomBoolean()
                 ? new PublishClusterStateStats(
-                    randomNonNegativeLong(),
-                    randomNonNegativeLong(),
-                    randomNonNegativeLong())
+                randomNonNegativeLong(),
+                randomNonNegativeLong(),
+                randomNonNegativeLong())
+                : null,
+            randomBoolean()
+                ? new ClusterStateUpdateStats(
+                randomNonNegativeLong(),
+                randomNonNegativeLong(),
+                randomNonNegativeLong(),
+                randomNonNegativeLong(),
+                randomNonNegativeLong(),
+                randomNonNegativeLong(),
+                randomNonNegativeLong(),
+                randomNonNegativeLong(),
+                randomNonNegativeLong(),
+                randomNonNegativeLong(),
+                randomNonNegativeLong(),
+                randomNonNegativeLong(),
+                randomNonNegativeLong(),
+                randomNonNegativeLong(),
+                randomNonNegativeLong(),
+                randomNonNegativeLong(),
+                randomNonNegativeLong(),
+                randomNonNegativeLong(),
+                randomNonNegativeLong())
                 : null)
             : null;
         IngestStats ingestStats = null;

+ 245 - 1
server/src/test/java/org/elasticsearch/cluster/coordination/CoordinatorTests.java

@@ -14,7 +14,9 @@ import org.apache.logging.log4j.core.LogEvent;
 import org.elasticsearch.ElasticsearchException;
 import org.elasticsearch.Version;
 import org.elasticsearch.cluster.AbstractDiffable;
+import org.elasticsearch.cluster.AbstractNamedDiffable;
 import org.elasticsearch.cluster.ClusterState;
+import org.elasticsearch.cluster.ClusterStateTaskListener;
 import org.elasticsearch.cluster.block.ClusterBlock;
 import org.elasticsearch.cluster.coordination.AbstractCoordinatorTestCase.Cluster.ClusterNode;
 import org.elasticsearch.cluster.coordination.CoordinationMetadata.VotingConfiguration;
@@ -23,15 +25,19 @@ import org.elasticsearch.cluster.metadata.Metadata;
 import org.elasticsearch.cluster.node.DiscoveryNode;
 import org.elasticsearch.cluster.node.DiscoveryNodeRole;
 import org.elasticsearch.cluster.node.DiscoveryNodes;
+import org.elasticsearch.cluster.service.ClusterStateUpdateStats;
+import org.elasticsearch.common.Strings;
+import org.elasticsearch.common.io.stream.NamedWriteableRegistry;
 import org.elasticsearch.common.io.stream.StreamOutput;
 import org.elasticsearch.common.logging.Loggers;
 import org.elasticsearch.common.regex.Regex;
 import org.elasticsearch.common.settings.Settings;
 import org.elasticsearch.common.settings.Settings.Builder;
 import org.elasticsearch.common.util.concurrent.DeterministicTaskQueue;
-import org.elasticsearch.core.TimeValue;
 import org.elasticsearch.common.util.set.Sets;
 import org.elasticsearch.common.xcontent.XContentBuilder;
+import org.elasticsearch.core.Nullable;
+import org.elasticsearch.core.TimeValue;
 import org.elasticsearch.discovery.DiscoveryModule;
 import org.elasticsearch.gateway.GatewayService;
 import org.elasticsearch.monitor.NodeHealthService;
@@ -78,6 +84,7 @@ import static org.hamcrest.Matchers.hasItem;
 import static org.hamcrest.Matchers.hasSize;
 import static org.hamcrest.Matchers.instanceOf;
 import static org.hamcrest.Matchers.is;
+import static org.hamcrest.Matchers.lessThanOrEqualTo;
 import static org.hamcrest.Matchers.not;
 import static org.hamcrest.Matchers.sameInstance;
 import static org.hamcrest.Matchers.startsWith;
@@ -961,6 +968,243 @@ public class CoordinatorTests extends AbstractCoordinatorTestCase {
         }
     }
 
+    private static class TimeAdvancer { // test helper: advances the simulated clock and totals how far it moved
+        private final DeterministicTaskQueue deterministicTaskQueue;
+        private long elapsedTime; // running total, in milliseconds, of all advances made through this instance
+
+        TimeAdvancer(DeterministicTaskQueue deterministicTaskQueue) {
+            this.deterministicTaskQueue = deterministicTaskQueue;
+        }
+
+        void advanceTime() {
+            final long startTime = deterministicTaskQueue.getCurrentTimeMillis();
+            if (deterministicTaskQueue.hasDeferredTasks() == false) { // presumably advanceTime() needs a deferred task to jump to
+                deterministicTaskQueue.scheduleAt(startTime + between(1000, 2000), new Runnable() { // no-op 1000-2000ms from now
+                    @Override
+                    public void run() {
+                    }
+
+                    @Override
+                    public String toString() { // human-readable label for this placeholder task
+                        return "no-op task to advance time";
+                    }
+                });
+            }
+            deterministicTaskQueue.advanceTime();
+            elapsedTime += deterministicTaskQueue.getCurrentTimeMillis() - startTime; // record how far the clock actually moved
+        }
+
+        long getElapsedTime() {
+            return elapsedTime;
+        }
+    }
+
+    public void testMasterStatsOnNoOpUpdate() { // checks the "unchanged" counters after a task that returns the state as-is
+        try (Cluster cluster = new Cluster(randomIntBetween(1, 5), false, Settings.EMPTY)) {
+            cluster.runRandomly();
+            cluster.stabilise();
+
+            for (ClusterNode clusterNode : cluster.getAllNodesExcept()) {
+                assertThat( // update stats must be non-null exactly when the node reports isMasterNode()
+                    clusterNode.coordinator.stats().getClusterStateUpdateStats() != null,
+                    equalTo(clusterNode.getLocalNode().isMasterNode()));
+            }
+
+            final ClusterNode leader = cluster.getAnyLeader();
+            final ClusterStateUpdateStats stats0 = leader.coordinator.stats().getClusterStateUpdateStats(); // baseline
+
+            final TimeAdvancer computeAdvancer = new TimeAdvancer(cluster.deterministicTaskQueue);
+            final TimeAdvancer notifyAdvancer = new TimeAdvancer(cluster.deterministicTaskQueue);
+            leader.submitUpdateTask("unchanged", cs -> {
+                computeAdvancer.advanceTime(); // let simulated time pass while the new state is being computed
+                return cs; // same instance back: no state change
+            }, new ClusterStateTaskListener() {
+                @Override
+                public void clusterStateProcessed(String source, ClusterState oldState, ClusterState newState) {
+                    notifyAdvancer.advanceTime(); // let simulated time pass while the listener is being notified
+                }
+
+                @Override
+                public void onFailure(String source, Exception e) {
+                    assert false : e; // a failure is not expected in this scenario
+                }
+            });
+
+            cluster.stabilise(DEFAULT_CLUSTER_STATE_UPDATE_DELAY);
+
+            final ClusterStateUpdateStats stats1 = leader.coordinator.stats().getClusterStateUpdateStats(); // after the task
+
+            final String description = Strings.toString(stats1) + " vs " + Strings.toString(stats0); // shown on assertion failure
+            assertThat(description, stats1.getUnchangedTaskCount() - stats0.getUnchangedTaskCount(), greaterThanOrEqualTo(1L));
+            assertThat( // the deltas below are checked as lower bounds (>=), not exact values
+                description,
+                stats1.getUnchangedComputationElapsedMillis() - stats0.getUnchangedComputationElapsedMillis(),
+                greaterThanOrEqualTo(computeAdvancer.getElapsedTime()));
+            assertThat(
+                description,
+                stats1.getUnchangedNotificationElapsedMillis() - stats0.getUnchangedNotificationElapsedMillis(),
+                greaterThanOrEqualTo(notifyAdvancer.getElapsedTime()));
+        }
+    }
+
+    public void testMasterStatsOnSuccessfulUpdate() { // checks the "successful" counters after a task that changes the state
+
+        final String customName = "delayed";
+
+        class DelayedCustom extends AbstractNamedDiffable<ClusterState.Custom> implements ClusterState.Custom {
+            @Nullable
+            private final TimeAdvancer timeAdvancer; // null for instances created by the reader registered further down
+
+            DelayedCustom(TimeAdvancer timeAdvancer) {
+                super();
+                this.timeAdvancer = timeAdvancer;
+            }
+
+            @Override
+            public XContentBuilder toXContent(XContentBuilder builder, Params params) throws IOException {
+                builder.startObject(); // renders as an empty object
+                builder.endObject();
+                return builder;
+            }
+
+            @Override
+            public String getWriteableName() {
+                return customName;
+            }
+
+            @Override
+            public Version getMinimalSupportedVersion() {
+                return Version.CURRENT;
+            }
+
+            @Override
+            public void writeTo(StreamOutput out) throws IOException {
+                if (timeAdvancer != null) { // writes no bytes; serialization only serves as a hook to advance simulated time
+                    timeAdvancer.advanceTime();
+                }
+            }
+        }
+
+        try (Cluster cluster = new Cluster(randomIntBetween(1, 5)) {
+            @Override
+            protected List<NamedWriteableRegistry.Entry> extraNamedWriteables() {
+                return List.of(new NamedWriteableRegistry.Entry(ClusterState.Custom.class, customName, in -> new DelayedCustom(null)));
+            }
+        }) {
+            cluster.runRandomly();
+            cluster.stabilise();
+
+            final ClusterNode leader = cluster.getAnyLeader();
+            final ClusterStateUpdateStats stats0 = leader.coordinator.stats().getClusterStateUpdateStats(); // baseline
+
+            final TimeAdvancer computeAdvancer = new TimeAdvancer(cluster.deterministicTaskQueue);
+            final TimeAdvancer notifyAdvancer = new TimeAdvancer(cluster.deterministicTaskQueue);
+            final TimeAdvancer contextAdvancer = new TimeAdvancer(cluster.deterministicTaskQueue);
+            leader.submitUpdateTask("update", cs -> {
+                computeAdvancer.advanceTime(); // let simulated time pass while the new state is being computed
+                return ClusterState.builder(cs)
+                    .putCustom(customName, new DelayedCustom(contextAdvancer)) // this custom advances time when it is written
+                    .build();
+            }, new ClusterStateTaskListener() {
+                @Override
+                public void clusterStateProcessed(String source, ClusterState oldState, ClusterState newState) {
+                    notifyAdvancer.advanceTime(); // let simulated time pass while the listener is being notified
+                }
+
+                @Override
+                public void onFailure(String source, Exception e) {
+                    assert false : e; // a failure is not expected in this scenario
+                }
+            });
+
+            cluster.stabilise(DEFAULT_CLUSTER_STATE_UPDATE_DELAY);
+
+            final ClusterStateUpdateStats stats1 = leader.coordinator.stats().getClusterStateUpdateStats(); // after the task
+
+            final String description = Strings.toString(stats1) + " vs " + Strings.toString(stats0); // shown on assertion failure
+            assertThat(description, stats1.getPublicationSuccessCount() - stats0.getPublicationSuccessCount(), greaterThanOrEqualTo(1L));
+            assertThat( // the deltas below are checked as lower bounds (>=), not exact values
+                description,
+                stats1.getSuccessfulComputationElapsedMillis() - stats0.getSuccessfulComputationElapsedMillis(),
+                greaterThanOrEqualTo(computeAdvancer.getElapsedTime()));
+            assertThat(
+                description,
+                stats1.getSuccessfulNotificationElapsedMillis() - stats0.getSuccessfulNotificationElapsedMillis(),
+                greaterThanOrEqualTo(notifyAdvancer.getElapsedTime()));
+            assertThat( // NOTE(review): bounded by the notify advancer — confirm publication time is meant to include notification
+                description,
+                stats1.getSuccessfulPublicationElapsedMillis() - stats0.getSuccessfulPublicationElapsedMillis(),
+                greaterThanOrEqualTo(notifyAdvancer.getElapsedTime()));
+            assertThat( // time advanced inside DelayedCustom.writeTo is expected to show up as context-construction time
+                description,
+                stats1.getSuccessfulContextConstructionElapsedMillis() - stats0.getSuccessfulContextConstructionElapsedMillis(),
+                greaterThanOrEqualTo(contextAdvancer.getElapsedTime()));
+
+            // this is atomic up to some scheduling delay
+            assertThat(
+                description,
+                stats1.getSuccessfulMasterApplyElapsedMillis() - stats0.getSuccessfulMasterApplyElapsedMillis(),
+                lessThanOrEqualTo(DEFAULT_DELAY_VARIABILITY));
+        }
+    }
+
+    public void testMasterStatsOnFailedUpdate() { // checks the "failed" counters after an update whose listener gets onFailure
+        try (Cluster cluster = new Cluster(randomIntBetween(3, 5))) {
+            cluster.runRandomly();
+            cluster.stabilise();
+
+            final ClusterNode leader = cluster.getAnyLeader();
+            final ClusterStateUpdateStats stats0 = leader.coordinator.stats().getClusterStateUpdateStats(); // baseline
+
+            final TimeAdvancer computeAdvancer = new TimeAdvancer(cluster.deterministicTaskQueue);
+            final TimeAdvancer notifyAdvancer = new TimeAdvancer(cluster.deterministicTaskQueue);
+            leader.submitUpdateTask("update", cs -> {
+                computeAdvancer.advanceTime(); // let simulated time pass while the new state is being computed
+                return ClusterState.builder(cs).build(); // new object, not cs itself — presumably so this is not treated as a no-op
+            }, new ClusterStateTaskListener() {
+                @Override
+                public void clusterStateProcessed(String source, ClusterState oldState, ClusterState newState) {
+                    fail("shouldn't have processed cluster state");
+                }
+
+                @Override
+                public void onFailure(String source, Exception e) {
+                    notifyAdvancer.advanceTime(); // let simulated time pass while the failure is being notified
+                }
+            });
+
+            leader.blackhole(); // isolate the leader after submitting — presumably so the publication cannot succeed
+            cluster.stabilise(DEFAULT_STABILISATION_TIME);
+
+            final ClusterStateUpdateStats stats1 = leader.coordinator.stats().getClusterStateUpdateStats(); // after the task
+
+            final String description = Strings.toString(stats1) + " vs " + Strings.toString(stats0); // shown on assertion failure
+            assertThat(description, stats1.getPublicationFailureCount() - stats0.getPublicationFailureCount(), greaterThanOrEqualTo(1L));
+            assertThat( // the deltas below are checked as lower bounds (>=), not exact values
+                description,
+                stats1.getFailedComputationElapsedMillis() - stats0.getFailedComputationElapsedMillis(),
+                greaterThanOrEqualTo(computeAdvancer.getElapsedTime()));
+            assertThat(
+                description,
+                stats1.getFailedNotificationElapsedMillis() - stats0.getFailedNotificationElapsedMillis(),
+                greaterThanOrEqualTo(notifyAdvancer.getElapsedTime()));
+            assertThat( // NOTE(review): bounded by the notify advancer — confirm publication time is meant to include notification
+                description,
+                stats1.getFailedPublicationElapsedMillis() - stats0.getFailedPublicationElapsedMillis(),
+                greaterThanOrEqualTo(notifyAdvancer.getElapsedTime()));
+
+            // this action is atomic, no simulated time can elapse
+            assertThat(description, stats0.getFailedContextConstructionElapsedMillis(), equalTo(0L));
+            assertThat(description, stats1.getFailedContextConstructionElapsedMillis(), equalTo(0L));
+
+            // no state should have been applied
+            assertThat(
+                description,
+                stats1.getFailedMasterApplyElapsedMillis() - stats0.getFailedMasterApplyElapsedMillis(),
+                equalTo(0L));
+        }
+    }
+
     public void testJoiningNodeReceivesFullState() {
         try (Cluster cluster = new Cluster(randomIntBetween(1, 5))) {
             cluster.runRandomly();

+ 2 - 0
server/src/test/java/org/elasticsearch/cluster/coordination/NodeJoinTests.java

@@ -121,6 +121,7 @@ public class NodeJoinTests extends ESTestCase {
             fakeThreadPool, deterministicTaskQueue::scheduleNow);
         setupMasterServiceAndCoordinator(term, initialState, fakeMasterService, fakeThreadPool, Randomness.get(), nodeHealthService);
         fakeMasterService.setClusterStatePublisher((clusterStatePublicationEvent, publishListener, ackListener) -> {
+            ClusterServiceUtils.setAllElapsedMillis(clusterStatePublicationEvent);
             coordinator.handlePublishRequest(new PublishRequest(clusterStatePublicationEvent.getNewState()));
             publishListener.onResponse(null);
         });
@@ -132,6 +133,7 @@ public class NodeJoinTests extends ESTestCase {
             new ClusterSettings(Settings.EMPTY, ClusterSettings.BUILT_IN_CLUSTER_SETTINGS), threadPool);
         AtomicReference<ClusterState> clusterStateRef = new AtomicReference<>(initialState);
         masterService.setClusterStatePublisher((clusterStatePublicationEvent, publishListener, ackListener) -> {
+            ClusterServiceUtils.setAllElapsedMillis(clusterStatePublicationEvent);
             clusterStateRef.set(clusterStatePublicationEvent.getNewState());
             publishListener.onResponse(null);
         });

+ 1 - 1
server/src/test/java/org/elasticsearch/cluster/coordination/PublicationTransportHandlerTests.java

@@ -70,7 +70,7 @@ public class PublicationTransportHandlerTests extends ESTestCase {
         };
 
         ElasticsearchException e = expectThrows(ElasticsearchException.class, () ->
-            handler.newPublicationContext(new ClusterStatePublicationEvent("test", clusterState, unserializableClusterState)));
+            handler.newPublicationContext(new ClusterStatePublicationEvent("test", clusterState, unserializableClusterState, 0L, 0L)));
         assertNotNull(e.getCause());
         assertThat(e.getCause(), instanceOf(IOException.class));
         assertThat(e.getCause().getMessage(), containsString("Simulated failure of diff serialization"));

+ 457 - 0
server/src/test/java/org/elasticsearch/cluster/service/ClusterStateUpdateStatsWireSerializationTests.java

@@ -0,0 +1,457 @@
+/*
+ * Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
+ * or more contributor license agreements. Licensed under the Elastic License
+ * 2.0 and the Server Side Public License, v 1; you may not use this file except
+ * in compliance with, at your election, the Elastic License 2.0 or the Server
+ * Side Public License, v 1.
+ */
+
+package org.elasticsearch.cluster.service;
+
+import org.elasticsearch.common.io.stream.Writeable;
+import org.elasticsearch.test.AbstractWireSerializingTestCase;
+import org.elasticsearch.test.ESTestCase;
+
+import java.io.IOException;
+
+public class ClusterStateUpdateStatsWireSerializationTests extends AbstractWireSerializingTestCase<ClusterStateUpdateStats> {
+
+    @Override
+    protected Writeable.Reader<ClusterStateUpdateStats> instanceReader() {
+        return ClusterStateUpdateStats::new; // the constructor reference serves as the wire reader for round-trip checks
+    }
+
+    @Override
+    protected ClusterStateUpdateStats createTestInstance() {
+        return new ClusterStateUpdateStats( // 19 constructor arguments, each an independent random non-negative long
+            randomNonNegativeLong(),
+            randomNonNegativeLong(),
+            randomNonNegativeLong(),
+            randomNonNegativeLong(),
+            randomNonNegativeLong(),
+            randomNonNegativeLong(),
+            randomNonNegativeLong(),
+            randomNonNegativeLong(),
+            randomNonNegativeLong(),
+            randomNonNegativeLong(),
+            randomNonNegativeLong(),
+            randomNonNegativeLong(),
+            randomNonNegativeLong(),
+            randomNonNegativeLong(),
+            randomNonNegativeLong(),
+            randomNonNegativeLong(),
+            randomNonNegativeLong(),
+            randomNonNegativeLong(),
+            randomNonNegativeLong());
+    }
+
+    private static long not(long l) { // helper for mutateInstance: picks a replacement value for one field
+        return randomValueOtherThan(l, ESTestCase::randomNonNegativeLong); // presumably retries until the value differs from l
+    }
+
+    @Override
+    protected ClusterStateUpdateStats mutateInstance(ClusterStateUpdateStats instance) throws IOException {
+        switch (between(1,19)) {
+            case 1:
+                return new ClusterStateUpdateStats(
+                    not(instance.getUnchangedTaskCount()),
+                    instance.getPublicationSuccessCount(),
+                    instance.getPublicationFailureCount(),
+                    instance.getUnchangedComputationElapsedMillis(),
+                    instance.getUnchangedNotificationElapsedMillis(),
+                    instance.getSuccessfulComputationElapsedMillis(),
+                    instance.getSuccessfulPublicationElapsedMillis(),
+                    instance.getSuccessfulContextConstructionElapsedMillis(),
+                    instance.getSuccessfulCommitElapsedMillis(),
+                    instance.getSuccessfulCompletionElapsedMillis(),
+                    instance.getSuccessfulMasterApplyElapsedMillis(),
+                    instance.getSuccessfulNotificationElapsedMillis(),
+                    instance.getFailedComputationElapsedMillis(),
+                    instance.getFailedPublicationElapsedMillis(),
+                    instance.getFailedContextConstructionElapsedMillis(),
+                    instance.getFailedCommitElapsedMillis(),
+                    instance.getFailedCompletionElapsedMillis(),
+                    instance.getFailedMasterApplyElapsedMillis(),
+                    instance.getFailedNotificationElapsedMillis());
+            case 2:
+                return new ClusterStateUpdateStats(
+                    instance.getUnchangedTaskCount(),
+                    not(instance.getPublicationSuccessCount()),
+                    instance.getPublicationFailureCount(),
+                    instance.getUnchangedComputationElapsedMillis(),
+                    instance.getUnchangedNotificationElapsedMillis(),
+                    instance.getSuccessfulComputationElapsedMillis(),
+                    instance.getSuccessfulPublicationElapsedMillis(),
+                    instance.getSuccessfulContextConstructionElapsedMillis(),
+                    instance.getSuccessfulCommitElapsedMillis(),
+                    instance.getSuccessfulCompletionElapsedMillis(),
+                    instance.getSuccessfulMasterApplyElapsedMillis(),
+                    instance.getSuccessfulNotificationElapsedMillis(),
+                    instance.getFailedComputationElapsedMillis(),
+                    instance.getFailedPublicationElapsedMillis(),
+                    instance.getFailedContextConstructionElapsedMillis(),
+                    instance.getFailedCommitElapsedMillis(),
+                    instance.getFailedCompletionElapsedMillis(),
+                    instance.getFailedMasterApplyElapsedMillis(),
+                    instance.getFailedNotificationElapsedMillis());
+            case 3:
+                return new ClusterStateUpdateStats(
+                    instance.getUnchangedTaskCount(),
+                    instance.getPublicationSuccessCount(),
+                    not(instance.getPublicationFailureCount()),
+                    instance.getUnchangedComputationElapsedMillis(),
+                    instance.getUnchangedNotificationElapsedMillis(),
+                    instance.getSuccessfulComputationElapsedMillis(),
+                    instance.getSuccessfulPublicationElapsedMillis(),
+                    instance.getSuccessfulContextConstructionElapsedMillis(),
+                    instance.getSuccessfulCommitElapsedMillis(),
+                    instance.getSuccessfulCompletionElapsedMillis(),
+                    instance.getSuccessfulMasterApplyElapsedMillis(),
+                    instance.getSuccessfulNotificationElapsedMillis(),
+                    instance.getFailedComputationElapsedMillis(),
+                    instance.getFailedPublicationElapsedMillis(),
+                    instance.getFailedContextConstructionElapsedMillis(),
+                    instance.getFailedCommitElapsedMillis(),
+                    instance.getFailedCompletionElapsedMillis(),
+                    instance.getFailedMasterApplyElapsedMillis(),
+                    instance.getFailedNotificationElapsedMillis());
+            case 4:
+                return new ClusterStateUpdateStats(
+                    instance.getUnchangedTaskCount(),
+                    instance.getPublicationSuccessCount(),
+                    instance.getPublicationFailureCount(),
+                    not(instance.getUnchangedComputationElapsedMillis()),
+                    instance.getUnchangedNotificationElapsedMillis(),
+                    instance.getSuccessfulComputationElapsedMillis(),
+                    instance.getSuccessfulPublicationElapsedMillis(),
+                    instance.getSuccessfulContextConstructionElapsedMillis(),
+                    instance.getSuccessfulCommitElapsedMillis(),
+                    instance.getSuccessfulCompletionElapsedMillis(),
+                    instance.getSuccessfulMasterApplyElapsedMillis(),
+                    instance.getSuccessfulNotificationElapsedMillis(),
+                    instance.getFailedComputationElapsedMillis(),
+                    instance.getFailedPublicationElapsedMillis(),
+                    instance.getFailedContextConstructionElapsedMillis(),
+                    instance.getFailedCommitElapsedMillis(),
+                    instance.getFailedCompletionElapsedMillis(),
+                    instance.getFailedMasterApplyElapsedMillis(),
+                    instance.getFailedNotificationElapsedMillis());
+            case 5:
+                return new ClusterStateUpdateStats(
+                    instance.getUnchangedTaskCount(),
+                    instance.getPublicationSuccessCount(),
+                    instance.getPublicationFailureCount(),
+                    instance.getUnchangedComputationElapsedMillis(),
+                    not(instance.getUnchangedNotificationElapsedMillis()),
+                    instance.getSuccessfulComputationElapsedMillis(),
+                    instance.getSuccessfulPublicationElapsedMillis(),
+                    instance.getSuccessfulContextConstructionElapsedMillis(),
+                    instance.getSuccessfulCommitElapsedMillis(),
+                    instance.getSuccessfulCompletionElapsedMillis(),
+                    instance.getSuccessfulMasterApplyElapsedMillis(),
+                    instance.getSuccessfulNotificationElapsedMillis(),
+                    instance.getFailedComputationElapsedMillis(),
+                    instance.getFailedPublicationElapsedMillis(),
+                    instance.getFailedContextConstructionElapsedMillis(),
+                    instance.getFailedCommitElapsedMillis(),
+                    instance.getFailedCompletionElapsedMillis(),
+                    instance.getFailedMasterApplyElapsedMillis(),
+                    instance.getFailedNotificationElapsedMillis());
+            case 6:
+                return new ClusterStateUpdateStats(
+                    instance.getUnchangedTaskCount(),
+                    instance.getPublicationSuccessCount(),
+                    instance.getPublicationFailureCount(),
+                    instance.getUnchangedComputationElapsedMillis(),
+                    instance.getUnchangedNotificationElapsedMillis(),
+                    not(instance.getSuccessfulComputationElapsedMillis()),
+                    instance.getSuccessfulPublicationElapsedMillis(),
+                    instance.getSuccessfulContextConstructionElapsedMillis(),
+                    instance.getSuccessfulCommitElapsedMillis(),
+                    instance.getSuccessfulCompletionElapsedMillis(),
+                    instance.getSuccessfulMasterApplyElapsedMillis(),
+                    instance.getSuccessfulNotificationElapsedMillis(),
+                    instance.getFailedComputationElapsedMillis(),
+                    instance.getFailedPublicationElapsedMillis(),
+                    instance.getFailedContextConstructionElapsedMillis(),
+                    instance.getFailedCommitElapsedMillis(),
+                    instance.getFailedCompletionElapsedMillis(),
+                    instance.getFailedMasterApplyElapsedMillis(),
+                    instance.getFailedNotificationElapsedMillis());
+            case 7:
+                return new ClusterStateUpdateStats(
+                    instance.getUnchangedTaskCount(),
+                    instance.getPublicationSuccessCount(),
+                    instance.getPublicationFailureCount(),
+                    instance.getUnchangedComputationElapsedMillis(),
+                    instance.getUnchangedNotificationElapsedMillis(),
+                    instance.getSuccessfulComputationElapsedMillis(),
+                    not(instance.getSuccessfulPublicationElapsedMillis()),
+                    instance.getSuccessfulContextConstructionElapsedMillis(),
+                    instance.getSuccessfulCommitElapsedMillis(),
+                    instance.getSuccessfulCompletionElapsedMillis(),
+                    instance.getSuccessfulMasterApplyElapsedMillis(),
+                    instance.getSuccessfulNotificationElapsedMillis(),
+                    instance.getFailedComputationElapsedMillis(),
+                    instance.getFailedPublicationElapsedMillis(),
+                    instance.getFailedContextConstructionElapsedMillis(),
+                    instance.getFailedCommitElapsedMillis(),
+                    instance.getFailedCompletionElapsedMillis(),
+                    instance.getFailedMasterApplyElapsedMillis(),
+                    instance.getFailedNotificationElapsedMillis());
+            case 8:
+                return new ClusterStateUpdateStats(
+                    instance.getUnchangedTaskCount(),
+                    instance.getPublicationSuccessCount(),
+                    instance.getPublicationFailureCount(),
+                    instance.getUnchangedComputationElapsedMillis(),
+                    instance.getUnchangedNotificationElapsedMillis(),
+                    instance.getSuccessfulComputationElapsedMillis(),
+                    instance.getSuccessfulPublicationElapsedMillis(),
+                    not(instance.getSuccessfulContextConstructionElapsedMillis()),
+                    instance.getSuccessfulCommitElapsedMillis(),
+                    instance.getSuccessfulCompletionElapsedMillis(),
+                    instance.getSuccessfulMasterApplyElapsedMillis(),
+                    instance.getSuccessfulNotificationElapsedMillis(),
+                    instance.getFailedComputationElapsedMillis(),
+                    instance.getFailedPublicationElapsedMillis(),
+                    instance.getFailedContextConstructionElapsedMillis(),
+                    instance.getFailedCommitElapsedMillis(),
+                    instance.getFailedCompletionElapsedMillis(),
+                    instance.getFailedMasterApplyElapsedMillis(),
+                    instance.getFailedNotificationElapsedMillis());
+            case 9:
+                return new ClusterStateUpdateStats(
+                    instance.getUnchangedTaskCount(),
+                    instance.getPublicationSuccessCount(),
+                    instance.getPublicationFailureCount(),
+                    instance.getUnchangedComputationElapsedMillis(),
+                    instance.getUnchangedNotificationElapsedMillis(),
+                    instance.getSuccessfulComputationElapsedMillis(),
+                    instance.getSuccessfulPublicationElapsedMillis(),
+                    instance.getSuccessfulContextConstructionElapsedMillis(),
+                    not(instance.getSuccessfulCommitElapsedMillis()),
+                    instance.getSuccessfulCompletionElapsedMillis(),
+                    instance.getSuccessfulMasterApplyElapsedMillis(),
+                    instance.getSuccessfulNotificationElapsedMillis(),
+                    instance.getFailedComputationElapsedMillis(),
+                    instance.getFailedPublicationElapsedMillis(),
+                    instance.getFailedContextConstructionElapsedMillis(),
+                    instance.getFailedCommitElapsedMillis(),
+                    instance.getFailedCompletionElapsedMillis(),
+                    instance.getFailedMasterApplyElapsedMillis(),
+                    instance.getFailedNotificationElapsedMillis());
+            case 10:
+                return new ClusterStateUpdateStats(
+                    instance.getUnchangedTaskCount(),
+                    instance.getPublicationSuccessCount(),
+                    instance.getPublicationFailureCount(),
+                    instance.getUnchangedComputationElapsedMillis(),
+                    instance.getUnchangedNotificationElapsedMillis(),
+                    instance.getSuccessfulComputationElapsedMillis(),
+                    instance.getSuccessfulPublicationElapsedMillis(),
+                    instance.getSuccessfulContextConstructionElapsedMillis(),
+                    instance.getSuccessfulCommitElapsedMillis(),
+                    not(instance.getSuccessfulCompletionElapsedMillis()),
+                    instance.getSuccessfulMasterApplyElapsedMillis(),
+                    instance.getSuccessfulNotificationElapsedMillis(),
+                    instance.getFailedComputationElapsedMillis(),
+                    instance.getFailedPublicationElapsedMillis(),
+                    instance.getFailedContextConstructionElapsedMillis(),
+                    instance.getFailedCommitElapsedMillis(),
+                    instance.getFailedCompletionElapsedMillis(),
+                    instance.getFailedMasterApplyElapsedMillis(),
+                    instance.getFailedNotificationElapsedMillis());
+            case 11:
+                return new ClusterStateUpdateStats(
+                    instance.getUnchangedTaskCount(),
+                    instance.getPublicationSuccessCount(),
+                    instance.getPublicationFailureCount(),
+                    instance.getUnchangedComputationElapsedMillis(),
+                    instance.getUnchangedNotificationElapsedMillis(),
+                    instance.getSuccessfulComputationElapsedMillis(),
+                    instance.getSuccessfulPublicationElapsedMillis(),
+                    instance.getSuccessfulContextConstructionElapsedMillis(),
+                    instance.getSuccessfulCommitElapsedMillis(),
+                    instance.getSuccessfulCompletionElapsedMillis(),
+                    not(instance.getSuccessfulMasterApplyElapsedMillis()),
+                    instance.getSuccessfulNotificationElapsedMillis(),
+                    instance.getFailedComputationElapsedMillis(),
+                    instance.getFailedPublicationElapsedMillis(),
+                    instance.getFailedContextConstructionElapsedMillis(),
+                    instance.getFailedCommitElapsedMillis(),
+                    instance.getFailedCompletionElapsedMillis(),
+                    instance.getFailedMasterApplyElapsedMillis(),
+                    instance.getFailedNotificationElapsedMillis());
+            case 12:
+                return new ClusterStateUpdateStats(
+                    instance.getUnchangedTaskCount(),
+                    instance.getPublicationSuccessCount(),
+                    instance.getPublicationFailureCount(),
+                    instance.getUnchangedComputationElapsedMillis(),
+                    instance.getUnchangedNotificationElapsedMillis(),
+                    instance.getSuccessfulComputationElapsedMillis(),
+                    instance.getSuccessfulPublicationElapsedMillis(),
+                    instance.getSuccessfulContextConstructionElapsedMillis(),
+                    instance.getSuccessfulCommitElapsedMillis(),
+                    instance.getSuccessfulCompletionElapsedMillis(),
+                    instance.getSuccessfulMasterApplyElapsedMillis(),
+                    not(instance.getSuccessfulNotificationElapsedMillis()),
+                    instance.getFailedComputationElapsedMillis(),
+                    instance.getFailedPublicationElapsedMillis(),
+                    instance.getFailedContextConstructionElapsedMillis(),
+                    instance.getFailedCommitElapsedMillis(),
+                    instance.getFailedCompletionElapsedMillis(),
+                    instance.getFailedMasterApplyElapsedMillis(),
+                    instance.getFailedNotificationElapsedMillis());
+            case 13:
+                return new ClusterStateUpdateStats(
+                    instance.getUnchangedTaskCount(),
+                    instance.getPublicationSuccessCount(),
+                    instance.getPublicationFailureCount(),
+                    instance.getUnchangedComputationElapsedMillis(),
+                    instance.getUnchangedNotificationElapsedMillis(),
+                    instance.getSuccessfulComputationElapsedMillis(),
+                    instance.getSuccessfulPublicationElapsedMillis(),
+                    instance.getSuccessfulContextConstructionElapsedMillis(),
+                    instance.getSuccessfulCommitElapsedMillis(),
+                    instance.getSuccessfulCompletionElapsedMillis(),
+                    instance.getSuccessfulMasterApplyElapsedMillis(),
+                    instance.getSuccessfulNotificationElapsedMillis(),
+                    not(instance.getFailedComputationElapsedMillis()),
+                    instance.getFailedPublicationElapsedMillis(),
+                    instance.getFailedContextConstructionElapsedMillis(),
+                    instance.getFailedCommitElapsedMillis(),
+                    instance.getFailedCompletionElapsedMillis(),
+                    instance.getFailedMasterApplyElapsedMillis(),
+                    instance.getFailedNotificationElapsedMillis());
+            case 14:
+                return new ClusterStateUpdateStats(
+                    instance.getUnchangedTaskCount(),
+                    instance.getPublicationSuccessCount(),
+                    instance.getPublicationFailureCount(),
+                    instance.getUnchangedComputationElapsedMillis(),
+                    instance.getUnchangedNotificationElapsedMillis(),
+                    instance.getSuccessfulComputationElapsedMillis(),
+                    instance.getSuccessfulPublicationElapsedMillis(),
+                    instance.getSuccessfulContextConstructionElapsedMillis(),
+                    instance.getSuccessfulCommitElapsedMillis(),
+                    instance.getSuccessfulCompletionElapsedMillis(),
+                    instance.getSuccessfulMasterApplyElapsedMillis(),
+                    instance.getSuccessfulNotificationElapsedMillis(),
+                    instance.getFailedComputationElapsedMillis(),
+                    not(instance.getFailedPublicationElapsedMillis()),
+                    instance.getFailedContextConstructionElapsedMillis(),
+                    instance.getFailedCommitElapsedMillis(),
+                    instance.getFailedCompletionElapsedMillis(),
+                    instance.getFailedMasterApplyElapsedMillis(),
+                    instance.getFailedNotificationElapsedMillis());
+            case 15:
+                return new ClusterStateUpdateStats(
+                    instance.getUnchangedTaskCount(),
+                    instance.getPublicationSuccessCount(),
+                    instance.getPublicationFailureCount(),
+                    instance.getUnchangedComputationElapsedMillis(),
+                    instance.getUnchangedNotificationElapsedMillis(),
+                    instance.getSuccessfulComputationElapsedMillis(),
+                    instance.getSuccessfulPublicationElapsedMillis(),
+                    instance.getSuccessfulContextConstructionElapsedMillis(),
+                    instance.getSuccessfulCommitElapsedMillis(),
+                    instance.getSuccessfulCompletionElapsedMillis(),
+                    instance.getSuccessfulMasterApplyElapsedMillis(),
+                    instance.getSuccessfulNotificationElapsedMillis(),
+                    instance.getFailedComputationElapsedMillis(),
+                    instance.getFailedPublicationElapsedMillis(),
+                    not(instance.getFailedContextConstructionElapsedMillis()),
+                    instance.getFailedCommitElapsedMillis(),
+                    instance.getFailedCompletionElapsedMillis(),
+                    instance.getFailedMasterApplyElapsedMillis(),
+                    instance.getFailedNotificationElapsedMillis());
+            case 16:
+                return new ClusterStateUpdateStats(
+                    instance.getUnchangedTaskCount(),
+                    instance.getPublicationSuccessCount(),
+                    instance.getPublicationFailureCount(),
+                    instance.getUnchangedComputationElapsedMillis(),
+                    instance.getUnchangedNotificationElapsedMillis(),
+                    instance.getSuccessfulComputationElapsedMillis(),
+                    instance.getSuccessfulPublicationElapsedMillis(),
+                    instance.getSuccessfulContextConstructionElapsedMillis(),
+                    instance.getSuccessfulCommitElapsedMillis(),
+                    instance.getSuccessfulCompletionElapsedMillis(),
+                    instance.getSuccessfulMasterApplyElapsedMillis(),
+                    instance.getSuccessfulNotificationElapsedMillis(),
+                    instance.getFailedComputationElapsedMillis(),
+                    instance.getFailedPublicationElapsedMillis(),
+                    instance.getFailedContextConstructionElapsedMillis(),
+                    not(instance.getFailedCommitElapsedMillis()),
+                    instance.getFailedCompletionElapsedMillis(),
+                    instance.getFailedMasterApplyElapsedMillis(),
+                    instance.getFailedNotificationElapsedMillis());
+            case 17:
+                return new ClusterStateUpdateStats(
+                    instance.getUnchangedTaskCount(),
+                    instance.getPublicationSuccessCount(),
+                    instance.getPublicationFailureCount(),
+                    instance.getUnchangedComputationElapsedMillis(),
+                    instance.getUnchangedNotificationElapsedMillis(),
+                    instance.getSuccessfulComputationElapsedMillis(),
+                    instance.getSuccessfulPublicationElapsedMillis(),
+                    instance.getSuccessfulContextConstructionElapsedMillis(),
+                    instance.getSuccessfulCommitElapsedMillis(),
+                    instance.getSuccessfulCompletionElapsedMillis(),
+                    instance.getSuccessfulMasterApplyElapsedMillis(),
+                    instance.getSuccessfulNotificationElapsedMillis(),
+                    instance.getFailedComputationElapsedMillis(),
+                    instance.getFailedPublicationElapsedMillis(),
+                    instance.getFailedContextConstructionElapsedMillis(),
+                    instance.getFailedCommitElapsedMillis(),
+                    not(instance.getFailedCompletionElapsedMillis()),
+                    instance.getFailedMasterApplyElapsedMillis(),
+                    instance.getFailedNotificationElapsedMillis());
+            case 18:
+                return new ClusterStateUpdateStats(
+                    instance.getUnchangedTaskCount(),
+                    instance.getPublicationSuccessCount(),
+                    instance.getPublicationFailureCount(),
+                    instance.getUnchangedComputationElapsedMillis(),
+                    instance.getUnchangedNotificationElapsedMillis(),
+                    instance.getSuccessfulComputationElapsedMillis(),
+                    instance.getSuccessfulPublicationElapsedMillis(),
+                    instance.getSuccessfulContextConstructionElapsedMillis(),
+                    instance.getSuccessfulCommitElapsedMillis(),
+                    instance.getSuccessfulCompletionElapsedMillis(),
+                    instance.getSuccessfulMasterApplyElapsedMillis(),
+                    instance.getSuccessfulNotificationElapsedMillis(),
+                    instance.getFailedComputationElapsedMillis(),
+                    instance.getFailedPublicationElapsedMillis(),
+                    instance.getFailedContextConstructionElapsedMillis(),
+                    instance.getFailedCommitElapsedMillis(),
+                    instance.getFailedCompletionElapsedMillis(),
+                    not(instance.getFailedMasterApplyElapsedMillis()),
+                    instance.getFailedNotificationElapsedMillis());
+            case 19:
+                return new ClusterStateUpdateStats(
+                    instance.getUnchangedTaskCount(),
+                    instance.getPublicationSuccessCount(),
+                    instance.getPublicationFailureCount(),
+                    instance.getUnchangedComputationElapsedMillis(),
+                    instance.getUnchangedNotificationElapsedMillis(),
+                    instance.getSuccessfulComputationElapsedMillis(),
+                    instance.getSuccessfulPublicationElapsedMillis(),
+                    instance.getSuccessfulContextConstructionElapsedMillis(),
+                    instance.getSuccessfulCommitElapsedMillis(),
+                    instance.getSuccessfulCompletionElapsedMillis(),
+                    instance.getSuccessfulMasterApplyElapsedMillis(),
+                    instance.getSuccessfulNotificationElapsedMillis(),
+                    instance.getFailedComputationElapsedMillis(),
+                    instance.getFailedPublicationElapsedMillis(),
+                    instance.getFailedContextConstructionElapsedMillis(),
+                    instance.getFailedCommitElapsedMillis(),
+                    instance.getFailedCompletionElapsedMillis(),
+                    instance.getFailedMasterApplyElapsedMillis(),
+                    not(instance.getFailedNotificationElapsedMillis()));
+        }
+        throw new AssertionError("impossible");
+    }
+}

+ 13 - 2
server/src/test/java/org/elasticsearch/cluster/service/MasterServiceTests.java

@@ -16,8 +16,8 @@ import org.elasticsearch.Version;
 import org.elasticsearch.action.support.master.AcknowledgedResponse;
 import org.elasticsearch.cluster.AckedClusterStateUpdateTask;
 import org.elasticsearch.cluster.ClusterName;
-import org.elasticsearch.cluster.ClusterStatePublicationEvent;
 import org.elasticsearch.cluster.ClusterState;
+import org.elasticsearch.cluster.ClusterStatePublicationEvent;
 import org.elasticsearch.cluster.ClusterStateTaskConfig;
 import org.elasticsearch.cluster.ClusterStateTaskExecutor;
 import org.elasticsearch.cluster.ClusterStateTaskListener;
@@ -39,6 +39,7 @@ import org.elasticsearch.core.Nullable;
 import org.elasticsearch.core.TimeValue;
 import org.elasticsearch.core.Tuple;
 import org.elasticsearch.node.Node;
+import org.elasticsearch.test.ClusterServiceUtils;
 import org.elasticsearch.test.ESTestCase;
 import org.elasticsearch.test.MockLogAppender;
 import org.elasticsearch.test.junit.annotations.TestLogging;
@@ -86,6 +87,11 @@ public class MasterServiceTests extends ESTestCase {
             public long relativeTimeInMillis() {
                 return relativeTimeInMillis;
             }
+
+            @Override
+            public long rawRelativeTimeInMillis() {
+                return relativeTimeInMillis(); // delegates to relativeTimeInMillis(), so both read the test's relativeTimeInMillis field
+            }
         };
     }
 
@@ -118,6 +124,7 @@ public class MasterServiceTests extends ESTestCase {
         final AtomicReference<ClusterState> clusterStateRef = new AtomicReference<>(initialClusterState);
         masterService.setClusterStatePublisher((clusterStatePublicationEvent, publishListener, ackListener) -> {
             clusterStateRef.set(clusterStatePublicationEvent.getNewState());
+            ClusterServiceUtils.setAllElapsedMillis(clusterStatePublicationEvent);
             publishListener.onResponse(null);
         });
         masterService.setClusterStateSupplier(clusterStateRef::get);
@@ -726,6 +733,7 @@ public class MasterServiceTests extends ESTestCase {
                 .blocks(ClusterBlocks.EMPTY_CLUSTER_BLOCK).build();
             final AtomicReference<ClusterState> clusterStateRef = new AtomicReference<>(initialClusterState);
             masterService.setClusterStatePublisher((clusterStatePublicationEvent, publishListener, ackListener) -> {
+                ClusterServiceUtils.setAllElapsedMillis(clusterStatePublicationEvent);
                 if (clusterStatePublicationEvent.getSummary().contains("test5")) {
                     relativeTimeInMillis += MasterService.MASTER_SERVICE_SLOW_TASK_LOGGING_THRESHOLD_SETTING.get(Settings.EMPTY).millis()
                         + randomLongBetween(1, 1000000);
@@ -894,7 +902,10 @@ public class MasterServiceTests extends ESTestCase {
                     .masterNodeId(node1.getId()))
                 .blocks(ClusterBlocks.EMPTY_CLUSTER_BLOCK).build();
             final AtomicReference<ClusterStatePublisher> publisherRef = new AtomicReference<>();
-            masterService.setClusterStatePublisher((e, pl, al) -> publisherRef.get().publish(e, pl, al));
+            masterService.setClusterStatePublisher((e, pl, al) -> {
+                ClusterServiceUtils.setAllElapsedMillis(e);
+                publisherRef.get().publish(e, pl, al);
+            });
             masterService.setClusterStateSupplier(() -> initialClusterState);
             masterService.start();
 

+ 15 - 3
test/framework/src/main/java/org/elasticsearch/cluster/coordination/AbstractCoordinatorTestCase.java

@@ -731,6 +731,10 @@ public class AbstractCoordinatorTestCase extends ESTestCase {
             clusterNodes.forEach(ClusterNode::close);
         }
 
+        /**
+         * Extension point for subclasses that need additional named writeables registered.
+         * The entries returned here are appended to those from {@code ClusterModule} when
+         * {@code getNamedWriteableRegistry()} builds its registry.
+         *
+         * @return the extra registry entries; this default implementation contributes none
+         */
+        protected List<NamedWriteableRegistry.Entry> extraNamedWriteables() {
+            return Collections.emptyList();
+        }
+
         class MockPersistedState implements CoordinationState.PersistedState {
             private final CoordinationState.PersistedState delegate;
             private final NodeEnvironment nodeEnvironment;
@@ -828,8 +832,9 @@ public class AbstractCoordinatorTestCase extends ESTestCase {
                             }
                         }
 
-                        StreamInput inStream = new NamedWriteableAwareStreamInput(outStream.bytes().streamInput(),
-                            new NamedWriteableRegistry(ClusterModule.getNamedWriteables()));
+                        final StreamInput inStream = new NamedWriteableAwareStreamInput(
+                            outStream.bytes().streamInput(),
+                            getNamedWriteableRegistry());
                         // adapt cluster state to new localNode instance and add blocks
                         delegate = new InMemoryPersistedState(adaptCurrentTerm.apply(persistedCurrentTerm),
                             ClusterStateUpdaters.addStateNotRecoveredBlock(ClusterState.readFrom(inStream, newLocalNode)));
@@ -881,6 +886,13 @@ public class AbstractCoordinatorTestCase extends ESTestCase {
             }
         }
 
+        private NamedWriteableRegistry getNamedWriteableRegistry() {
+            return new NamedWriteableRegistry(Stream.concat(
+                ClusterModule.getNamedWriteables().stream(),
+                extraNamedWriteables().stream()
+            ).collect(Collectors.toList()));
+        }
+
         public class ClusterNode {
             private final Logger logger = LogManager.getLogger(ClusterNode.class);
 
@@ -958,7 +970,7 @@ public class AbstractCoordinatorTestCase extends ESTestCase {
                 final Collection<BiConsumer<DiscoveryNode, ClusterState>> onJoinValidators =
                     Collections.singletonList((dn, cs) -> extraJoinValidators.forEach(validator -> validator.accept(dn, cs)));
                 final AllocationService allocationService = ESAllocationTestCase.createAllocationService(Settings.EMPTY);
-                coordinator = new Coordinator("test_node", settings, clusterSettings, transportService, writableRegistry(),
+                coordinator = new Coordinator("test_node", settings, clusterSettings, transportService, getNamedWriteableRegistry(),
                     allocationService, masterService, this::getPersistedState,
                     Cluster.this::provideSeedHosts, clusterApplierService, onJoinValidators, Randomness.get(), (s, p, r) -> {},
                     getElectionStrategy(), nodeHealthService);

+ 2 - 2
test/framework/src/main/java/org/elasticsearch/cluster/service/FakeThreadPoolMasterService.java

@@ -111,7 +111,7 @@ public class FakeThreadPoolMasterService extends MasterService {
     }
 
     @Override
-    protected void publish(ClusterStatePublicationEvent clusterStatePublicationEvent, TaskOutputs taskOutputs, long startTimeMillis) {
+    protected void publish(ClusterStatePublicationEvent clusterStatePublicationEvent, TaskOutputs taskOutputs) {
         assert waitForPublish == false;
         waitForPublish = true;
         final AckListener ackListener = taskOutputs.createAckListener(threadPool, clusterStatePublicationEvent.getNewState());
@@ -140,7 +140,7 @@ public class FakeThreadPoolMasterService extends MasterService {
                 assert waitForPublish;
                 waitForPublish = false;
                 try {
-                    onPublicationFailed(clusterStatePublicationEvent, taskOutputs, startTimeMillis, e);
+                    onPublicationFailed(clusterStatePublicationEvent, taskOutputs, e);
                 } finally {
                     taskInProgress = false;
                     scheduleNextTaskIfNecessary();

+ 5 - 0
test/framework/src/main/java/org/elasticsearch/common/util/concurrent/DeterministicTaskQueue.java

@@ -299,6 +299,11 @@ public class DeterministicTaskQueue {
                 return currentTimeMillis;
             }
 
+            @Override
+            public long rawRelativeTimeInMillis() {
+                return currentTimeMillis; // same currentTimeMillis value that the neighbouring time accessors return
+            }
+
             @Override
             public long absoluteTimeInMillis() {
                 return currentTimeMillis;

+ 13 - 3
test/framework/src/main/java/org/elasticsearch/test/ClusterServiceUtils.java

@@ -12,6 +12,7 @@ import org.elasticsearch.ElasticsearchException;
 import org.elasticsearch.Version;
 import org.elasticsearch.cluster.ClusterName;
 import org.elasticsearch.cluster.ClusterState;
+import org.elasticsearch.cluster.ClusterStatePublicationEvent;
 import org.elasticsearch.cluster.ClusterStateUpdateTask;
 import org.elasticsearch.cluster.NodeConnectionsService;
 import org.elasticsearch.cluster.block.ClusterBlocks;
@@ -135,7 +136,8 @@ public class ClusterServiceUtils {
     }
 
     public static ClusterStatePublisher createClusterStatePublisher(ClusterApplier clusterApplier) {
-        return (clusterStatePublicationEvent, publishListener, ackListener) ->
+        return (clusterStatePublicationEvent, publishListener, ackListener) -> {
+            setAllElapsedMillis(clusterStatePublicationEvent);
             clusterApplier.onNewClusterState(
                 "mock_publish_to_self[" + clusterStatePublicationEvent.getSummary() + "]",
                 clusterStatePublicationEvent::getNewState,
@@ -149,8 +151,8 @@ public class ClusterServiceUtils {
                     public void onFailure(String source, Exception e) {
                         publishListener.onFailure(e);
                     }
-                }
-        );
+                });
+        };
     }
 
     public static ClusterService createClusterService(ClusterState initialState, ThreadPool threadPool) {
@@ -169,4 +171,12 @@ public class ClusterServiceUtils {
     /**
      * Convenience overload: forwards {@code clusterState} to the {@code setState} overload
      * that accepts the value returned by {@code clusterService.getClusterApplierService()}.
      *
      * @param clusterService the service whose applier service receives the state
      * @param clusterState   the state handed on to the delegate overload
      */
     public static void setState(ClusterService clusterService, ClusterState clusterState) {
         setState(clusterService.getClusterApplierService(), clusterState);
     }
+
+    /**
+     * Records a zero elapsed time for each of the four publication timings on the given event:
+     * context construction, commit, completion and master apply.
+     * <p>
+     * Test publishers call this themselves before completing a publication.
+     * NOTE(review): presumably so that consumers of these timings find them populated
+     * when the real publication path is bypassed; confirm against the production caller.
+     *
+     * @param clusterStatePublicationEvent the event whose timing fields are set
+     */
+    public static void setAllElapsedMillis(ClusterStatePublicationEvent clusterStatePublicationEvent) {
+        final long noTimeElapsed = 0L;
+        final ClusterStatePublicationEvent event = clusterStatePublicationEvent;
+        event.setPublicationContextConstructionElapsedMillis(noTimeElapsed);
+        event.setPublicationCommitElapsedMillis(noTimeElapsed);
+        event.setPublicationCompletionElapsedMillis(noTimeElapsed);
+        event.setMasterApplyElapsedMillis(noTimeElapsed);
+    }
+
 }

+ 2 - 0
test/framework/src/test/java/org/elasticsearch/cluster/service/FakeThreadPoolMasterServiceTests.java

@@ -18,6 +18,7 @@ import org.elasticsearch.cluster.node.DiscoveryNode;
 import org.elasticsearch.cluster.node.DiscoveryNodeRole;
 import org.elasticsearch.common.settings.Settings;
 import org.elasticsearch.common.util.concurrent.ThreadContext;
+import org.elasticsearch.test.ClusterServiceUtils;
 import org.elasticsearch.test.ESTestCase;
 import org.elasticsearch.threadpool.ThreadPool;
 
@@ -58,6 +59,7 @@ public class FakeThreadPoolMasterServiceTests extends ESTestCase {
         FakeThreadPoolMasterService masterService = new FakeThreadPoolMasterService("test_node","test", mockThreadPool, runnableTasks::add);
         masterService.setClusterStateSupplier(lastClusterStateRef::get);
         masterService.setClusterStatePublisher((clusterStatePublicationEvent, publishListener, ackListener) -> {
+            ClusterServiceUtils.setAllElapsedMillis(clusterStatePublicationEvent);
             lastClusterStateRef.set(clusterStatePublicationEvent.getNewState());
             publishingCallback.set(publishListener);
         });