فهرست منبع

Fix ClusterInfo serialization

Martijn van Groningen 9 سال پیش
والد
کامیت
9b1a477120

+ 5 - 24
core/src/main/java/org/elasticsearch/cluster/ClusterInfo.java

@@ -43,7 +43,7 @@ public class ClusterInfo implements ToXContent, Writeable {
     private final ImmutableOpenMap<String, DiskUsage> mostAvailableSpaceUsage;
     final ImmutableOpenMap<String, Long> shardSizes;
     public static final ClusterInfo EMPTY = new ClusterInfo();
-    private final ImmutableOpenMap<ShardRouting, String> routingToDataPath;
+    final ImmutableOpenMap<ShardRouting, String> routingToDataPath;
 
     protected ClusterInfo() {
        this(ImmutableOpenMap.of(), ImmutableOpenMap.of(), ImmutableOpenMap.of(), ImmutableOpenMap.of());
@@ -68,29 +68,10 @@ public class ClusterInfo implements ToXContent, Writeable {
     }
 
     public ClusterInfo(StreamInput in) throws IOException {
-        int size = in.readInt();
-        Map<String, DiskUsage> leastMap = new HashMap<>(size);
-        for (int i = 0; i < size; i++) {
-            leastMap.put(in.readString(), new DiskUsage(in));
-        }
-
-        size = in.readInt();
-        Map<String, DiskUsage> mostMap = new HashMap<>(size);
-        for (int i = 0; i < size; i++) {
-            mostMap.put(in.readString(), new DiskUsage(in));
-        }
-
-        size = in.readInt();
-        Map<String, Long> sizeMap = new HashMap<>(size);
-        for (int i = 0; i < size; i++) {
-            sizeMap.put(in.readString(), in.readLong());
-        }
-
-        size = in.readInt();
-        Map<ShardRouting, String> routingMap = new HashMap<>(size);
-        for (int i = 0; i < size; i++) {
-            routingMap.put(new ShardRouting(in), in.readString());
-        }
+        Map<String, DiskUsage> leastMap = in.readMap(StreamInput::readString, DiskUsage::new);
+        Map<String, DiskUsage> mostMap = in.readMap(StreamInput::readString, DiskUsage::new);
+        Map<String, Long> sizeMap = in.readMap(StreamInput::readString, StreamInput::readLong);
+        Map<ShardRouting, String> routingMap = in.readMap(ShardRouting::new, StreamInput::readString);
 
         ImmutableOpenMap.Builder<String, DiskUsage> leastBuilder = ImmutableOpenMap.builder();
         this.leastAvailableSpaceUsage = leastBuilder.putAll(leastMap).build();

+ 20 - 1
core/src/main/java/org/elasticsearch/cluster/DiskUsage.java

@@ -25,10 +25,11 @@ import org.elasticsearch.common.io.stream.StreamOutput;
 import org.elasticsearch.common.io.stream.Writeable;
 import org.elasticsearch.common.unit.ByteSizeValue;
 import org.elasticsearch.common.xcontent.ToXContent;
-import org.elasticsearch.common.xcontent.ToXContent.Params;
 import org.elasticsearch.common.xcontent.XContentBuilder;
 
 import java.io.IOException;
+import java.util.Objects;
+
 /**
  * Encapsulation class used to represent the amount of disk used on a node.
  */
@@ -126,6 +127,24 @@ public class DiskUsage implements ToXContent, Writeable {
         return getTotalBytes() - getFreeBytes();
     }
 
+    @Override
+    public boolean equals(Object o) {
+        if (this == o) return true;
+        if (o == null || getClass() != o.getClass()) return false;
+
+        DiskUsage other = (DiskUsage) o;
+        return Objects.equals(nodeId, other.nodeId) &&
+                Objects.equals(nodeName, other.nodeName) &&
+                Objects.equals(totalBytes, other.totalBytes) &&
+                Objects.equals(freeBytes, other.freeBytes);
+
+    }
+
+    @Override
+    public int hashCode() {
+        return Objects.hash(nodeId, nodeName, path, totalBytes, freeBytes);
+    }
+
     @Override
     public String toString() {
         return "[" + nodeId + "][" + nodeName + "][" + path + "] free: " + new ByteSizeValue(getFreeBytes()) +

+ 11 - 0
core/src/main/java/org/elasticsearch/common/io/stream/StreamInput.java

@@ -420,6 +420,17 @@ public abstract class StreamInput extends InputStream {
         return null;
     }
 
+    public <K, V> Map<K, V> readMap(Writeable.Reader<K> keyReader, Writeable.Reader<V> valueReader) throws IOException {
+        int size = readVInt();
+        Map<K, V> map = new HashMap<>(size);
+        for (int i = 0; i < size; i++) {
+            K key = keyReader.read(this);
+            V value = valueReader.read(this);
+            map.put(key, value);
+        }
+        return map;
+    }
+
     @Nullable
     @SuppressWarnings("unchecked")
     public Map<String, Object> readMap() throws IOException {

+ 88 - 0
core/src/test/java/org/elasticsearch/cluster/ClusterInfoTests.java

@@ -0,0 +1,88 @@
+/*
+ * Licensed to Elasticsearch under one or more contributor
+ * license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright
+ * ownership. Elasticsearch licenses this file to you under
+ * the Apache License, Version 2.0 (the "License"); you may
+ * not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.elasticsearch.cluster;
+
+import org.elasticsearch.Version;
+import org.elasticsearch.cluster.routing.RestoreSource;
+import org.elasticsearch.cluster.routing.ShardRouting;
+import org.elasticsearch.cluster.routing.UnassignedInfo;
+import org.elasticsearch.common.collect.ImmutableOpenMap;
+import org.elasticsearch.common.io.stream.BytesStreamOutput;
+import org.elasticsearch.index.shard.ShardId;
+import org.elasticsearch.snapshots.Snapshot;
+import org.elasticsearch.snapshots.SnapshotId;
+import org.elasticsearch.test.ESTestCase;
+
+public class ClusterInfoTests extends ESTestCase {
+
+    public void testSerialization() throws Exception {
+        ClusterInfo clusterInfo = new ClusterInfo(
+                randomDiskUsage(), randomDiskUsage(), randomShardSizes(), randomRoutingToDataPath()
+        );
+        BytesStreamOutput output = new BytesStreamOutput();
+        clusterInfo.writeTo(output);
+
+        ClusterInfo result = new ClusterInfo(output.bytes().streamInput());
+        assertEquals(clusterInfo.getNodeLeastAvailableDiskUsages(), result.getNodeLeastAvailableDiskUsages());
+        assertEquals(clusterInfo.getNodeMostAvailableDiskUsages(), result.getNodeMostAvailableDiskUsages());
+        assertEquals(clusterInfo.shardSizes, result.shardSizes);
+        assertEquals(clusterInfo.routingToDataPath, result.routingToDataPath);
+    }
+
+    private static ImmutableOpenMap<String, DiskUsage> randomDiskUsage() {
+        int numEntries = randomIntBetween(0, 128);
+        ImmutableOpenMap.Builder<String, DiskUsage> builder = ImmutableOpenMap.builder(numEntries);
+        for (int i = 0; i < numEntries; i++) {
+            String key = randomAsciiOfLength(32);
+            DiskUsage diskUsage = new DiskUsage(
+                    randomAsciiOfLength(4), randomAsciiOfLength(4), randomAsciiOfLength(4),
+                    randomIntBetween(0, Integer.MAX_VALUE), randomIntBetween(0, Integer.MAX_VALUE)
+            );
+            builder.put(key, diskUsage);
+        }
+        return builder.build();
+    }
+
+    private static ImmutableOpenMap<String, Long> randomShardSizes() {
+        int numEntries = randomIntBetween(0, 128);
+        ImmutableOpenMap.Builder<String, Long> builder = ImmutableOpenMap.builder(numEntries);
+        for (int i = 0; i < numEntries; i++) {
+            String key = randomAsciiOfLength(32);
+            long shardSize = randomIntBetween(0, Integer.MAX_VALUE);
+            builder.put(key, shardSize);
+        }
+        return builder.build();
+    }
+
+    private static ImmutableOpenMap<ShardRouting, String> randomRoutingToDataPath() {
+        int numEntries = randomIntBetween(0, 128);
+        ImmutableOpenMap.Builder<ShardRouting, String> builder = ImmutableOpenMap.builder(numEntries);
+        for (int i = 0; i < numEntries; i++) {
+            RestoreSource restoreSource = new RestoreSource(new Snapshot(randomAsciiOfLength(4),
+                    new SnapshotId(randomAsciiOfLength(4), randomAsciiOfLength(4))), Version.CURRENT, randomAsciiOfLength(4));
+            UnassignedInfo.Reason reason = randomFrom(UnassignedInfo.Reason.values());
+            UnassignedInfo unassignedInfo = new UnassignedInfo(reason, randomAsciiOfLength(4));
+            ShardId shardId = new ShardId(randomAsciiOfLength(32), randomAsciiOfLength(32), randomIntBetween(0, Integer.MAX_VALUE));
+            ShardRouting shardRouting = ShardRouting.newUnassigned(shardId, restoreSource, randomBoolean(), unassignedInfo);
+            builder.put(shardRouting, randomAsciiOfLength(32));
+        }
+        return builder.build();
+    }
+
+}