浏览代码

more Writeable in ClusterStatsNodes

javanna 9 年之前
父节点
当前提交
2c6e78e16c

+ 191 - 199
core/src/main/java/org/elasticsearch/action/admin/cluster/stats/ClusterStatsNodes.java

@@ -28,6 +28,7 @@ import org.elasticsearch.cluster.node.DiscoveryNode;
 import org.elasticsearch.common.io.stream.StreamInput;
 import org.elasticsearch.common.io.stream.StreamOutput;
 import org.elasticsearch.common.io.stream.Streamable;
+import org.elasticsearch.common.io.stream.Writeable;
 import org.elasticsearch.common.transport.InetSocketTransportAddress;
 import org.elasticsearch.common.transport.TransportAddress;
 import org.elasticsearch.common.unit.ByteSizeValue;
@@ -41,42 +42,57 @@ import org.elasticsearch.plugins.PluginInfo;
 
 import java.io.IOException;
 import java.net.InetAddress;
+import java.util.ArrayList;
 import java.util.HashMap;
 import java.util.HashSet;
+import java.util.List;
 import java.util.Map;
 import java.util.Set;
 
-public class ClusterStatsNodes implements ToXContent, Streamable {
+public class ClusterStatsNodes implements ToXContent, Writeable<ClusterStatsNodes> {
 
-    private Counts counts;
-    private Set<Version> versions;
-    private OsStats os;
-    private ProcessStats process;
-    private JvmStats jvm;
-    private FsInfo.Path fs;
-    private Set<PluginInfo> plugins;
+    private final Counts counts;
+    private final Set<Version> versions;
+    private final OsStats os;
+    private final ProcessStats process;
+    private final JvmStats jvm;
+    private final FsInfo.Path fs;
+    private final Set<PluginInfo> plugins;
 
-    private ClusterStatsNodes() {
+    ClusterStatsNodes(StreamInput in) throws IOException {
+        this.counts = new Counts(in);
+
+        int size = in.readVInt();
+        this.versions = new HashSet<>(size);
+        for (; size > 0; size--) {
+            this.versions.add(Version.readVersion(in));
+        }
+
+        this.os = new OsStats(in);
+        this.process = new ProcessStats(in);
+        this.jvm = new JvmStats(in);
+        this.fs = FsInfo.Path.readInfoFrom(in);
+
+        size = in.readVInt();
+        this.plugins = new HashSet<>(size);
+        for (; size > 0; size--) {
+            this.plugins.add(PluginInfo.readFromStream(in));
+        }
     }
 
-    public ClusterStatsNodes(ClusterStatsNodeResponse[] nodeResponses) {
-        this.counts = new Counts();
+    ClusterStatsNodes(ClusterStatsNodeResponse[] nodeResponses) {
         this.versions = new HashSet<>();
-        this.os = new OsStats();
-        this.jvm = new JvmStats();
         this.fs = new FsInfo.Path();
         this.plugins = new HashSet<>();
-        this.process = new ProcessStats();
 
         Set<InetAddress> seenAddresses = new HashSet<>(nodeResponses.length);
-
+        List<NodeInfo> nodeInfos = new ArrayList<>();
+        List<NodeStats> nodeStats = new ArrayList<>();
         for (ClusterStatsNodeResponse nodeResponse : nodeResponses) {
-
-            counts.addNodeInfo(nodeResponse.nodeInfo());
-            versions.add(nodeResponse.nodeInfo().getVersion());
-            process.addNodeStats(nodeResponse.nodeStats());
-            jvm.addNodeInfoStats(nodeResponse.nodeInfo(), nodeResponse.nodeStats());
-            plugins.addAll(nodeResponse.nodeInfo().getPlugins().getPluginInfos());
+            nodeInfos.add(nodeResponse.nodeInfo());
+            nodeStats.add(nodeResponse.nodeStats());
+            this.versions.add(nodeResponse.nodeInfo().getVersion());
+            this.plugins.addAll(nodeResponse.nodeInfo().getPlugins().getPluginInfos());
 
             // now do the stats that should be deduped by hardware (implemented by ip deduping)
             TransportAddress publishAddress = nodeResponse.nodeInfo().getTransport().address().publishAddress();
@@ -84,19 +100,19 @@ public class ClusterStatsNodes implements ToXContent, Streamable {
             if (publishAddress.uniqueAddressTypeId() == 1) {
                 inetAddress = ((InetSocketTransportAddress) publishAddress).address().getAddress();
             }
-
             if (!seenAddresses.add(inetAddress)) {
                 continue;
             }
-
-            os.addNodeInfo(nodeResponse.nodeInfo());
             if (nodeResponse.nodeStats().getFs() != null) {
-                fs.add(nodeResponse.nodeStats().getFs().total());
+                this.fs.add(nodeResponse.nodeStats().getFs().total());
             }
         }
+        this.counts = new Counts(nodeInfos);
+        this.os = new OsStats(nodeInfos);
+        this.process = new ProcessStats(nodeStats);
+        this.jvm = new JvmStats(nodeInfos, nodeStats);
     }
 
-
     public Counts getCounts() {
         return this.counts;
     }
@@ -127,25 +143,8 @@ public class ClusterStatsNodes implements ToXContent, Streamable {
 
 
     @Override
-    public void readFrom(StreamInput in) throws IOException {
-        counts = Counts.readCounts(in);
-
-        int size = in.readVInt();
-        versions = new HashSet<>(size);
-        for (; size > 0; size--) {
-            versions.add(Version.readVersion(in));
-        }
-
-        os = OsStats.readOsStats(in);
-        process = ProcessStats.readStats(in);
-        jvm = JvmStats.readJvmStats(in);
-        fs = FsInfo.Path.readInfoFrom(in);
-
-        size = in.readVInt();
-        plugins = new HashSet<>(size);
-        for (; size > 0; size--) {
-            plugins.add(PluginInfo.readFromStream(in));
-        }
+    public ClusterStatsNodes readFrom(StreamInput in) throws IOException {
+        return new ClusterStatsNodes(in);
     }
 
     @Override
@@ -163,12 +162,6 @@ public class ClusterStatsNodes implements ToXContent, Streamable {
         }
     }
 
-    public static ClusterStatsNodes readNodeStats(StreamInput in) throws IOException {
-        ClusterStatsNodes nodeStats = new ClusterStatsNodes();
-        nodeStats.readFrom(in);
-        return nodeStats;
-    }
-
     static final class Fields {
         static final XContentBuilderString COUNT = new XContentBuilderString("count");
         static final XContentBuilderString VERSIONS = new XContentBuilderString("versions");
@@ -214,31 +207,39 @@ public class ClusterStatsNodes implements ToXContent, Streamable {
         return builder;
     }
 
-    public static class Counts implements Streamable, ToXContent {
+    public static class Counts implements Writeable<Counts>, ToXContent {
         static final String COORDINATING_ONLY = "coordinating_only";
 
-        private int total;
-        private Map<String, Integer> roles;
+        private final int total;
+        private final Map<String, Integer> roles;
+
+        @SuppressWarnings("unchecked")
+        private Counts(StreamInput in) throws IOException {
+            this.total = in.readVInt();
+            this.roles = (Map<String, Integer>)in.readGenericValue();
+        }
 
-        Counts() {
-            roles = new HashMap<>();
+        private Counts(List<NodeInfo> nodeInfos) {
+            this.roles = new HashMap<>();
             for (DiscoveryNode.Role role : DiscoveryNode.Role.values()) {
-                roles.put(role.getRoleName(), 0);
+                this.roles.put(role.getRoleName(), 0);
             }
-            roles.put(COORDINATING_ONLY, 0);
-        }
-
-        public void addNodeInfo(NodeInfo nodeInfo) {
-            total++;
-            if (nodeInfo.getNode().getRoles().size() == 0) {
-                Integer count = roles.get(COORDINATING_ONLY);
-                roles.put(COORDINATING_ONLY, ++count);
-            } else {
-                for (DiscoveryNode.Role role : nodeInfo.getNode().getRoles()) {
-                    Integer count = roles.get(role.getRoleName());
-                    roles.put(role.getRoleName(), ++count);
+            this.roles.put(COORDINATING_ONLY, 0);
+
+            int total = 0;
+            for (NodeInfo nodeInfo : nodeInfos) {
+                total++;
+                if (nodeInfo.getNode().getRoles().isEmpty()) {
+                    Integer count = roles.get(COORDINATING_ONLY);
+                    roles.put(COORDINATING_ONLY, ++count);
+                } else {
+                    for (DiscoveryNode.Role role : nodeInfo.getNode().getRoles()) {
+                        Integer count = roles.get(role.getRoleName());
+                        roles.put(role.getRoleName(), ++count);
+                    }
                 }
             }
+            this.total = total;
         }
 
         public int getTotal() {
@@ -249,17 +250,9 @@ public class ClusterStatsNodes implements ToXContent, Streamable {
             return roles;
         }
 
-        public static Counts readCounts(StreamInput in) throws IOException {
-            Counts c = new Counts();
-            c.readFrom(in);
-            return c;
-        }
-
         @Override
-        @SuppressWarnings("unchecked")
-        public void readFrom(StreamInput in) throws IOException {
-            total = in.readVInt();
-            roles = (Map<String, Integer>)in.readGenericValue();
+        public Counts readFrom(StreamInput in) throws IOException {
+            return new Counts(in);
         }
 
         @Override
@@ -282,24 +275,36 @@ public class ClusterStatsNodes implements ToXContent, Streamable {
         }
     }
 
-    public static class OsStats implements ToXContent, Streamable {
-
-        int availableProcessors;
-        int allocatedProcessors;
-        long availableMemory;
+    public static class OsStats implements ToXContent, Writeable<OsStats> {
+        final int availableProcessors;
+        final int allocatedProcessors;
         final ObjectIntHashMap<String> names;
 
-        public OsStats() {
-            names = new ObjectIntHashMap<>();
+        @SuppressWarnings("unchecked")
+        private OsStats(StreamInput in) throws IOException {
+            this.availableProcessors = in.readVInt();
+            this.allocatedProcessors = in.readVInt();
+            int size = in.readVInt();
+            this.names = new ObjectIntHashMap<>();
+            for (int i = 0; i < size; i++) {
+                names.addTo(in.readString(), in.readVInt());
+            }
         }
 
-        public void addNodeInfo(NodeInfo nodeInfo) {
-            availableProcessors += nodeInfo.getOs().getAvailableProcessors();
-            allocatedProcessors += nodeInfo.getOs().getAllocatedProcessors();
+        private OsStats(List<NodeInfo> nodeInfos) {
+            this.names = new ObjectIntHashMap<>();
+            int availableProcessors = 0;
+            int allocatedProcessors = 0;
+            for (NodeInfo nodeInfo : nodeInfos) {
+                availableProcessors += nodeInfo.getOs().getAvailableProcessors();
+                allocatedProcessors += nodeInfo.getOs().getAllocatedProcessors();
 
-            if (nodeInfo.getOs().getName() != null) {
-                names.addTo(nodeInfo.getOs().getName(), 1);
+                if (nodeInfo.getOs().getName() != null) {
+                    names.addTo(nodeInfo.getOs().getName(), 1);
+                }
             }
+            this.availableProcessors = availableProcessors;
+            this.allocatedProcessors = allocatedProcessors;
         }
 
         public int getAvailableProcessors() {
@@ -310,27 +315,15 @@ public class ClusterStatsNodes implements ToXContent, Streamable {
             return allocatedProcessors;
         }
 
-        public ByteSizeValue getAvailableMemory() {
-            return new ByteSizeValue(availableMemory);
-        }
-
         @Override
-        public void readFrom(StreamInput in) throws IOException {
-            availableProcessors = in.readVInt();
-            allocatedProcessors = in.readVInt();
-            availableMemory = in.readLong();
-            int size = in.readVInt();
-            names.clear();
-            for (int i = 0; i < size; i++) {
-                names.addTo(in.readString(), in.readVInt());
-            }
+        public OsStats readFrom(StreamInput in) throws IOException {
+            return new OsStats(in);
         }
 
         @Override
         public void writeTo(StreamOutput out) throws IOException {
             out.writeVInt(availableProcessors);
             out.writeVInt(allocatedProcessors);
-            out.writeLong(availableMemory);
             out.writeVInt(names.size());
             for (ObjectIntCursor<String> name : names) {
                 out.writeString(name.key);
@@ -338,20 +331,11 @@ public class ClusterStatsNodes implements ToXContent, Streamable {
             }
         }
 
-        public static OsStats readOsStats(StreamInput in) throws IOException {
-            OsStats os = new OsStats();
-            os.readFrom(in);
-            return os;
-        }
-
         static final class Fields {
             static final XContentBuilderString AVAILABLE_PROCESSORS = new XContentBuilderString("available_processors");
             static final XContentBuilderString ALLOCATED_PROCESSORS = new XContentBuilderString("allocated_processors");
             static final XContentBuilderString NAME = new XContentBuilderString("name");
             static final XContentBuilderString NAMES = new XContentBuilderString("names");
-            static final XContentBuilderString MEM = new XContentBuilderString("mem");
-            static final XContentBuilderString TOTAL = new XContentBuilderString("total");
-            static final XContentBuilderString TOTAL_IN_BYTES = new XContentBuilderString("total_in_bytes");
             static final XContentBuilderString COUNT = new XContentBuilderString("count");
         }
 
@@ -359,10 +343,6 @@ public class ClusterStatsNodes implements ToXContent, Streamable {
         public XContentBuilder toXContent(XContentBuilder builder, Params params) throws IOException {
             builder.field(Fields.AVAILABLE_PROCESSORS, availableProcessors);
             builder.field(Fields.ALLOCATED_PROCESSORS, allocatedProcessors);
-            builder.startObject(Fields.MEM);
-            builder.byteSizeField(Fields.TOTAL_IN_BYTES, Fields.TOTAL, availableMemory);
-            builder.endObject();
-
             builder.startArray(Fields.NAMES);
             for (ObjectIntCursor<String> name : names) {
                 builder.startObject();
@@ -371,35 +351,54 @@ public class ClusterStatsNodes implements ToXContent, Streamable {
                 builder.endObject();
             }
             builder.endArray();
-
             return builder;
         }
     }
 
-    public static class ProcessStats implements ToXContent, Streamable {
-
-        int count;
-        int cpuPercent;
-        long totalOpenFileDescriptors;
-        long minOpenFileDescriptors = Long.MAX_VALUE;
-        long maxOpenFileDescriptors = Long.MIN_VALUE;
-
-        public void addNodeStats(NodeStats nodeStats) {
-            if (nodeStats.getProcess() == null) {
-                return;
-            }
-            count++;
-            if (nodeStats.getProcess().getCpu() != null) {
-                cpuPercent += nodeStats.getProcess().getCpu().getPercent();
-            }
-            long fd = nodeStats.getProcess().getOpenFileDescriptors();
-            if (fd > 0) {
-                // fd can be -1 if not supported on platform
-                totalOpenFileDescriptors += fd;
+    public static class ProcessStats implements ToXContent, Writeable<ProcessStats> {
+
+        final int count;
+        final int cpuPercent;
+        final long totalOpenFileDescriptors;
+        final long minOpenFileDescriptors;
+        final long maxOpenFileDescriptors;
+
+        private ProcessStats(StreamInput in) throws IOException {
+            this.count = in.readVInt();
+            this.cpuPercent = in.readVInt();
+            this.totalOpenFileDescriptors = in.readVLong();
+            this.minOpenFileDescriptors = in.readLong();
+            this.maxOpenFileDescriptors = in.readLong();
+        }
+
+        private ProcessStats(List<NodeStats> nodeStatsList) {
+            int count = 0;
+            int cpuPercent = 0;
+            long totalOpenFileDescriptors = 0;
+            long minOpenFileDescriptors = Long.MAX_VALUE;
+            long maxOpenFileDescriptors = Long.MIN_VALUE;
+            for (NodeStats nodeStats : nodeStatsList) {
+                if (nodeStats.getProcess() == null) {
+                    continue;
+                }
+                count++;
+                if (nodeStats.getProcess().getCpu() != null) {
+                    cpuPercent += nodeStats.getProcess().getCpu().getPercent();
+                }
+                long fd = nodeStats.getProcess().getOpenFileDescriptors();
+                if (fd > 0) {
+                    // fd can be -1 if not supported on platform
+                    totalOpenFileDescriptors += fd;
+                }
+                // we still do min max calc on -1, so we'll have an indication of it not being supported on one of the nodes.
+                minOpenFileDescriptors = Math.min(minOpenFileDescriptors, fd);
+                maxOpenFileDescriptors = Math.max(maxOpenFileDescriptors, fd);
             }
-            // we still do min max calc on -1, so we'll have an indication of it not being supported on one of the nodes.
-            minOpenFileDescriptors = Math.min(minOpenFileDescriptors, fd);
-            maxOpenFileDescriptors = Math.max(maxOpenFileDescriptors, fd);
+            this.count = count;
+            this.cpuPercent = cpuPercent;
+            this.totalOpenFileDescriptors = totalOpenFileDescriptors;
+            this.minOpenFileDescriptors = minOpenFileDescriptors;
+            this.maxOpenFileDescriptors = maxOpenFileDescriptors;
         }
 
         /**
@@ -431,12 +430,8 @@ public class ClusterStatsNodes implements ToXContent, Streamable {
         }
 
         @Override
-        public void readFrom(StreamInput in) throws IOException {
-            count = in.readVInt();
-            cpuPercent = in.readVInt();
-            totalOpenFileDescriptors = in.readVLong();
-            minOpenFileDescriptors = in.readLong();
-            maxOpenFileDescriptors = in.readLong();
+        public ProcessStats readFrom(StreamInput in) throws IOException {
+            return new ProcessStats(in);
         }
 
         @Override
@@ -448,12 +443,6 @@ public class ClusterStatsNodes implements ToXContent, Streamable {
             out.writeLong(maxOpenFileDescriptors);
         }
 
-        public static ProcessStats readStats(StreamInput in) throws IOException {
-            ProcessStats cpu = new ProcessStats();
-            cpu.readFrom(in);
-            return cpu;
-        }
-
         static final class Fields {
             static final XContentBuilderString CPU = new XContentBuilderString("cpu");
             static final XContentBuilderString PERCENT = new XContentBuilderString("percent");
@@ -477,20 +466,54 @@ public class ClusterStatsNodes implements ToXContent, Streamable {
         }
     }
 
-    public static class JvmStats implements Streamable, ToXContent {
+    public static class JvmStats implements Writeable<JvmStats>, ToXContent {
 
-        ObjectIntHashMap<JvmVersion> versions;
-        long threads;
-        long maxUptime;
-        long heapUsed;
-        long heapMax;
+        private final ObjectIntHashMap<JvmVersion> versions;
+        private final long threads;
+        private final long maxUptime;
+        private final long heapUsed;
+        private final long heapMax;
 
-        JvmStats() {
-            versions = new ObjectIntHashMap<>();
-            threads = 0;
-            maxUptime = 0;
-            heapMax = 0;
-            heapUsed = 0;
+        private JvmStats(StreamInput in) throws IOException {
+            int size = in.readVInt();
+            this.versions = new ObjectIntHashMap<>(size);
+            for (; size > 0; size--) {
+                this.versions.addTo(JvmVersion.readJvmVersion(in), in.readVInt());
+            }
+            this.threads = in.readVLong();
+            this.maxUptime = in.readVLong();
+            this.heapUsed = in.readVLong();
+            this.heapMax = in.readVLong();
+        }
+
+        private JvmStats(List<NodeInfo> nodeInfos, List<NodeStats> nodeStatsList) {
+            this.versions = new ObjectIntHashMap<>();
+            long threads = 0;
+            long maxUptime = 0;
+            long heapMax = 0;
+            long heapUsed = 0;
+            for (NodeInfo nodeInfo : nodeInfos) {
+                versions.addTo(new JvmVersion(nodeInfo.getJvm()), 1);
+            }
+
+            for (NodeStats nodeStats : nodeStatsList) {
+                org.elasticsearch.monitor.jvm.JvmStats js = nodeStats.getJvm();
+                if (js == null) {
+                    continue;
+                }
+                if (js.getThreads() != null) {
+                    threads += js.getThreads().getCount();
+                }
+                maxUptime = Math.max(maxUptime, js.getUptime().millis());
+                if (js.getMem() != null) {
+                    heapUsed += js.getMem().getHeapUsed().bytes();
+                    heapMax += js.getMem().getHeapMax().bytes();
+                }
+            }
+            this.threads = threads;
+            this.maxUptime = maxUptime;
+            this.heapUsed = heapUsed;
+            this.heapMax = heapMax;
         }
 
         public ObjectIntHashMap<JvmVersion> getVersions() {
@@ -525,33 +548,9 @@ public class ClusterStatsNodes implements ToXContent, Streamable {
             return new ByteSizeValue(heapMax);
         }
 
-        public void addNodeInfoStats(NodeInfo nodeInfo, NodeStats nodeStats) {
-            versions.addTo(new JvmVersion(nodeInfo.getJvm()), 1);
-            org.elasticsearch.monitor.jvm.JvmStats js = nodeStats.getJvm();
-            if (js == null) {
-                return;
-            }
-            if (js.getThreads() != null) {
-                threads += js.getThreads().getCount();
-            }
-            maxUptime = Math.max(maxUptime, js.getUptime().millis());
-            if (js.getMem() != null) {
-                heapUsed += js.getMem().getHeapUsed().bytes();
-                heapMax += js.getMem().getHeapMax().bytes();
-            }
-        }
-
         @Override
-        public void readFrom(StreamInput in) throws IOException {
-            int size = in.readVInt();
-            versions = new ObjectIntHashMap<>(size);
-            for (; size > 0; size--) {
-                versions.addTo(JvmVersion.readJvmVersion(in), in.readVInt());
-            }
-            threads = in.readVLong();
-            maxUptime = in.readVLong();
-            heapUsed = in.readVLong();
-            heapMax = in.readVLong();
+        public JvmStats readFrom(StreamInput in) throws IOException {
+            return new JvmStats(in);
         }
 
         @Override
@@ -561,19 +560,12 @@ public class ClusterStatsNodes implements ToXContent, Streamable {
                 v.key.writeTo(out);
                 out.writeVInt(v.value);
             }
-
             out.writeVLong(threads);
             out.writeVLong(maxUptime);
             out.writeVLong(heapUsed);
             out.writeVLong(heapMax);
         }
 
-        public static JvmStats readJvmStats(StreamInput in) throws IOException {
-            JvmStats jvmStats = new JvmStats();
-            jvmStats.readFrom(in);
-            return jvmStats;
-        }
-
         static final class Fields {
             static final XContentBuilderString VERSIONS = new XContentBuilderString("versions");
             static final XContentBuilderString VERSION = new XContentBuilderString("version");

+ 1 - 1
core/src/main/java/org/elasticsearch/action/admin/cluster/stats/ClusterStatsResponse.java

@@ -110,7 +110,7 @@ public class ClusterStatsResponse extends BaseNodesResponse<ClusterStatsNodeResp
             status = ClusterHealthStatus.fromValue(in.readByte());
         }
         clusterUUID = in.readString();
-        nodesStats = ClusterStatsNodes.readNodeStats(in);
+        nodesStats = new ClusterStatsNodes(in);
         indicesStats = ClusterStatsIndices.readIndicesStats(in);
     }