|
|
@@ -8,6 +8,8 @@
|
|
|
|
|
|
package org.elasticsearch.cluster.routing;
|
|
|
|
|
|
+import com.carrotsearch.hppc.cursors.ObjectCursor;
|
|
|
+
|
|
|
import org.apache.logging.log4j.Logger;
|
|
|
import org.apache.lucene.util.CollectionUtil;
|
|
|
import org.elasticsearch.Assertions;
|
|
|
@@ -15,6 +17,7 @@ import org.elasticsearch.cluster.ClusterState;
|
|
|
import org.elasticsearch.cluster.metadata.IndexMetadata;
|
|
|
import org.elasticsearch.cluster.metadata.Metadata;
|
|
|
import org.elasticsearch.cluster.node.DiscoveryNode;
|
|
|
+import org.elasticsearch.cluster.node.DiscoveryNodes;
|
|
|
import org.elasticsearch.cluster.routing.UnassignedInfo.AllocationStatus;
|
|
|
import org.elasticsearch.cluster.routing.allocation.ExistingShardsAllocator;
|
|
|
import org.elasticsearch.cluster.service.MasterService;
|
|
|
@@ -44,8 +47,9 @@ import java.util.stream.StreamSupport;
|
|
|
|
|
|
/**
|
|
|
* {@link RoutingNodes} represents a copy the routing information contained in the {@link ClusterState cluster state}.
|
|
|
- * It can be either initialized as mutable or immutable (see {@link #RoutingNodes(ClusterState, boolean)}), allowing
|
|
|
- * or disallowing changes to its elements.
|
|
|
+ * It can be either initialized as mutable or immutable allowing or disallowing changes to its elements.
|
|
|
+ * (see {@link RoutingNodes#mutable(RoutingTable, DiscoveryNodes)}, {@link RoutingNodes#immutable(RoutingTable, DiscoveryNodes)},
|
|
|
+ * and {@link #mutableCopy()})
|
|
|
*
|
|
|
* The main methods used to update routing entries are:
|
|
|
* <ul>
|
|
|
@@ -57,11 +61,11 @@ import java.util.stream.StreamSupport;
|
|
|
*/
|
|
|
public class RoutingNodes implements Iterable<RoutingNode> {
|
|
|
|
|
|
- private final Map<String, RoutingNode> nodesToShards = new HashMap<>();
|
|
|
+ private final Map<String, RoutingNode> nodesToShards;
|
|
|
|
|
|
- private final UnassignedShards unassignedShards = new UnassignedShards(this);
|
|
|
+ private final UnassignedShards unassignedShards;
|
|
|
|
|
|
- private final Map<ShardId, List<ShardRouting>> assignedShards = new HashMap<>();
|
|
|
+ private final Map<ShardId, List<ShardRouting>> assignedShards;
|
|
|
|
|
|
private final boolean readOnly;
|
|
|
|
|
|
@@ -75,21 +79,33 @@ public class RoutingNodes implements Iterable<RoutingNode> {
|
|
|
|
|
|
private int totalShardCount = 0;
|
|
|
|
|
|
- private final Map<String, Set<String>> attributeValuesByAttribute = new HashMap<>();
|
|
|
- private final Map<String, Recoveries> recoveriesPerNode = new HashMap<>();
|
|
|
+ private final Map<String, Set<String>> attributeValuesByAttribute;
|
|
|
+ private final Map<String, Recoveries> recoveriesPerNode;
|
|
|
+
|
|
|
+ /**
|
|
|
+ * Creates an immutable instance from the {@link RoutingTable} and {@link DiscoveryNodes} found in a cluster state. Used to initialize
|
|
|
+ * the routing nodes in {@link ClusterState#getRoutingNodes()}. This method should not be used directly, use
|
|
|
+ * {@link ClusterState#getRoutingNodes()} instead.
|
|
|
+ */
|
|
|
+ public static RoutingNodes immutable(RoutingTable routingTable, DiscoveryNodes discoveryNodes) {
|
|
|
+ return new RoutingNodes(routingTable, discoveryNodes, true);
|
|
|
+ }
|
|
|
|
|
|
- public RoutingNodes(ClusterState clusterState) {
|
|
|
- this(clusterState, true);
|
|
|
+ public static RoutingNodes mutable(RoutingTable routingTable, DiscoveryNodes discoveryNodes) {
|
|
|
+ return new RoutingNodes(routingTable, discoveryNodes, false);
|
|
|
}
|
|
|
|
|
|
- public RoutingNodes(ClusterState clusterState, boolean readOnly) {
|
|
|
+ private RoutingNodes(RoutingTable routingTable, DiscoveryNodes discoveryNodes, boolean readOnly) {
|
|
|
this.readOnly = readOnly;
|
|
|
- final RoutingTable routingTable = clusterState.routingTable();
|
|
|
+ this.recoveriesPerNode = new HashMap<>();
|
|
|
+ this.assignedShards = new HashMap<>();
|
|
|
+ this.unassignedShards = new UnassignedShards(this);
|
|
|
+ this.attributeValuesByAttribute = new HashMap<>();
|
|
|
|
|
|
- Map<String, LinkedHashMap<ShardId, ShardRouting>> nodesToShards = new HashMap<>();
|
|
|
+ final Map<String, LinkedHashMap<ShardId, ShardRouting>> nodesToShards = new HashMap<>(discoveryNodes.getDataNodes().size());
|
|
|
// fill in the nodeToShards with the "live" nodes
|
|
|
- for (DiscoveryNode node : clusterState.nodes().getDataNodes().values()) {
|
|
|
- nodesToShards.put(node.getId(), new LinkedHashMap<>()); // LinkedHashMap to preserve order
|
|
|
+ for (ObjectCursor<String> node : discoveryNodes.getDataNodes().keys()) {
|
|
|
+ nodesToShards.put(node.value, new LinkedHashMap<>()); // LinkedHashMap to preserve order
|
|
|
}
|
|
|
|
|
|
// fill in the inverse of node -> shards allocated
|
|
|
@@ -104,11 +120,9 @@ public class RoutingNodes implements Iterable<RoutingNode> {
|
|
|
// by the ShardId, as this is common for primary and replicas.
|
|
|
// A replica Set might have one (and not more) replicas with the state of RELOCATING.
|
|
|
if (shard.assignedToNode()) {
|
|
|
- Map<ShardId, ShardRouting> entries = nodesToShards.computeIfAbsent(
|
|
|
- shard.currentNodeId(),
|
|
|
- k -> new LinkedHashMap<>()
|
|
|
- ); // LinkedHashMap to preserve order
|
|
|
- ShardRouting previousValue = entries.put(shard.shardId(), shard);
|
|
|
+ // LinkedHashMap to preserve order
|
|
|
+ ShardRouting previousValue = nodesToShards.computeIfAbsent(shard.currentNodeId(), k -> new LinkedHashMap<>())
|
|
|
+ .put(shard.shardId(), shard);
|
|
|
if (previousValue != null) {
|
|
|
throw new IllegalArgumentException("Cannot have two different shards with same shard id on same node");
|
|
|
}
|
|
|
@@ -118,13 +132,12 @@ public class RoutingNodes implements Iterable<RoutingNode> {
|
|
|
}
|
|
|
if (shard.relocating()) {
|
|
|
relocatingShards++;
|
|
|
- // LinkedHashMap to preserve order.
|
|
|
- // Add the counterpart shard with relocatingNodeId reflecting the source from which
|
|
|
- // it's relocating from.
|
|
|
- entries = nodesToShards.computeIfAbsent(shard.relocatingNodeId(), k -> new LinkedHashMap<>());
|
|
|
ShardRouting targetShardRouting = shard.getTargetRelocatingShard();
|
|
|
addInitialRecovery(targetShardRouting, indexShard.primary);
|
|
|
- previousValue = entries.put(targetShardRouting.shardId(), targetShardRouting);
|
|
|
+ // LinkedHashMap to preserve order.
|
|
|
+ // Add the counterpart shard with relocatingNodeId reflecting the source from which it's relocating from.
|
|
|
+ previousValue = nodesToShards.computeIfAbsent(shard.relocatingNodeId(), k -> new LinkedHashMap<>())
|
|
|
+ .put(targetShardRouting.shardId(), targetShardRouting);
|
|
|
if (previousValue != null) {
|
|
|
throw new IllegalArgumentException("Cannot have two different shards with same shard id on same node");
|
|
|
}
|
|
|
@@ -142,12 +155,50 @@ public class RoutingNodes implements Iterable<RoutingNode> {
|
|
|
}
|
|
|
}
|
|
|
}
|
|
|
+ this.nodesToShards = new HashMap<>(nodesToShards.size());
|
|
|
for (Map.Entry<String, LinkedHashMap<ShardId, ShardRouting>> entry : nodesToShards.entrySet()) {
|
|
|
String nodeId = entry.getKey();
|
|
|
- this.nodesToShards.put(nodeId, new RoutingNode(nodeId, clusterState.nodes().get(nodeId), entry.getValue()));
|
|
|
+ this.nodesToShards.put(nodeId, new RoutingNode(nodeId, discoveryNodes.get(nodeId), entry.getValue()));
|
|
|
}
|
|
|
}
|
|
|
|
|
|
+ private RoutingNodes(RoutingNodes routingNodes) {
|
|
|
+ // we should not call this on mutable instances, it's still expensive to create the copy and callers should instead mutate a single
|
|
|
+ // instance
|
|
|
+ assert routingNodes.readOnly : "tried to create a mutable copy from a mutable instance";
|
|
|
+ this.readOnly = false;
|
|
|
+ this.nodesToShards = new HashMap<>(routingNodes.nodesToShards.size());
|
|
|
+ for (Map.Entry<String, RoutingNode> entry : routingNodes.nodesToShards.entrySet()) {
|
|
|
+ this.nodesToShards.put(entry.getKey(), entry.getValue().copy());
|
|
|
+ }
|
|
|
+ this.assignedShards = new HashMap<>(routingNodes.assignedShards.size());
|
|
|
+ for (Map.Entry<ShardId, List<ShardRouting>> entry : routingNodes.assignedShards.entrySet()) {
|
|
|
+ this.assignedShards.put(entry.getKey(), new ArrayList<>(entry.getValue()));
|
|
|
+ }
|
|
|
+ this.unassignedShards = routingNodes.unassignedShards.copyFor(this);
|
|
|
+
|
|
|
+ this.inactivePrimaryCount = routingNodes.inactivePrimaryCount;
|
|
|
+ this.inactiveShardCount = routingNodes.inactiveShardCount;
|
|
|
+ this.relocatingShards = routingNodes.relocatingShards;
|
|
|
+ this.activeShardCount = routingNodes.activeShardCount;
|
|
|
+ this.totalShardCount = routingNodes.totalShardCount;
|
|
|
+ this.attributeValuesByAttribute = new HashMap<>(routingNodes.attributeValuesByAttribute.size());
|
|
|
+ for (Map.Entry<String, Set<String>> entry : routingNodes.attributeValuesByAttribute.entrySet()) {
|
|
|
+ this.attributeValuesByAttribute.put(entry.getKey(), new HashSet<>(entry.getValue()));
|
|
|
+ }
|
|
|
+ this.recoveriesPerNode = new HashMap<>(routingNodes.recoveriesPerNode.size());
|
|
|
+ for (Map.Entry<String, Recoveries> entry : routingNodes.recoveriesPerNode.entrySet()) {
|
|
|
+ this.recoveriesPerNode.put(entry.getKey(), entry.getValue().copy());
|
|
|
+ }
|
|
|
+ }
|
|
|
+
|
|
|
+ /**
|
|
|
+ * @return a mutable copy of this instance
|
|
|
+ */
|
|
|
+ public RoutingNodes mutableCopy() {
|
|
|
+ return new RoutingNodes(this);
|
|
|
+ }
|
|
|
+
|
|
|
private void addRecovery(ShardRouting routing) {
|
|
|
updateRecoveryCounts(routing, true, findAssignedPrimaryIfPeerRecovery(routing));
|
|
|
}
|
|
|
@@ -864,13 +915,29 @@ public class RoutingNodes implements Iterable<RoutingNode> {
|
|
|
private final List<ShardRouting> unassigned;
|
|
|
private final List<ShardRouting> ignored;
|
|
|
|
|
|
- private int primaries = 0;
|
|
|
- private int ignoredPrimaries = 0;
|
|
|
+ private int primaries;
|
|
|
+ private int ignoredPrimaries;
|
|
|
|
|
|
public UnassignedShards(RoutingNodes nodes) {
|
|
|
+ this(nodes, new ArrayList<>(), new ArrayList<>(), 0, 0);
|
|
|
+ }
|
|
|
+
|
|
|
+ private UnassignedShards(
|
|
|
+ RoutingNodes nodes,
|
|
|
+ List<ShardRouting> unassigned,
|
|
|
+ List<ShardRouting> ignored,
|
|
|
+ int primaries,
|
|
|
+ int ignoredPrimaries
|
|
|
+ ) {
|
|
|
this.nodes = nodes;
|
|
|
- unassigned = new ArrayList<>();
|
|
|
- ignored = new ArrayList<>();
|
|
|
+ this.unassigned = unassigned;
|
|
|
+ this.ignored = ignored;
|
|
|
+ this.primaries = primaries;
|
|
|
+ this.ignoredPrimaries = ignoredPrimaries;
|
|
|
+ }
|
|
|
+
|
|
|
+ public UnassignedShards copyFor(RoutingNodes newNodes) {
|
|
|
+ return new UnassignedShards(newNodes, new ArrayList<>(unassigned), new ArrayList<>(ignored), primaries, ignoredPrimaries);
|
|
|
}
|
|
|
|
|
|
public void add(ShardRouting shardRouting) {
|
|
|
@@ -1277,6 +1344,13 @@ public class RoutingNodes implements Iterable<RoutingNode> {
|
|
|
private int incoming = 0;
|
|
|
private int outgoing = 0;
|
|
|
|
|
|
+ public Recoveries copy() {
|
|
|
+ final Recoveries copy = new Recoveries();
|
|
|
+ copy.incoming = incoming;
|
|
|
+ copy.outgoing = outgoing;
|
|
|
+ return copy;
|
|
|
+ }
|
|
|
+
|
|
|
void addOutgoing(int howMany) {
|
|
|
assert outgoing + howMany >= 0 : outgoing + howMany + " must be >= 0";
|
|
|
outgoing += howMany;
|