|
@@ -23,12 +23,14 @@ import com.carrotsearch.hppc.ObjectIntOpenHashMap;
|
|
|
import com.carrotsearch.hppc.cursors.ObjectCursor;
|
|
|
import com.google.common.base.Predicate;
|
|
|
import com.google.common.collect.ImmutableSet;
|
|
|
+import com.google.common.collect.Iterators;
|
|
|
import com.google.common.collect.Sets;
|
|
|
import org.elasticsearch.cluster.ClusterState;
|
|
|
import org.elasticsearch.cluster.block.ClusterBlocks;
|
|
|
import org.elasticsearch.cluster.metadata.IndexMetaData;
|
|
|
import org.elasticsearch.cluster.metadata.MetaData;
|
|
|
import org.elasticsearch.cluster.node.DiscoveryNode;
|
|
|
+import org.elasticsearch.common.collect.IdentityHashSet;
|
|
|
import org.elasticsearch.index.shard.ShardId;
|
|
|
|
|
|
import java.util.*;
|
|
@@ -51,20 +53,18 @@ public class RoutingNodes implements Iterable<RoutingNode> {
|
|
|
|
|
|
private final Map<String, RoutingNode> nodesToShards = newHashMap();
|
|
|
|
|
|
- private final List<MutableShardRouting> unassigned = newArrayList();
|
|
|
+ private final UnassignedShards unassignedShards = new UnassignedShards();
|
|
|
|
|
|
- private final List<MutableShardRouting> ignoredUnassigned = newArrayList();
|
|
|
+ private final List<MutableShardRouting> ignoredUnassignedShards = newArrayList();
|
|
|
|
|
|
- private final Map<ShardId, List<MutableShardRouting>> replicaSets = newHashMap();
|
|
|
-
|
|
|
- private int unassignedPrimaryCount = 0;
|
|
|
+ private final Map<ShardId, Set<MutableShardRouting>> assignedShards = newHashMap();
|
|
|
|
|
|
private int inactivePrimaryCount = 0;
|
|
|
|
|
|
private int inactiveShardCount = 0;
|
|
|
|
|
|
- Set<ShardId> relocatingReplicaSets = new HashSet<ShardId>();
|
|
|
-
|
|
|
+ private int relocatingShards = 0;
|
|
|
+
|
|
|
private Set<ShardId> clearPostAllocationFlag;
|
|
|
|
|
|
private final Map<String, ObjectIntOpenHashMap<String>> nodesPerAttributeNames = new HashMap<String, ObjectIntOpenHashMap<String>>();
|
|
@@ -97,10 +97,10 @@ public class RoutingNodes implements Iterable<RoutingNode> {
|
|
|
}
|
|
|
MutableShardRouting sr = new MutableShardRouting(shard);
|
|
|
entries.add(sr);
|
|
|
- addToReplicaSet(sr);
|
|
|
+ activeShardsAdd(sr);
|
|
|
if (shard.relocating()) {
|
|
|
entries = nodesToShards.get(shard.relocatingNodeId());
|
|
|
- relocatingReplicaSets.add(shard.shardId());
|
|
|
+ relocatingShards++;
|
|
|
if (entries == null) {
|
|
|
entries = newArrayList();
|
|
|
nodesToShards.put(shard.relocatingNodeId(), entries);
|
|
@@ -110,7 +110,7 @@ public class RoutingNodes implements Iterable<RoutingNode> {
|
|
|
sr = new MutableShardRouting(shard.index(), shard.id(), shard.relocatingNodeId(),
|
|
|
shard.currentNodeId(), shard.primary(), ShardRoutingState.INITIALIZING, shard.version());
|
|
|
entries.add(sr);
|
|
|
- addToReplicaSet(sr);
|
|
|
+ activeShardsAdd(sr);
|
|
|
} else if (!shard.active()) { // shards that are initializing without being relocated
|
|
|
if (shard.primary()) {
|
|
|
inactivePrimaryCount++;
|
|
@@ -119,12 +119,8 @@ public class RoutingNodes implements Iterable<RoutingNode> {
|
|
|
}
|
|
|
} else {
|
|
|
MutableShardRouting sr = new MutableShardRouting(shard);
|
|
|
- addToReplicaSet(sr);
|
|
|
- unassigned.add(sr);
|
|
|
- if (shard.primary()) {
|
|
|
- unassignedPrimaryCount++;
|
|
|
- }
|
|
|
-
|
|
|
+ activeShardsAdd(sr);
|
|
|
+ unassignedShards.add(sr);
|
|
|
}
|
|
|
}
|
|
|
}
|
|
@@ -137,7 +133,7 @@ public class RoutingNodes implements Iterable<RoutingNode> {
|
|
|
|
|
|
@Override
|
|
|
public Iterator<RoutingNode> iterator() {
|
|
|
- return nodesToShards.values().iterator();
|
|
|
+ return Iterators.unmodifiableIterator(nodesToShards.values().iterator());
|
|
|
}
|
|
|
|
|
|
public RoutingTable routingTable() {
|
|
@@ -177,27 +173,19 @@ public class RoutingNodes implements Iterable<RoutingNode> {
|
|
|
}
|
|
|
|
|
|
public boolean hasUnassigned() {
|
|
|
- return !unassigned.isEmpty();
|
|
|
+ return !unassignedShards.isEmpty();
|
|
|
}
|
|
|
|
|
|
public List<MutableShardRouting> ignoredUnassigned() {
|
|
|
- return this.ignoredUnassigned;
|
|
|
+ return this.ignoredUnassignedShards;
|
|
|
}
|
|
|
|
|
|
- public List<MutableShardRouting> unassigned() {
|
|
|
- return this.unassigned;
|
|
|
+ public UnassignedShards unassigned() {
|
|
|
+ return this.unassignedShards;
|
|
|
}
|
|
|
|
|
|
- public List<MutableShardRouting> getUnassigned() {
|
|
|
- return unassigned();
|
|
|
- }
|
|
|
-
|
|
|
- public Map<String, RoutingNode> nodesToShards() {
|
|
|
- return nodesToShards;
|
|
|
- }
|
|
|
-
|
|
|
- public Map<String, RoutingNode> getNodesToShards() {
|
|
|
- return nodesToShards();
|
|
|
+ public RoutingNodesIterator nodes() {
|
|
|
+ return new RoutingNodesIterator(nodesToShards.values().iterator());
|
|
|
}
|
|
|
|
|
|
/**
|
|
@@ -238,11 +226,11 @@ public class RoutingNodes implements Iterable<RoutingNode> {
|
|
|
}
|
|
|
|
|
|
public boolean hasUnassignedPrimaries() {
|
|
|
- return unassignedPrimaryCount > 0;
|
|
|
+ return unassignedShards.numPrimaries() > 0;
|
|
|
}
|
|
|
|
|
|
public boolean hasUnassignedShards() {
|
|
|
- return !unassigned.isEmpty();
|
|
|
+ return !unassignedShards.isEmpty();
|
|
|
}
|
|
|
|
|
|
public boolean hasInactivePrimaries() {
|
|
@@ -254,48 +242,46 @@ public class RoutingNodes implements Iterable<RoutingNode> {
|
|
|
}
|
|
|
|
|
|
public int getRelocatingShardCount() {
|
|
|
- return relocatingReplicaSets.size();
|
|
|
+ return relocatingShards;
|
|
|
}
|
|
|
|
|
|
- public MutableShardRouting findPrimaryForReplica(ShardRouting shard) {
|
|
|
+ /**
|
|
|
+ * Returns the active primary shard for the given ShardRouting or <code>null</code> if
|
|
|
+ * no primary is found or the primary is not active.
|
|
|
+ */
|
|
|
+ public MutableShardRouting activePrimary(ShardRouting shard) {
|
|
|
assert !shard.primary();
|
|
|
- MutableShardRouting primary = null;
|
|
|
- for (MutableShardRouting shardRouting : shardsRoutingFor(shard)) {
|
|
|
+ for (MutableShardRouting shardRouting : activeShards(shard.shardId())) {
|
|
|
if (shardRouting.primary()) {
|
|
|
- primary = shardRouting;
|
|
|
+ if (shardRouting.active()) {
|
|
|
+ return shardRouting;
|
|
|
+ }
|
|
|
break;
|
|
|
}
|
|
|
}
|
|
|
- assert primary != null;
|
|
|
- return primary;
|
|
|
- }
|
|
|
-
|
|
|
- public List<MutableShardRouting> shardsRoutingFor(ShardRouting shardRouting) {
|
|
|
- return shardsRoutingFor(shardRouting.index(), shardRouting.id());
|
|
|
- }
|
|
|
-
|
|
|
- public List<MutableShardRouting> shardsRoutingFor(String index, int shardId) {
|
|
|
- ShardId sid = new ShardId(index, shardId);
|
|
|
- List<MutableShardRouting> shards = replicaSetFor(sid);
|
|
|
- assert shards != null;
|
|
|
- // no need to check unassigned array, since the ShardRoutings are in the replica set.
|
|
|
- return shards;
|
|
|
+ return null;
|
|
|
}
|
|
|
|
|
|
- public int numberOfShardsOfType(ShardRoutingState state) {
|
|
|
- int count = 0;
|
|
|
- for (RoutingNode routingNode : this) {
|
|
|
- count += routingNode.numberOfShardsWithState(state);
|
|
|
+ /**
|
|
|
+ * Returns <code>true</code> iff all replicas are active for the given shard routing. Otherwise <code>false</code>
|
|
|
+ */
|
|
|
+ public boolean allReplicasActive(ShardRouting shardRouting) {
|
|
|
+ final Set<MutableShardRouting> shards = activeShards(shardRouting.shardId());
|
|
|
+ if (shards.isEmpty() || shards.size() < this.routingTable.index(shardRouting.index()).shard(shardRouting.id()).size()) {
|
|
|
+ return false; // if we are empty nothing is active if we have less than total at least one is unassigned
|
|
|
+ }
|
|
|
+ for (MutableShardRouting shard : shards) {
|
|
|
+ if (!shard.active()) {
|
|
|
+ return false;
|
|
|
+ }
|
|
|
}
|
|
|
- return count;
|
|
|
+ return true;
|
|
|
}
|
|
|
|
|
|
public List<MutableShardRouting> shards(Predicate<MutableShardRouting> predicate) {
|
|
|
List<MutableShardRouting> shards = newArrayList();
|
|
|
for (RoutingNode routingNode : this) {
|
|
|
- List<MutableShardRouting> nodeShards = routingNode.shards();
|
|
|
- for (int i = 0; i < nodeShards.size(); i++) {
|
|
|
- MutableShardRouting shardRouting = nodeShards.get(i);
|
|
|
+ for (MutableShardRouting shardRouting : routingNode) {
|
|
|
if (predicate.apply(shardRouting)) {
|
|
|
shards.add(shardRouting);
|
|
|
}
|
|
@@ -305,6 +291,7 @@ public class RoutingNodes implements Iterable<RoutingNode> {
|
|
|
}
|
|
|
|
|
|
public List<MutableShardRouting> shardsWithState(ShardRoutingState... state) {
|
|
|
+ // TODO these are used on tests only - move into utils class
|
|
|
List<MutableShardRouting> shards = newArrayList();
|
|
|
for (RoutingNode routingNode : this) {
|
|
|
shards.addAll(routingNode.shardsWithState(state));
|
|
@@ -313,6 +300,7 @@ public class RoutingNodes implements Iterable<RoutingNode> {
|
|
|
}
|
|
|
|
|
|
public List<MutableShardRouting> shardsWithState(String index, ShardRoutingState... state) {
|
|
|
+ // TODO these are used on tests only - move into utils class
|
|
|
List<MutableShardRouting> shards = newArrayList();
|
|
|
for (RoutingNode routingNode : this) {
|
|
|
shards.addAll(routingNode.shardsWithState(index, state));
|
|
@@ -326,63 +314,12 @@ public class RoutingNodes implements Iterable<RoutingNode> {
|
|
|
sb.append(routingNode.prettyPrint());
|
|
|
}
|
|
|
sb.append("---- unassigned\n");
|
|
|
- for (MutableShardRouting shardEntry : unassigned) {
|
|
|
+ for (MutableShardRouting shardEntry : unassignedShards) {
|
|
|
sb.append("--------").append(shardEntry.shortSummary()).append('\n');
|
|
|
}
|
|
|
return sb.toString();
|
|
|
}
|
|
|
|
|
|
- /**
|
|
|
- * calculates RoutingNodes statistics by iterating over all {@link MutableShardRouting}s
|
|
|
- * in the cluster to ensure the {@link RoutingManager} book-keeping is correct.
|
|
|
- * For performance reasons, this should only be called from test cases.
|
|
|
- *
|
|
|
- * @return true if all counts are the same, false if either of the book-keeping numbers is off.
|
|
|
- */
|
|
|
- public boolean assertShardStats() {
|
|
|
- int unassignedPrimaryCount = 0;
|
|
|
- int inactivePrimaryCount = 0;
|
|
|
- int inactiveShardCount = 0;
|
|
|
- int totalShards = 0;
|
|
|
-
|
|
|
- Set<ShardId> seenShards = newHashSet();
|
|
|
-
|
|
|
- for (RoutingNode node : this) {
|
|
|
- for (MutableShardRouting shard : node) {
|
|
|
- if (!shard.active()) {
|
|
|
- if (!shard.relocating()) {
|
|
|
- inactiveShardCount++;
|
|
|
- if (shard.primary()){
|
|
|
- inactivePrimaryCount++;
|
|
|
- }
|
|
|
- }
|
|
|
- }
|
|
|
- totalShards++;
|
|
|
- seenShards.add(shard.shardId());
|
|
|
- }
|
|
|
- }
|
|
|
- for (MutableShardRouting shard : unassigned) {
|
|
|
- if (shard.primary()) {
|
|
|
- unassignedPrimaryCount++;
|
|
|
- }
|
|
|
- totalShards++;
|
|
|
- seenShards.add(shard.shardId());
|
|
|
- }
|
|
|
-
|
|
|
- for (ShardId shardId : seenShards) {
|
|
|
- assert replicaSetFor(shardId) != null;
|
|
|
- }
|
|
|
-
|
|
|
- assert unassignedPrimaryCount == 0 || hasUnassignedPrimaries();
|
|
|
- assert inactivePrimaryCount == 0 || hasInactivePrimaries();
|
|
|
- assert inactiveShardCount == 0 || hasInactiveShards();
|
|
|
- assert hasUnassignedPrimaries() || unassignedPrimaryCount == 0;
|
|
|
- assert hasInactivePrimaries() || inactivePrimaryCount == 0;
|
|
|
- assert hasInactiveShards() || inactiveShardCount == 0;
|
|
|
-
|
|
|
- return true;
|
|
|
- }
|
|
|
-
|
|
|
/**
|
|
|
* Assign a shard to a node. This will increment the inactiveShardCount counter
|
|
|
* and the inactivePrimaryCount counter if the shard is the primary.
|
|
@@ -400,158 +337,404 @@ public class RoutingNodes implements Iterable<RoutingNode> {
|
|
|
* @param shard the shard to be assigned
|
|
|
* @param nodeId the nodeId this shard should initialize on or relocate from
|
|
|
*/
|
|
|
- public void assignShardToNode(MutableShardRouting shard, String nodeId) {
|
|
|
-
|
|
|
+ public void assign(MutableShardRouting shard, String nodeId) {
|
|
|
// state will not change if the shard is already initializing.
|
|
|
ShardRoutingState oldState = shard.state();
|
|
|
-
|
|
|
shard.assignToNode(nodeId);
|
|
|
node(nodeId).add(shard);
|
|
|
-
|
|
|
if (oldState == ShardRoutingState.UNASSIGNED) {
|
|
|
inactiveShardCount++;
|
|
|
if (shard.primary()) {
|
|
|
- unassignedPrimaryCount--;
|
|
|
inactivePrimaryCount++;
|
|
|
}
|
|
|
}
|
|
|
+
|
|
|
if (shard.state() == ShardRoutingState.RELOCATING) {
|
|
|
- // this a HashSet. double add no worry.
|
|
|
- relocatingReplicaSets.add(shard.shardId());
|
|
|
+ relocatingShards++;
|
|
|
}
|
|
|
- // possibly double/triple adding it to a replica set doesn't matter
|
|
|
- // but make sure we know about the shard.
|
|
|
- addToReplicaSet(shard);
|
|
|
+ activeShardsAdd(shard);
|
|
|
}
|
|
|
|
|
|
/**
|
|
|
* Relocate a shard to another node.
|
|
|
- *
|
|
|
- * STARTED => RELOCATING
|
|
|
- *
|
|
|
- * @param shard the shard to relocate
|
|
|
- * @param nodeId the node to relocate to
|
|
|
*/
|
|
|
- public void relocateShard(MutableShardRouting shard, String nodeId) {
|
|
|
- relocatingReplicaSets.add(shard.shardId());
|
|
|
+ public void relocate(MutableShardRouting shard, String nodeId) {
|
|
|
+ relocatingShards++;
|
|
|
shard.relocate(nodeId);
|
|
|
}
|
|
|
|
|
|
/**
|
|
|
- * Cancels the relocation of a shard.
|
|
|
- *
|
|
|
- * RELOCATING => STARTED
|
|
|
- *
|
|
|
- * @param shard the shard that was relocating previously and now should be started again.
|
|
|
+ * Mark a shard as started and adjusts internal statistics.
|
|
|
+ */
|
|
|
+ public void started(MutableShardRouting shard) {
|
|
|
+ if (!shard.active() && shard.relocatingNodeId() == null) {
|
|
|
+ inactiveShardCount--;
|
|
|
+ if (shard.primary()) {
|
|
|
+ inactivePrimaryCount--;
|
|
|
+ }
|
|
|
+ } else if (shard.relocating()) {
|
|
|
+ relocatingShards--;
|
|
|
+ }
|
|
|
+ assert !shard.started();
|
|
|
+ shard.moveToStarted();
|
|
|
+ }
|
|
|
+
|
|
|
+ /**
|
|
|
+ * Cancels a relocation of a shard that shard must relocating.
|
|
|
*/
|
|
|
- public void cancelRelocationForShard(MutableShardRouting shard) {
|
|
|
- relocatingReplicaSets.remove(shard.shardId());
|
|
|
+ public void cancelRelocation(MutableShardRouting shard) {
|
|
|
+ relocatingShards--;
|
|
|
shard.cancelRelocation();
|
|
|
}
|
|
|
|
|
|
/**
|
|
|
- * Unassigns shard from a node.
|
|
|
- * Both relocating and started shards that are deallocated need a new
|
|
|
- * primary elected.
|
|
|
- *
|
|
|
- * RELOCATING => null
|
|
|
- * STARTED => null
|
|
|
- * INITIALIZING => null
|
|
|
+ * swaps the status of a shard, making replicas primary and vice versa.
|
|
|
*
|
|
|
- * @param shard the shard to be unassigned.
|
|
|
+ * @param shards the shard to have its primary status swapped.
|
|
|
*/
|
|
|
- public void deassignShard(MutableShardRouting shard) {
|
|
|
- if (shard.state() == ShardRoutingState.RELOCATING) {
|
|
|
- cancelRelocationForShard(shard);
|
|
|
+ public void swapPrimaryFlag(MutableShardRouting... shards) {
|
|
|
+ for (MutableShardRouting shard : shards) {
|
|
|
+ if (shard.primary()) {
|
|
|
+ shard.moveFromPrimary();
|
|
|
+ if (shard.unassigned()) {
|
|
|
+ unassignedShards.primaries--;
|
|
|
+ }
|
|
|
+ } else {
|
|
|
+ shard.moveToPrimary();
|
|
|
+ if (shard.unassigned()) {
|
|
|
+ unassignedShards.primaries++;
|
|
|
+ }
|
|
|
+ }
|
|
|
}
|
|
|
- if (shard.primary())
|
|
|
- unassignedPrimaryCount++;
|
|
|
- shard.deassignNode();
|
|
|
+ }
|
|
|
+
|
|
|
+ private static final Set<MutableShardRouting> EMPTY = Collections.emptySet();
|
|
|
+
|
|
|
+ private Set<MutableShardRouting> activeShards(ShardId shardId) {
|
|
|
+ final Set<MutableShardRouting> replicaSet = assignedShards.get(shardId);
|
|
|
+ return replicaSet == null ? EMPTY : Collections.unmodifiableSet(replicaSet);
|
|
|
}
|
|
|
|
|
|
/**
|
|
|
- * Mark a shard as started.
|
|
|
- * Decreases the counters and marks a replication complete or failed,
|
|
|
- * which is the same for accounting in this class.
|
|
|
- *
|
|
|
- * INITIALIZING => STARTED
|
|
|
- * RELOCATIng => STARTED
|
|
|
- *
|
|
|
- * @param shard the shard to be marked as started
|
|
|
+ * Cancels the give shard from the Routing nodes internal statistics and cancels
|
|
|
+ * the relocation if the shard is relocating.
|
|
|
+ * @param shard
|
|
|
*/
|
|
|
- public void markShardStarted(MutableShardRouting shard) {
|
|
|
- if (!relocatingReplicaSets.contains(shard.shardId()) && shard.state() == ShardRoutingState.INITIALIZING) {
|
|
|
+ private void remove(MutableShardRouting shard) {
|
|
|
+ if (!shard.active() && shard.relocatingNodeId() == null) {
|
|
|
inactiveShardCount--;
|
|
|
+ assert inactiveShardCount >= 0;
|
|
|
if (shard.primary()) {
|
|
|
inactivePrimaryCount--;
|
|
|
}
|
|
|
+ } else if (shard.relocating()) {
|
|
|
+ cancelRelocation(shard);
|
|
|
}
|
|
|
- if (shard.state() == ShardRoutingState.INITIALIZING
|
|
|
- && shard.relocatingNodeId() != null) {
|
|
|
- relocatingReplicaSets.remove(shard.shardId());
|
|
|
+ activeShardsRemove(shard);
|
|
|
+ }
|
|
|
+
|
|
|
+ private void activeShardsAdd(MutableShardRouting shard) {
|
|
|
+ if (shard.unassigned()) {
|
|
|
+ // no unassigned
|
|
|
+ return;
|
|
|
}
|
|
|
- shard.moveToStarted();
|
|
|
+ Set<MutableShardRouting> replicaSet = assignedShards.get(shard.shardId());
|
|
|
+ if (replicaSet == null) {
|
|
|
+ replicaSet = new IdentityHashSet<MutableShardRouting>();
|
|
|
+ assignedShards.put(shard.shardId(), replicaSet);
|
|
|
+ }
|
|
|
+ replicaSet.add(shard);
|
|
|
}
|
|
|
|
|
|
- /**
|
|
|
- * Return a list of shards belonging to a replica set
|
|
|
- *
|
|
|
- * @param shard the shard for which to retrieve the replica set
|
|
|
- * @return an unmodifiable List of the replica set
|
|
|
- */
|
|
|
- public List<MutableShardRouting> replicaSetFor(MutableShardRouting shard) {
|
|
|
- return replicaSetFor(shard.shardId());
|
|
|
+ private void activeShardsRemove(MutableShardRouting shard) {
|
|
|
+ Set<MutableShardRouting> replicaSet = assignedShards.get(shard.shardId());
|
|
|
+ if (replicaSet != null) {
|
|
|
+ if (replicaSet.contains(shard)) {
|
|
|
+ replicaSet.remove(shard);
|
|
|
+ } else {
|
|
|
+ assert false : "Illegal state";
|
|
|
+ Iterator<MutableShardRouting> iterator = replicaSet.iterator();
|
|
|
+ while(iterator.hasNext()) {
|
|
|
+ if (shard.equals(iterator.next())) {
|
|
|
+ iterator.remove();
|
|
|
+ }
|
|
|
+ }
|
|
|
+ }
|
|
|
+ }
|
|
|
}
|
|
|
|
|
|
- /**
|
|
|
- * Return a list of shards belonging to a replica set
|
|
|
- *
|
|
|
- * @param shardId the {@link ShardId} for which to retrieve the replica set
|
|
|
- * @return an unmodifiable List of the replica set
|
|
|
- */
|
|
|
- public List<MutableShardRouting> replicaSetFor(ShardId shardId) {
|
|
|
- List<MutableShardRouting> replicaSet = replicaSets.get(shardId);
|
|
|
- assert replicaSet != null;
|
|
|
- return Collections.unmodifiableList(replicaSet);
|
|
|
+ public boolean isKnown(DiscoveryNode node) {
|
|
|
+ return nodesToShards.containsKey(node.getId());
|
|
|
}
|
|
|
|
|
|
- /**
|
|
|
- * Let this class know about a shard, which it then sorts into
|
|
|
- * its replica set. Package private as only {@link RoutingNodes}
|
|
|
- * should notify this class of shards during initialization.
|
|
|
- *
|
|
|
- * @param shard the shard to be sorted into its replica set
|
|
|
- */
|
|
|
- private void addToReplicaSet(MutableShardRouting shard) {
|
|
|
- List<MutableShardRouting> replicaSet = replicaSets.get(shard.shardId());
|
|
|
- if (replicaSet == null) {
|
|
|
- replicaSet = new ArrayList<MutableShardRouting>();
|
|
|
- replicaSets.put(shard.shardId(), replicaSet);
|
|
|
+ public void addNode(DiscoveryNode node) {
|
|
|
+ RoutingNode routingNode = new RoutingNode(node.id(), node);
|
|
|
+ nodesToShards.put(routingNode.nodeId(), routingNode);
|
|
|
+ }
|
|
|
+
|
|
|
+ public RoutingNodeIterator routingNodeIter(String nodeId) {
|
|
|
+ final RoutingNode routingNode = nodesToShards.get(nodeId);
|
|
|
+ if (routingNode == null) {
|
|
|
+ return null;
|
|
|
}
|
|
|
- replicaSet.add(shard);
|
|
|
+ assert assertShardStats(this);
|
|
|
+ return new RoutingNodeIterator(routingNode);
|
|
|
}
|
|
|
|
|
|
- /**
|
|
|
- * marks a replica set as relocating.
|
|
|
- *
|
|
|
- * @param shard a member of the relocating replica set
|
|
|
- */
|
|
|
- private void markRelocating(MutableShardRouting shard) {
|
|
|
- relocatingReplicaSets.add(shard.shardId());
|
|
|
+ public RoutingNode[] toArray() {
|
|
|
+ return nodesToShards.values().toArray(new RoutingNode[nodesToShards.size()]);
|
|
|
+ }
|
|
|
+
|
|
|
+ public final static class UnassignedShards implements Iterable<MutableShardRouting> {
|
|
|
+
|
|
|
+ private final List<MutableShardRouting> unassigned;
|
|
|
+ private int primaries = 0;
|
|
|
+ private long transactionId = 0;
|
|
|
+ private final UnassignedShards source;
|
|
|
+ private final long sourceTransactionId;
|
|
|
+
|
|
|
+ public UnassignedShards(UnassignedShards other) {
|
|
|
+ source = other;
|
|
|
+ sourceTransactionId = other.transactionId;
|
|
|
+ unassigned = new ArrayList<MutableShardRouting>(other.unassigned);
|
|
|
+ primaries = other.primaries;
|
|
|
+ }
|
|
|
+
|
|
|
+ public UnassignedShards() {
|
|
|
+ unassigned = new ArrayList<MutableShardRouting>();
|
|
|
+ source = null;
|
|
|
+ sourceTransactionId = -1;
|
|
|
+ }
|
|
|
+
|
|
|
+ public void add(MutableShardRouting mutableShardRouting) {
|
|
|
+ if(mutableShardRouting.primary()) {
|
|
|
+ primaries++;
|
|
|
+ }
|
|
|
+ unassigned.add(mutableShardRouting);
|
|
|
+ transactionId++;
|
|
|
+ }
|
|
|
+
|
|
|
+ public void addAll(Collection<MutableShardRouting> mutableShardRoutings) {
|
|
|
+ for (MutableShardRouting r : mutableShardRoutings) {
|
|
|
+ add(r);
|
|
|
+ }
|
|
|
+ }
|
|
|
+
|
|
|
+ public int size() {
|
|
|
+ return unassigned.size();
|
|
|
+ }
|
|
|
+
|
|
|
+ public int numPrimaries() {
|
|
|
+ return primaries;
|
|
|
+ }
|
|
|
+
|
|
|
+ @Override
|
|
|
+ public Iterator<MutableShardRouting> iterator() {
|
|
|
+ final Iterator<MutableShardRouting> iterator = unassigned.iterator();
|
|
|
+ return new Iterator<MutableShardRouting>() {
|
|
|
+ private MutableShardRouting current;
|
|
|
+ @Override
|
|
|
+ public boolean hasNext() {
|
|
|
+ return iterator.hasNext();
|
|
|
+ }
|
|
|
+
|
|
|
+ @Override
|
|
|
+ public MutableShardRouting next() {
|
|
|
+ return current = iterator.next();
|
|
|
+ }
|
|
|
+
|
|
|
+ @Override
|
|
|
+ public void remove() {
|
|
|
+ iterator.remove();
|
|
|
+ if (current.primary()) {
|
|
|
+ primaries--;
|
|
|
+ }
|
|
|
+ transactionId++;
|
|
|
+ }
|
|
|
+ };
|
|
|
+ }
|
|
|
+
|
|
|
+ public boolean isEmpty() {
|
|
|
+ return unassigned.isEmpty();
|
|
|
+ }
|
|
|
+
|
|
|
+ public void shuffle() {
|
|
|
+ Collections.shuffle(unassigned);
|
|
|
+ }
|
|
|
+
|
|
|
+ public void clear() {
|
|
|
+ transactionId++;
|
|
|
+ unassigned.clear();
|
|
|
+ primaries = 0;
|
|
|
+ }
|
|
|
+
|
|
|
+ public void transactionEnd(UnassignedShards shards) {
|
|
|
+ assert shards.source == this && shards.sourceTransactionId == transactionId :
|
|
|
+ "Expected ID: " + shards.sourceTransactionId + " actual: " + transactionId + " Expected Source: " + shards.source + " actual: " + this;
|
|
|
+ transactionId++;
|
|
|
+ this.unassigned.clear();
|
|
|
+ this.unassigned.addAll(shards.unassigned);
|
|
|
+ this.primaries = shards.primaries;
|
|
|
+ }
|
|
|
+
|
|
|
+ public UnassignedShards transactionBegin() {
|
|
|
+ return new UnassignedShards(this);
|
|
|
+ }
|
|
|
+
|
|
|
+ public void copyAll(Collection<MutableShardRouting> others) {
|
|
|
+ others.addAll(unassigned);
|
|
|
+ }
|
|
|
}
|
|
|
|
|
|
+
|
|
|
/**
|
|
|
- * swaps the status of a shard, making replicas primary and vice versa.
|
|
|
- *
|
|
|
- * @param shard the shard to have its primary status swapped.
|
|
|
+ * Calculates RoutingNodes statistics by iterating over all {@link MutableShardRouting}s
|
|
|
+ * in the cluster to ensure the book-keeping is correct.
|
|
|
+ * For performance reasons, this should only be called from asserts
|
|
|
+ *
|
|
|
+ * @return this method always returns <code>true</code> or throws an assertion error. If assertion are not enabled
|
|
|
+ * this method does nothing.
|
|
|
*/
|
|
|
- public void changePrimaryStatusForShard(MutableShardRouting... shards) {
|
|
|
- for (MutableShardRouting shard : shards) {
|
|
|
+ public static boolean assertShardStats(RoutingNodes routingNodes) {
|
|
|
+ boolean run = false;
|
|
|
+ assert (run = true); // only run if assertions are enabled!
|
|
|
+ if (!run) {
|
|
|
+ return true;
|
|
|
+ }
|
|
|
+ int unassignedPrimaryCount = 0;
|
|
|
+ int inactivePrimaryCount = 0;
|
|
|
+ int inactiveShardCount = 0;
|
|
|
+ int relocating = 0;
|
|
|
+ final Set<ShardId> seenShards = newHashSet();
|
|
|
+ Map<String, Integer> indicesAndShards = new HashMap<String, Integer>();
|
|
|
+ for (RoutingNode node : routingNodes) {
|
|
|
+ for (MutableShardRouting shard : node) {
|
|
|
+ if (!shard.active() && shard.relocatingNodeId() == null) {
|
|
|
+ if (!shard.relocating()) {
|
|
|
+ inactiveShardCount++;
|
|
|
+ if (shard.primary()) {
|
|
|
+ inactivePrimaryCount++;
|
|
|
+ }
|
|
|
+ }
|
|
|
+ }
|
|
|
+ if (shard.relocating()) {
|
|
|
+ relocating++;
|
|
|
+ }
|
|
|
+ seenShards.add(shard.shardId());
|
|
|
+ Integer i = indicesAndShards.get(shard.index());
|
|
|
+ if (i == null) {
|
|
|
+ i = shard.id();
|
|
|
+ }
|
|
|
+ indicesAndShards.put(shard.index(), Math.max(i, shard.id()));
|
|
|
+ }
|
|
|
+ }
|
|
|
+ // Assert that the active shard routing are identical.
|
|
|
+ Set<Map.Entry<String, Integer>> entries = indicesAndShards.entrySet();
|
|
|
+ Set<MutableShardRouting> shards = newHashSet();
|
|
|
+ for (Map.Entry<String, Integer> e : entries) {
|
|
|
+ String index = e.getKey();
|
|
|
+ for (int i = 0; i < e.getValue(); i++) {
|
|
|
+ for (RoutingNode routingNode : routingNodes) {
|
|
|
+ for (MutableShardRouting shardRouting : routingNode) {
|
|
|
+ if (shardRouting.index().equals(index) && shardRouting.id() == i) {
|
|
|
+ shards.add(shardRouting);
|
|
|
+ }
|
|
|
+ }
|
|
|
+ }
|
|
|
+ Set<MutableShardRouting> mutableShardRoutings = routingNodes.activeShards(new ShardId(index, i));
|
|
|
+ for (MutableShardRouting r : mutableShardRoutings) {
|
|
|
+ assert shards.contains(r);
|
|
|
+ shards.remove(r);
|
|
|
+ }
|
|
|
+ assert shards.isEmpty();
|
|
|
+ }
|
|
|
+ }
|
|
|
+
|
|
|
+ for (MutableShardRouting shard : routingNodes.unassigned()) {
|
|
|
if (shard.primary()) {
|
|
|
- shard.moveFromPrimary();
|
|
|
- } else {
|
|
|
- shard.moveToPrimary();
|
|
|
+ unassignedPrimaryCount++;
|
|
|
}
|
|
|
+ seenShards.add(shard.shardId());
|
|
|
+ }
|
|
|
+
|
|
|
+ assert unassignedPrimaryCount == routingNodes.unassignedShards.numPrimaries() :
|
|
|
+ "Unassigned primaries is [" + unassignedPrimaryCount + "] but RoutingNodes returned unassigned primaries [" + routingNodes.unassigned().numPrimaries() + "]";
|
|
|
+ assert inactivePrimaryCount == routingNodes.inactivePrimaryCount :
|
|
|
+ "Inactive Primary count [" + inactivePrimaryCount + "] but RoutingNodes returned inactive primaries [" + routingNodes.inactivePrimaryCount + "]";
|
|
|
+ assert inactiveShardCount == routingNodes.inactiveShardCount :
|
|
|
+ "Inactive Shard count [" + inactiveShardCount + "] but RoutingNodes returned inactive shards [" + routingNodes.inactiveShardCount + "]";
|
|
|
+ assert routingNodes.getRelocatingShardCount() == relocating : "Relocating shards mismatch [" + routingNodes.getRelocatingShardCount() + "] but expected [" + relocating + "]";
|
|
|
+ return true;
|
|
|
+ }
|
|
|
+
|
|
|
+
|
|
|
+ public class RoutingNodesIterator implements Iterator<RoutingNode>, Iterable<MutableShardRouting> {
|
|
|
+ private RoutingNode current;
|
|
|
+ private final Iterator<RoutingNode> delegate;
|
|
|
+
|
|
|
+ public RoutingNodesIterator(Iterator<RoutingNode> iterator) {
|
|
|
+ delegate = iterator;
|
|
|
+ }
|
|
|
+
|
|
|
+ @Override
|
|
|
+ public boolean hasNext() {
|
|
|
+ return delegate.hasNext();
|
|
|
+ }
|
|
|
+
|
|
|
+ @Override
|
|
|
+ public RoutingNode next() {
|
|
|
+ return current = delegate.next();
|
|
|
+ }
|
|
|
+
|
|
|
+ public RoutingNodeIterator nodeShards() {
|
|
|
+ return new RoutingNodeIterator(current);
|
|
|
+ }
|
|
|
+
|
|
|
+ @Override
|
|
|
+ public void remove() {
|
|
|
+ delegate.remove();
|
|
|
+ }
|
|
|
+
|
|
|
+ @Override
|
|
|
+ public Iterator<MutableShardRouting> iterator() {
|
|
|
+ return nodeShards();
|
|
|
+ }
|
|
|
+ }
|
|
|
+
|
|
|
+ public final class RoutingNodeIterator implements Iterator<MutableShardRouting>, Iterable<MutableShardRouting> {
|
|
|
+ private final RoutingNode iterable;
|
|
|
+ private MutableShardRouting shard;
|
|
|
+ private final Iterator<MutableShardRouting> delegate;
|
|
|
+
|
|
|
+ public RoutingNodeIterator(RoutingNode iterable) {
|
|
|
+ this.delegate = iterable.mutableIterator();
|
|
|
+ this.iterable = iterable;
|
|
|
+ }
|
|
|
+
|
|
|
+ @Override
|
|
|
+ public boolean hasNext() {
|
|
|
+ return delegate.hasNext();
|
|
|
+ }
|
|
|
+
|
|
|
+ @Override
|
|
|
+ public MutableShardRouting next() {
|
|
|
+ return shard = delegate.next();
|
|
|
+ }
|
|
|
+
|
|
|
+ public void remove() {
|
|
|
+ delegate.remove();
|
|
|
+ RoutingNodes.this.remove(shard);
|
|
|
+ }
|
|
|
+
|
|
|
+ @Override
|
|
|
+ public Iterator<MutableShardRouting> iterator() {
|
|
|
+ return iterable.iterator();
|
|
|
+ }
|
|
|
+
|
|
|
+ public void moveToUnassigned() {
|
|
|
+ iterator().remove();
|
|
|
+ unassigned().add(new MutableShardRouting(shard.index(), shard.id(),
|
|
|
+ null, shard.primary(), ShardRoutingState.UNASSIGNED, shard.version() + 1));
|
|
|
}
|
|
|
}
|
|
|
}
|