|
|
@@ -21,15 +21,20 @@ package org.elasticsearch.cluster.routing;
|
|
|
|
|
|
import org.elasticsearch.cluster.node.DiscoveryNode;
|
|
|
import org.elasticsearch.common.Nullable;
|
|
|
+import org.elasticsearch.index.Index;
|
|
|
import org.elasticsearch.index.shard.ShardId;
|
|
|
|
|
|
import java.util.ArrayList;
|
|
|
import java.util.Collection;
|
|
|
import java.util.Collections;
|
|
|
+import java.util.HashMap;
|
|
|
import java.util.Iterator;
|
|
|
import java.util.LinkedHashMap;
|
|
|
import java.util.LinkedHashSet;
|
|
|
import java.util.List;
|
|
|
+import java.util.Map;
|
|
|
+import java.util.Set;
|
|
|
+import java.util.function.Predicate;
|
|
|
import java.util.stream.Collectors;
|
|
|
|
|
|
/**
|
|
|
@@ -48,6 +53,8 @@ public class RoutingNode implements Iterable<ShardRouting> {
|
|
|
|
|
|
private final LinkedHashSet<ShardRouting> relocatingShards;
|
|
|
|
|
|
+ private final HashMap<Index, LinkedHashSet<ShardRouting>> shardsByIndex;
|
|
|
+
|
|
|
public RoutingNode(String nodeId, DiscoveryNode node, ShardRouting... shards) {
|
|
|
this(nodeId, node, buildShardRoutingMap(shards));
|
|
|
}
|
|
|
@@ -58,12 +65,14 @@ public class RoutingNode implements Iterable<ShardRouting> {
|
|
|
this.shards = shards;
|
|
|
this.relocatingShards = new LinkedHashSet<>();
|
|
|
this.initializingShards = new LinkedHashSet<>();
|
|
|
+ this.shardsByIndex = new LinkedHashMap<>();
|
|
|
for (ShardRouting shardRouting : shards.values()) {
|
|
|
if (shardRouting.initializing()) {
|
|
|
initializingShards.add(shardRouting);
|
|
|
} else if (shardRouting.relocating()) {
|
|
|
relocatingShards.add(shardRouting);
|
|
|
}
|
|
|
+ shardsByIndex.computeIfAbsent(shardRouting.index(), k -> new LinkedHashSet<>()).add(shardRouting);
|
|
|
}
|
|
|
assert invariant();
|
|
|
}
|
|
|
@@ -128,6 +137,7 @@ public class RoutingNode implements Iterable<ShardRouting> {
|
|
|
} else if (shard.relocating()) {
|
|
|
relocatingShards.add(shard);
|
|
|
}
|
|
|
+ shardsByIndex.computeIfAbsent(shard.index(), k -> new LinkedHashSet<>()).add(shard);
|
|
|
assert invariant();
|
|
|
}
|
|
|
|
|
|
@@ -148,11 +158,16 @@ public class RoutingNode implements Iterable<ShardRouting> {
|
|
|
boolean exist = relocatingShards.remove(oldShard);
|
|
|
assert exist : "expected shard " + oldShard + " to exist in relocatingShards";
|
|
|
}
|
|
|
+ shardsByIndex.get(oldShard.index()).remove(oldShard);
|
|
|
+ if (shardsByIndex.get(oldShard.index()).isEmpty()) {
|
|
|
+ shardsByIndex.remove(oldShard.index());
|
|
|
+ }
|
|
|
if (newShard.initializing()) {
|
|
|
initializingShards.add(newShard);
|
|
|
} else if (newShard.relocating()) {
|
|
|
relocatingShards.add(newShard);
|
|
|
}
|
|
|
+ shardsByIndex.computeIfAbsent(newShard.index(), k -> new LinkedHashSet<>()).add(newShard);
|
|
|
assert invariant();
|
|
|
}
|
|
|
|
|
|
@@ -167,6 +182,10 @@ public class RoutingNode implements Iterable<ShardRouting> {
|
|
|
boolean exist = relocatingShards.remove(shard);
|
|
|
assert exist : "expected shard " + shard + " to exist in relocatingShards";
|
|
|
}
|
|
|
+ shardsByIndex.get(shard.index()).remove(shard);
|
|
|
+ if (shardsByIndex.get(shard.index()).isEmpty()) {
|
|
|
+ shardsByIndex.remove(shard.index());
|
|
|
+ }
|
|
|
assert invariant();
|
|
|
}
|
|
|
|
|
|
@@ -269,6 +288,15 @@ public class RoutingNode implements Iterable<ShardRouting> {
|
|
|
return shards.size() - relocatingShards.size();
|
|
|
}
|
|
|
|
|
|
+ public int numberOfOwningShardsForIndex(final Index index) {
|
|
|
+ final LinkedHashSet<ShardRouting> shardRoutings = shardsByIndex.get(index);
|
|
|
+ if (shardRoutings == null) {
|
|
|
+ return 0;
|
|
|
+ } else {
|
|
|
+ return Math.toIntExact(shardRoutings.stream().filter(Predicate.not(ShardRouting::relocating)).count());
|
|
|
+ }
|
|
|
+ }
|
|
|
+
|
|
|
public String prettyPrint() {
|
|
|
StringBuilder sb = new StringBuilder();
|
|
|
sb.append("-----node_id[").append(nodeId).append("][").append(node == null ? "X" : "V").append("]\n");
|
|
|
@@ -316,6 +344,10 @@ public class RoutingNode implements Iterable<ShardRouting> {
|
|
|
assert relocatingShards.size() == shardRoutingsRelocating.size();
|
|
|
assert relocatingShards.containsAll(shardRoutingsRelocating);
|
|
|
|
|
|
+ final Map<Index, Set<ShardRouting>> shardRoutingsByIndex =
|
|
|
+ shards.values().stream().collect(Collectors.groupingBy(ShardRouting::index, Collectors.toSet()));
|
|
|
+ assert shardRoutingsByIndex.equals(shardsByIndex);
|
|
|
+
|
|
|
return true;
|
|
|
}
|
|
|
}
|