Browse Source

Expose pending cluster state queue size in node stats

Add 3 stats about the queue: total queue size, number of committed cluster
states, and number of pending cluster states.
xuzha 10 years ago
parent
commit
97ecd7bf5a
22 changed files with 362 additions and 13 deletions
  1. 18 1
      core/src/main/java/org/elasticsearch/action/admin/cluster/node/stats/NodeStats.java
  2. 19 0
      core/src/main/java/org/elasticsearch/action/admin/cluster/node/stats/NodesStatsRequest.java
  3. 8 1
      core/src/main/java/org/elasticsearch/action/admin/cluster/node/stats/NodesStatsRequestBuilder.java
  4. 1 1
      core/src/main/java/org/elasticsearch/action/admin/cluster/node/stats/TransportNodesStatsAction.java
  5. 1 1
      core/src/main/java/org/elasticsearch/action/admin/cluster/stats/TransportClusterStatsAction.java
  6. 6 0
      core/src/main/java/org/elasticsearch/discovery/Discovery.java
  7. 78 0
      core/src/main/java/org/elasticsearch/discovery/DiscoveryStats.java
  8. 5 0
      core/src/main/java/org/elasticsearch/discovery/local/LocalDiscovery.java
  9. 8 0
      core/src/main/java/org/elasticsearch/discovery/zen/ZenDiscovery.java
  10. 97 0
      core/src/main/java/org/elasticsearch/discovery/zen/publish/PendingClusterStateStats.java
  11. 13 0
      core/src/main/java/org/elasticsearch/discovery/zen/publish/PendingClusterStatesQueue.java
  12. 5 3
      core/src/main/java/org/elasticsearch/node/service/NodeService.java
  13. 1 0
      core/src/main/java/org/elasticsearch/rest/action/admin/cluster/node/stats/RestNodesStatsAction.java
  14. 3 3
      core/src/test/java/org/elasticsearch/cluster/DiskUsageTests.java
  15. 1 1
      core/src/test/java/org/elasticsearch/cluster/MockInternalClusterInfoService.java
  16. 5 0
      core/src/test/java/org/elasticsearch/discovery/DiscoveryModuleTests.java
  17. 38 0
      core/src/test/java/org/elasticsearch/discovery/zen/ZenDiscoveryIT.java
  18. 25 0
      core/src/test/java/org/elasticsearch/discovery/zen/publish/PendingClusterStatesQueueTests.java
  19. 1 1
      core/src/test/java/org/elasticsearch/test/InternalTestCluster.java
  20. 3 0
      docs/reference/cluster/nodes-stats.asciidoc
  21. 1 1
      rest-api-spec/src/main/resources/rest-api-spec/api/nodes.stats.json
  22. 25 0
      rest-api-spec/src/main/resources/rest-api-spec/test/nodes.stats/30_discovery.yaml

+ 18 - 1
core/src/main/java/org/elasticsearch/action/admin/cluster/node/stats/NodeStats.java

@@ -28,6 +28,7 @@ import org.elasticsearch.common.io.stream.StreamInput;
 import org.elasticsearch.common.io.stream.StreamOutput;
 import org.elasticsearch.common.xcontent.ToXContent;
 import org.elasticsearch.common.xcontent.XContentBuilder;
+import org.elasticsearch.discovery.DiscoveryStats;
 import org.elasticsearch.http.HttpStats;
 import org.elasticsearch.indices.NodeIndicesStats;
 import org.elasticsearch.indices.breaker.AllCircuitBreakerStats;
@@ -78,6 +79,9 @@ public class NodeStats extends BaseNodeResponse implements ToXContent {
     @Nullable
     private ScriptStats scriptStats;
 
+    @Nullable
+    private DiscoveryStats discoveryStats;
+
     NodeStats() {
     }
 
@@ -85,7 +89,8 @@ public class NodeStats extends BaseNodeResponse implements ToXContent {
                      @Nullable OsStats os, @Nullable ProcessStats process, @Nullable JvmStats jvm, @Nullable ThreadPoolStats threadPool,
                      @Nullable FsInfo fs, @Nullable TransportStats transport, @Nullable HttpStats http,
                      @Nullable AllCircuitBreakerStats breaker,
-                     @Nullable ScriptStats scriptStats) {
+                     @Nullable ScriptStats scriptStats,
+                     @Nullable DiscoveryStats discoveryStats) {
         super(node);
         this.timestamp = timestamp;
         this.indices = indices;
@@ -98,6 +103,7 @@ public class NodeStats extends BaseNodeResponse implements ToXContent {
         this.http = http;
         this.breaker = breaker;
         this.scriptStats = scriptStats;
+        this.discoveryStats = discoveryStats;
     }
 
     public long getTimestamp() {
@@ -177,6 +183,11 @@ public class NodeStats extends BaseNodeResponse implements ToXContent {
         return this.scriptStats;
     }
 
+    @Nullable
+    public DiscoveryStats getDiscoveryStats() {
+        return this.discoveryStats;
+    }
+
     public static NodeStats readNodeStats(StreamInput in) throws IOException {
         NodeStats nodeInfo = new NodeStats();
         nodeInfo.readFrom(in);
@@ -213,6 +224,7 @@ public class NodeStats extends BaseNodeResponse implements ToXContent {
         }
         breaker = AllCircuitBreakerStats.readOptionalAllCircuitBreakerStats(in);
         scriptStats = in.readOptionalStreamable(new ScriptStats());
+        discoveryStats = in.readOptionalStreamable(new DiscoveryStats(null));
 
     }
 
@@ -270,6 +282,7 @@ public class NodeStats extends BaseNodeResponse implements ToXContent {
         }
         out.writeOptionalStreamable(breaker);
         out.writeOptionalStreamable(scriptStats);
+        out.writeOptionalStreamable(discoveryStats);
     }
 
     @Override
@@ -321,6 +334,10 @@ public class NodeStats extends BaseNodeResponse implements ToXContent {
             getScriptStats().toXContent(builder, params);
         }
 
+        if (getDiscoveryStats() != null) {
+            getDiscoveryStats().toXContent(builder, params);
+        }
+
         return builder;
     }
 }

+ 19 - 0
core/src/main/java/org/elasticsearch/action/admin/cluster/node/stats/NodesStatsRequest.java

@@ -41,6 +41,7 @@ public class NodesStatsRequest extends BaseNodesRequest<NodesStatsRequest> {
     private boolean http;
     private boolean breaker;
     private boolean script;
+    private boolean discovery;
 
     public NodesStatsRequest() {
     }
@@ -67,6 +68,7 @@ public class NodesStatsRequest extends BaseNodesRequest<NodesStatsRequest> {
         this.http = true;
         this.breaker = true;
         this.script = true;
+        this.discovery = true;
         return this;
     }
 
@@ -84,6 +86,7 @@ public class NodesStatsRequest extends BaseNodesRequest<NodesStatsRequest> {
         this.http = false;
         this.breaker = false;
         this.script = false;
+        this.discovery = false;
         return this;
     }
 
@@ -234,6 +237,20 @@ public class NodesStatsRequest extends BaseNodesRequest<NodesStatsRequest> {
         return this;
     }
 
+
+    public boolean discovery() {
+        return this.discovery;
+    }
+
+    /**
+     * Should the node's discovery stats be returned.
+     */
+    public NodesStatsRequest discovery(boolean discovery) {
+        this.discovery = discovery;
+        return this;
+    }
+
+
     @Override
     public void readFrom(StreamInput in) throws IOException {
         super.readFrom(in);
@@ -247,6 +264,7 @@ public class NodesStatsRequest extends BaseNodesRequest<NodesStatsRequest> {
         http = in.readBoolean();
         breaker = in.readBoolean();
         script = in.readBoolean();
+        discovery = in.readBoolean();
     }
 
     @Override
@@ -262,6 +280,7 @@ public class NodesStatsRequest extends BaseNodesRequest<NodesStatsRequest> {
         out.writeBoolean(http);
         out.writeBoolean(breaker);
         out.writeBoolean(script);
+        out.writeBoolean(discovery);
     }
 
 }

+ 8 - 1
core/src/main/java/org/elasticsearch/action/admin/cluster/node/stats/NodesStatsRequestBuilder.java

@@ -19,7 +19,6 @@
 
 package org.elasticsearch.action.admin.cluster.node.stats;
 
-import org.elasticsearch.action.ActionListener;
 import org.elasticsearch.action.admin.indices.stats.CommonStatsFlags;
 import org.elasticsearch.action.support.nodes.NodesOperationRequestBuilder;
 import org.elasticsearch.client.ElasticsearchClient;
@@ -130,4 +129,12 @@ public class NodesStatsRequestBuilder extends NodesOperationRequestBuilder<Nodes
         request.http(http);
         return this;
     }
+
+    /**
+     * Should the discovery stats be returned.
+     */
+    public NodesStatsRequestBuilder setDiscovery(boolean discovery) {
+        request.discovery(discovery);
+        return this;
+    }
 }

+ 1 - 1
core/src/main/java/org/elasticsearch/action/admin/cluster/node/stats/TransportNodesStatsAction.java

@@ -80,7 +80,7 @@ public class TransportNodesStatsAction extends TransportNodesAction<NodesStatsRe
     protected NodeStats nodeOperation(NodeStatsRequest nodeStatsRequest) {
         NodesStatsRequest request = nodeStatsRequest.request;
         return nodeService.stats(request.indices(), request.os(), request.process(), request.jvm(), request.threadPool(),
-                request.fs(), request.transport(), request.http(), request.breaker(), request.script());
+                request.fs(), request.transport(), request.http(), request.breaker(), request.script(), request.discovery());
     }
 
     @Override

+ 1 - 1
core/src/main/java/org/elasticsearch/action/admin/cluster/stats/TransportClusterStatsAction.java

@@ -101,7 +101,7 @@ public class TransportClusterStatsAction extends TransportNodesAction<ClusterSta
     @Override
     protected ClusterStatsNodeResponse nodeOperation(ClusterStatsNodeRequest nodeRequest) {
         NodeInfo nodeInfo = nodeService.info(false, true, false, true, false, true, false, true);
-        NodeStats nodeStats = nodeService.stats(CommonStatsFlags.NONE, false, true, true, false, true, false, false, false, false);
+        NodeStats nodeStats = nodeService.stats(CommonStatsFlags.NONE, false, true, true, false, true, false, false, false, false, false);
         List<ShardStats> shardsStats = new ArrayList<>();
         for (IndexService indexService : indicesService) {
             for (IndexShard indexShard : indexService) {

+ 6 - 0
core/src/main/java/org/elasticsearch/discovery/Discovery.java

@@ -87,4 +87,10 @@ public interface Discovery extends LifecycleComponent<Discovery> {
             super(msg, cause, args);
         }
     }
+
+    /**
+     * @return stats about the discovery
+     */
+    DiscoveryStats stats();
+
 }

+ 78 - 0
core/src/main/java/org/elasticsearch/discovery/DiscoveryStats.java

@@ -0,0 +1,78 @@
+/*
+ * Licensed to Elasticsearch under one or more contributor
+ * license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright
+ * ownership. Elasticsearch licenses this file to you under
+ * the Apache License, Version 2.0 (the "License"); you may
+ * not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.elasticsearch.discovery;
+
+import org.elasticsearch.common.Nullable;
+import org.elasticsearch.common.io.stream.StreamInput;
+import org.elasticsearch.common.io.stream.StreamOutput;
+import org.elasticsearch.common.io.stream.Streamable;
+import org.elasticsearch.common.xcontent.ToXContent;
+import org.elasticsearch.common.xcontent.XContentBuilder;
+import org.elasticsearch.common.xcontent.XContentBuilderString;
+import org.elasticsearch.discovery.zen.publish.PendingClusterStateStats;
+
+import java.io.IOException;
+
+public class DiscoveryStats implements Streamable, ToXContent {
+
+    @Nullable
+    private PendingClusterStateStats queueStats;
+
+    public DiscoveryStats(PendingClusterStateStats queueStats) {
+        this.queueStats = queueStats;
+    }
+
+    @Override
+    public XContentBuilder toXContent(XContentBuilder builder, Params params) throws IOException {
+        builder.startObject(Fields.DISCOVERY);
+
+        if (queueStats != null ){
+            queueStats.toXContent(builder, params);
+        }
+        builder.endObject();
+        return builder;
+    }
+
+    @Override
+    public void readFrom(StreamInput in) throws IOException {
+        if (in.readBoolean()) {
+            queueStats = new PendingClusterStateStats();
+            queueStats.readFrom(in);
+        }
+    }
+
+    @Override
+    public void writeTo(StreamOutput out) throws IOException {
+        if (queueStats != null ) {
+            out.writeBoolean(true);
+            queueStats.writeTo(out);
+        }else{
+            out.writeBoolean(false);
+        }
+    }
+
+    static final class Fields {
+        static final XContentBuilderString DISCOVERY = new XContentBuilderString("discovery");
+    }
+
+    public PendingClusterStateStats getQueueStats() {
+        return queueStats;
+    }
+}

+ 5 - 0
core/src/main/java/org/elasticsearch/discovery/local/LocalDiscovery.java

@@ -316,6 +316,11 @@ public class LocalDiscovery extends AbstractLifecycleComponent<Discovery> implem
         }
     }
 
+    @Override
+    public DiscoveryStats stats() {
+        return new DiscoveryStats(null);
+    }
+
     private LocalDiscovery[] members() {
         ClusterGroup clusterGroup = clusterGroups.get(clusterName);
         if (clusterGroup == null) {

+ 8 - 0
core/src/main/java/org/elasticsearch/discovery/zen/ZenDiscovery.java

@@ -43,6 +43,8 @@ import org.elasticsearch.common.settings.Settings;
 import org.elasticsearch.common.unit.TimeValue;
 import org.elasticsearch.discovery.Discovery;
 import org.elasticsearch.discovery.DiscoverySettings;
+import org.elasticsearch.discovery.DiscoveryStats;
+import org.elasticsearch.discovery.zen.publish.PendingClusterStateStats;
 import org.elasticsearch.discovery.InitialStateDiscoveryListener;
 import org.elasticsearch.discovery.zen.elect.ElectMasterService;
 import org.elasticsearch.discovery.zen.fd.MasterFaultDetection;
@@ -337,6 +339,12 @@ public class ZenDiscovery extends AbstractLifecycleComponent<Discovery> implemen
         }
     }
 
+    @Override
+    public DiscoveryStats stats() {
+        PendingClusterStateStats queueStats = publishClusterState.pendingStatesQueue().stats();
+        return new DiscoveryStats(queueStats);
+    }
+
     /**
      * returns true if zen discovery is started and there is a currently a background thread active for (re)joining
      * the cluster used for testing.

+ 97 - 0
core/src/main/java/org/elasticsearch/discovery/zen/publish/PendingClusterStateStats.java

@@ -0,0 +1,97 @@
+/*
+ * Licensed to Elasticsearch under one or more contributor
+ * license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright
+ * ownership. Elasticsearch licenses this file to you under
+ * the Apache License, Version 2.0 (the "License"); you may
+ * not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.elasticsearch.discovery.zen.publish;
+
+import org.elasticsearch.common.io.stream.StreamInput;
+import org.elasticsearch.common.io.stream.StreamOutput;
+import org.elasticsearch.common.io.stream.Streamable;
+import org.elasticsearch.common.xcontent.ToXContent;
+import org.elasticsearch.common.xcontent.XContentBuilder;
+import org.elasticsearch.common.xcontent.XContentBuilderString;
+
+import java.io.IOException;
+
+/**
+ * Class encapsulating stats about the PendingClusterStatsQueue
+ */
+public class PendingClusterStateStats implements Streamable, ToXContent {
+
+    private int total;
+    private int pending;
+    private int committed;
+
+    public PendingClusterStateStats() {
+
+    }
+
+    public PendingClusterStateStats(int total, int pending, int committed) {
+        this.total = total;
+        this.pending = pending;
+        this.committed = committed;
+    }
+
+    public int getCommitted() {
+        return committed;
+    }
+
+    public int getPending() {
+        return pending;
+    }
+
+    public int getTotal() {
+        return total;
+    }
+
+    @Override
+    public XContentBuilder toXContent(XContentBuilder builder, Params params) throws IOException {
+        builder.startObject(Fields.QUEUE);
+        builder.field(Fields.TOTAL, total);
+        builder.field(Fields.PENDING, pending);
+        builder.field(Fields.COMMITTED, committed);
+        builder.endObject();
+        return builder;
+    }
+
+    @Override
+    public void readFrom(StreamInput in) throws IOException {
+        total = in.readVInt();
+        pending = in.readVInt();
+        committed = in.readVInt();
+    }
+
+    @Override
+    public void writeTo(StreamOutput out) throws IOException {
+        out.writeVInt(total);
+        out.writeVInt(pending);
+        out.writeVInt(committed);
+    }
+
+    static final class Fields {
+        static final XContentBuilderString QUEUE = new XContentBuilderString("cluster_state_queue");
+        static final XContentBuilderString TOTAL = new XContentBuilderString("total");
+        static final XContentBuilderString PENDING = new XContentBuilderString("pending");
+        static final XContentBuilderString COMMITTED = new XContentBuilderString("committed");
+    }
+
+    @Override
+    public String toString() {
+        return "PendingClusterStateStats(total=" + total + ", pending=" + pending + ", committed=" + committed + ")";
+    }
+}

+ 13 - 0
core/src/main/java/org/elasticsearch/discovery/zen/publish/PendingClusterStatesQueue.java

@@ -283,4 +283,17 @@ public class PendingClusterStatesQueue {
         }
     }
 
+    public synchronized PendingClusterStateStats stats() {
+
+        // calculate committed cluster state
+        int committed = 0;
+        for (ClusterStateContext clusterStatsContext : pendingStates) {
+            if (clusterStatsContext.committed()) {
+                committed += 1;
+            }
+        }
+
+        return new PendingClusterStateStats(pendingStates.size(), pendingStates.size() - committed, committed);
+    }
+
 }

+ 5 - 3
core/src/main/java/org/elasticsearch/node/service/NodeService.java

@@ -152,13 +152,14 @@ public class NodeService extends AbstractComponent {
                 transportService.stats(),
                 httpServer == null ? null : httpServer.stats(),
                 circuitBreakerService.stats(),
-                scriptService.stats()
+                scriptService.stats(),
+                discovery.stats()
         );
     }
 
     public NodeStats stats(CommonStatsFlags indices, boolean os, boolean process, boolean jvm, boolean threadPool,
                            boolean fs, boolean transport, boolean http, boolean circuitBreaker,
-                           boolean script) {
+                           boolean script, boolean discoveryStats) {
         // for indices stats we want to include previous allocated shards stats as well (it will
         // only be applied to the sensible ones to use, like refresh/merge/flush/indexing stats)
         return new NodeStats(discovery.localNode(), System.currentTimeMillis(),
@@ -171,7 +172,8 @@ public class NodeService extends AbstractComponent {
                 transport ? transportService.stats() : null,
                 http ? (httpServer == null ? null : httpServer.stats()) : null,
                 circuitBreaker ? circuitBreakerService.stats() : null,
-                script ? scriptService.stats() : null
+                script ? scriptService.stats() : null,
+                discoveryStats ? discovery.stats() : null
         );
     }
 }

+ 1 - 0
core/src/main/java/org/elasticsearch/rest/action/admin/cluster/node/stats/RestNodesStatsAction.java

@@ -77,6 +77,7 @@ public class RestNodesStatsAction extends BaseRestHandler {
             nodesStatsRequest.process(metrics.contains("process"));
             nodesStatsRequest.breaker(metrics.contains("breaker"));
             nodesStatsRequest.script(metrics.contains("script"));
+            nodesStatsRequest.discovery(metrics.contains("discovery"));
 
             // check for index specific metrics
             if (metrics.contains("indices")) {

+ 3 - 3
core/src/test/java/org/elasticsearch/cluster/DiskUsageTests.java

@@ -141,11 +141,11 @@ public class DiskUsageTests extends ESTestCase {
         };
         NodeStats[] nodeStats = new NodeStats[] {
                 new NodeStats(new DiscoveryNode("node_1", DummyTransportAddress.INSTANCE, Version.CURRENT), 0,
-                        null,null,null,null,null,new FsInfo(0, node1FSInfo), null,null,null,null),
+                        null,null,null,null,null,new FsInfo(0, node1FSInfo), null,null,null,null,null),
                 new NodeStats(new DiscoveryNode("node_2", DummyTransportAddress.INSTANCE, Version.CURRENT), 0,
-                        null,null,null,null,null, new FsInfo(0, node2FSInfo), null,null,null,null),
+                        null,null,null,null,null, new FsInfo(0, node2FSInfo), null,null,null,null,null),
                 new NodeStats(new DiscoveryNode("node_3", DummyTransportAddress.INSTANCE, Version.CURRENT), 0,
-                        null,null,null,null,null, new FsInfo(0, node3FSInfo), null,null,null,null)
+                        null,null,null,null,null, new FsInfo(0, node3FSInfo), null,null,null,null,null)
         };
         InternalClusterInfoService.fillDiskUsagePerNode(logger, nodeStats, newLeastAvaiableUsages, newMostAvaiableUsages);
         DiskUsage leastNode_1 = newLeastAvaiableUsages.get("node_1");

+ 1 - 1
core/src/test/java/org/elasticsearch/cluster/MockInternalClusterInfoService.java

@@ -73,7 +73,7 @@ public class MockInternalClusterInfoService extends InternalClusterInfoService {
             null, null, null, null, null,
             fsInfo,
             null, null, null,
-            null);
+            null, null);
     }
 
     @Inject

+ 5 - 0
core/src/test/java/org/elasticsearch/discovery/DiscoveryModuleTests.java

@@ -116,6 +116,11 @@ public class DiscoveryModuleTests extends ModuleTestCase {
 
         }
 
+        @Override
+        public DiscoveryStats stats() {
+            return null;
+        }
+
         @Override
         public Lifecycle.State lifecycleState() {
             return null;

+ 38 - 0
core/src/test/java/org/elasticsearch/discovery/zen/ZenDiscoveryIT.java

@@ -22,6 +22,7 @@ package org.elasticsearch.discovery.zen;
 import org.elasticsearch.ExceptionsHelper;
 import org.elasticsearch.Version;
 import org.elasticsearch.action.admin.cluster.health.ClusterHealthResponse;
+import org.elasticsearch.action.admin.cluster.node.stats.NodesStatsResponse;
 import org.elasticsearch.action.admin.indices.recovery.RecoveryResponse;
 import org.elasticsearch.cluster.ClusterChangedEvent;
 import org.elasticsearch.cluster.ClusterService;
@@ -34,7 +35,11 @@ import org.elasticsearch.common.bytes.BytesReference;
 import org.elasticsearch.common.settings.Settings;
 import org.elasticsearch.common.transport.InetSocketTransportAddress;
 import org.elasticsearch.common.transport.LocalTransportAddress;
+import org.elasticsearch.common.xcontent.ToXContent;
+import org.elasticsearch.common.xcontent.XContentBuilder;
+import org.elasticsearch.common.xcontent.XContentFactory;
 import org.elasticsearch.discovery.Discovery;
+import org.elasticsearch.discovery.DiscoveryStats;
 import org.elasticsearch.discovery.zen.elect.ElectMasterService;
 import org.elasticsearch.discovery.zen.fd.FaultDetection;
 import org.elasticsearch.discovery.zen.membership.MembershipAction;
@@ -256,4 +261,37 @@ public class ZenDiscoveryIT extends ESIntegTestCase {
         assertThat("Can't join master because version 1.6.0 is lower than the minimum compatable version 2.0.0 can support", electMasterService.electMaster(Collections.singletonList(node)), nullValue());
     }
 
+    public void testDiscoveryStats() throws IOException {
+        String expectedStatsJsonResponse = "{\n" +
+                "  \"discovery\" : {\n" +
+                "    \"cluster_state_queue\" : {\n" +
+                "      \"total\" : 0,\n" +
+                "      \"pending\" : 0,\n" +
+                "      \"committed\" : 0\n" +
+                "    }\n" +
+                "  }\n" +
+                "}";
+
+        Settings nodeSettings = Settings.settingsBuilder()
+                .put("discovery.type", "zen") // <-- To override the local setting if set externally
+                .build();
+        internalCluster().startNode(nodeSettings);
+
+        logger.info("--> request node discovery stats");
+        NodesStatsResponse statsResponse = client().admin().cluster().prepareNodesStats().clear().setDiscovery(true).get();
+        assertThat(statsResponse.getNodes().length, equalTo(1));
+
+        DiscoveryStats stats = statsResponse.getNodes()[0].getDiscoveryStats();
+        assertThat(stats.getQueueStats(), notNullValue());
+        assertThat(stats.getQueueStats().getTotal(), equalTo(0));
+        assertThat(stats.getQueueStats().getCommitted(), equalTo(0));
+        assertThat(stats.getQueueStats().getPending(), equalTo(0));
+
+        XContentBuilder builder = XContentFactory.jsonBuilder().prettyPrint();
+        builder.startObject();
+        stats.toXContent(builder, ToXContent.EMPTY_PARAMS);
+        builder.endObject();
+
+        assertThat(builder.string(), equalTo(expectedStatsJsonResponse));
+    }
 }

+ 25 - 0
core/src/test/java/org/elasticsearch/discovery/zen/publish/PendingClusterStatesQueueTests.java

@@ -162,6 +162,31 @@ public class PendingClusterStatesQueueTests extends ESTestCase {
         }
     }
 
+    public void testQueueStats() {
+        List<ClusterState> states = randomStates(scaledRandomIntBetween(10, 100), "master");
+        PendingClusterStatesQueue queue = createQueueWithStates(states);
+        assertThat(queue.stats().getTotal(), equalTo(states.size()));
+        assertThat(queue.stats().getPending(), equalTo(states.size()));
+        assertThat(queue.stats().getCommitted(), equalTo(0));
+
+        List<ClusterStateContext> committedContexts = randomCommitStates(queue);
+        assertThat(queue.stats().getTotal(), equalTo(states.size()));
+        assertThat(queue.stats().getPending(), equalTo(states.size() - committedContexts.size()));
+        assertThat(queue.stats().getCommitted(), equalTo(committedContexts.size()));
+
+        ClusterState highestCommitted = null;
+        for (ClusterStateContext context : committedContexts) {
+            if (highestCommitted == null || context.state.supersedes(highestCommitted)) {
+                highestCommitted = context.state;
+            }
+        }
+
+        queue.markAsProcessed(highestCommitted);
+        assertThat(queue.stats().getTotal(), equalTo(states.size() - committedContexts.size()));
+        assertThat(queue.stats().getPending(), equalTo(states.size() - committedContexts.size()));
+        assertThat(queue.stats().getCommitted(), equalTo(0));
+    }
+
     protected List<ClusterStateContext> randomCommitStates(PendingClusterStatesQueue queue) {
         List<ClusterStateContext> committedContexts = new ArrayList<>();
         for (int iter = randomInt(queue.pendingStates.size() - 1); iter >= 0; iter--) {

+ 1 - 1
core/src/test/java/org/elasticsearch/test/InternalTestCluster.java

@@ -1873,7 +1873,7 @@ public final class InternalTestCluster extends TestCluster {
                 }
 
                 NodeService nodeService = getInstanceFromNode(NodeService.class, nodeAndClient.node);
-                NodeStats stats = nodeService.stats(CommonStatsFlags.ALL, false, false, false, false, false, false, false, false, false);
+                NodeStats stats = nodeService.stats(CommonStatsFlags.ALL, false, false, false, false, false, false, false, false, false, false);
                 assertThat("Fielddata size must be 0 on node: " + stats.getNode(), stats.getIndices().getFieldData().getMemorySizeInBytes(), equalTo(0l));
                 assertThat("Query cache size must be 0 on node: " + stats.getNode(), stats.getIndices().getQueryCache().getMemorySizeInBytes(), equalTo(0l));
                 assertThat("FixedBitSet cache size must be 0 on node: " + stats.getNode(), stats.getIndices().getSegments().getBitsetMemoryInBytes(), equalTo(0l));

+ 3 - 0
docs/reference/cluster/nodes-stats.asciidoc

@@ -57,6 +57,9 @@ of `indices`, `os`, `process`, `jvm`, `transport`, `http`,
 `breaker`::
 	Statistics about the field data circuit breaker
 
+`discovery`::
+	Statistics about the discovery
+
 [source,js]
 --------------------------------------------------
 # return indices and os

+ 1 - 1
rest-api-spec/src/main/resources/rest-api-spec/api/nodes.stats.json

@@ -15,7 +15,7 @@
       "parts": {
         "metric" : {
           "type" : "list",
-          "options" : ["_all", "breaker", "fs", "http", "indices", "jvm", "os", "process", "thread_pool", "transport"],
+          "options" : ["_all", "breaker", "fs", "http", "indices", "jvm", "os", "process", "thread_pool", "transport", "discovery"],
           "description" : "Limit the information returned to the specified metrics"
         },
         "index_metric" : {

+ 25 - 0
rest-api-spec/src/main/resources/rest-api-spec/test/nodes.stats/30_discovery.yaml

@@ -0,0 +1,25 @@
+---
+"Discovery stats":
+  - do:
+      cluster.state: {}
+
+  # Get master node id
+  - set: { master_node: master }
+
+  - do:
+      nodes.stats:
+        metric: [ discovery ]
+
+  - is_true: cluster_name
+  - is_true: nodes
+  - is_true: nodes.$master.discovery
+
+  - do:
+      nodes.stats:
+        filter_path: "nodes.*.discovery"
+
+  - is_false: cluster_name
+  - is_true:  nodes
+  - is_false: nodes.$master.name
+  - is_false: nodes.$master.jvm
+  - is_true:  nodes.$master.discovery