|
@@ -8,7 +8,6 @@
|
|
|
|
|
|
package org.elasticsearch.cluster.routing.allocation.decider;
|
|
|
|
|
|
-import com.carrotsearch.hppc.ObjectIntHashMap;
|
|
|
import org.elasticsearch.cluster.metadata.IndexMetadata;
|
|
|
import org.elasticsearch.cluster.routing.RoutingNode;
|
|
|
import org.elasticsearch.cluster.routing.ShardRouting;
|
|
@@ -22,8 +21,9 @@ import org.elasticsearch.common.settings.Settings;
|
|
|
import java.util.HashMap;
|
|
|
import java.util.List;
|
|
|
import java.util.Map;
|
|
|
+import java.util.Set;
|
|
|
import java.util.function.Function;
|
|
|
-import java.util.stream.StreamSupport;
|
|
|
+import java.util.stream.Stream;
|
|
|
|
|
|
import static java.util.Collections.emptyList;
|
|
|
import static java.util.stream.Collectors.toList;
|
|
@@ -133,70 +133,67 @@ public class AwarenessAllocationDecider extends AllocationDecider {
|
|
|
}
|
|
|
|
|
|
final boolean debug = allocation.debugDecision();
|
|
|
- IndexMetadata indexMetadata = allocation.metadata().getIndexSafe(shardRouting.index());
|
|
|
+ final IndexMetadata indexMetadata = allocation.metadata().getIndexSafe(shardRouting.index());
|
|
|
|
|
|
if (INDEX_AUTO_EXPAND_REPLICAS_SETTING.get(indexMetadata.getSettings()).expandToAllNodes()) {
|
|
|
return YES_AUTO_EXPAND_ALL;
|
|
|
}
|
|
|
|
|
|
- int shardCount = indexMetadata.getNumberOfReplicas() + 1; // 1 for primary
|
|
|
+ final int shardCount = indexMetadata.getNumberOfReplicas() + 1; // 1 for primary
|
|
|
for (String awarenessAttribute : awarenessAttributes) {
|
|
|
// the node the shard exists on must be associated with an awareness attribute
|
|
|
if (node.node().getAttributes().containsKey(awarenessAttribute) == false) {
|
|
|
return debug ? debugNoMissingAttribute(awarenessAttribute, awarenessAttributes) : Decision.NO;
|
|
|
}
|
|
|
|
|
|
- // build attr_value -> nodes map
|
|
|
- ObjectIntHashMap<String> nodesPerAttribute = allocation.routingNodes().nodesPerAttributesCounts(awarenessAttribute);
|
|
|
+ final Set<String> actualAttributeValues = allocation.routingNodes().getAttributeValues(awarenessAttribute);
|
|
|
+ final String targetAttributeValue = node.node().getAttributes().get(awarenessAttribute);
|
|
|
+ assert targetAttributeValue != null : "attribute [" + awarenessAttribute + "] missing on " + node.node();
|
|
|
+ assert actualAttributeValues.contains(targetAttributeValue)
|
|
|
+ : "attribute [" + awarenessAttribute + "] on " + node.node() + " is not in " + actualAttributeValues;
|
|
|
+
|
|
|
+ int shardsForTargetAttributeValue = 0;
|
|
|
+ // Will be the count of shards on nodes with attribute `awarenessAttribute` matching the one on `node`.
|
|
|
|
|
|
- // build the count of shards per attribute value
|
|
|
- ObjectIntHashMap<String> shardPerAttribute = new ObjectIntHashMap<>();
|
|
|
for (ShardRouting assignedShard : allocation.routingNodes().assignedShards(shardRouting.shardId())) {
|
|
|
if (assignedShard.started() || assignedShard.initializing()) {
|
|
|
// Note: this also counts relocation targets as that will be the new location of the shard.
|
|
|
// Relocation sources should not be counted as the shard is moving away
|
|
|
- RoutingNode routingNode = allocation.routingNodes().node(assignedShard.currentNodeId());
|
|
|
- shardPerAttribute.addTo(routingNode.node().getAttributes().get(awarenessAttribute), 1);
|
|
|
+ final RoutingNode assignedNode = allocation.routingNodes().node(assignedShard.currentNodeId());
|
|
|
+ if (targetAttributeValue.equals(assignedNode.node().getAttributes().get(awarenessAttribute))) {
|
|
|
+ shardsForTargetAttributeValue += 1;
|
|
|
+ }
|
|
|
}
|
|
|
}
|
|
|
|
|
|
if (moveToNode) {
|
|
|
if (shardRouting.assignedToNode()) {
|
|
|
- String nodeId = shardRouting.relocating() ? shardRouting.relocatingNodeId() : shardRouting.currentNodeId();
|
|
|
- if (node.nodeId().equals(nodeId) == false) {
|
|
|
- // we work on different nodes, move counts around
|
|
|
- shardPerAttribute.putOrAdd(allocation.routingNodes().node(nodeId).node().getAttributes().get(awarenessAttribute),
|
|
|
- 0, -1);
|
|
|
- shardPerAttribute.addTo(node.node().getAttributes().get(awarenessAttribute), 1);
|
|
|
- }
|
|
|
+ final RoutingNode currentNode = allocation.routingNodes().node(
|
|
|
+ shardRouting.relocating() ? shardRouting.relocatingNodeId() : shardRouting.currentNodeId());
|
|
|
+ if (targetAttributeValue.equals(currentNode.node().getAttributes().get(awarenessAttribute)) == false) {
|
|
|
+ shardsForTargetAttributeValue += 1;
|
|
|
+ } // else this shard is already on a node in the same zone as the target node, so moving it doesn't change the count
|
|
|
} else {
|
|
|
- shardPerAttribute.addTo(node.node().getAttributes().get(awarenessAttribute), 1);
|
|
|
+ shardsForTargetAttributeValue += 1;
|
|
|
}
|
|
|
}
|
|
|
|
|
|
- int numberOfAttributes = nodesPerAttribute.size();
|
|
|
- List<String> fullValues = forcedAwarenessAttributes.get(awarenessAttribute);
|
|
|
- if (fullValues != null) {
|
|
|
- for (String fullValue : fullValues) {
|
|
|
- if (shardPerAttribute.containsKey(fullValue) == false) {
|
|
|
- numberOfAttributes++;
|
|
|
- }
|
|
|
- }
|
|
|
- }
|
|
|
- // TODO should we remove ones that are not part of full list?
|
|
|
+ final List<String> forcedValues = forcedAwarenessAttributes.get(awarenessAttribute);
|
|
|
+ final int valueCount = forcedValues == null
|
|
|
+ ? actualAttributeValues.size()
|
|
|
+ : Math.toIntExact(Stream.concat(actualAttributeValues.stream(), forcedValues.stream()).distinct().count());
|
|
|
|
|
|
- final int currentNodeCount = shardPerAttribute.get(node.node().getAttributes().get(awarenessAttribute));
|
|
|
- final int maximumNodeCount = (shardCount + numberOfAttributes - 1) / numberOfAttributes; // ceil(shardCount/numberOfAttributes)
|
|
|
- if (currentNodeCount > maximumNodeCount) {
|
|
|
+ final int maximumShardsPerAttributeValue = (shardCount + valueCount - 1) / valueCount; // ceil(shardCount/valueCount)
|
|
|
+ if (shardsForTargetAttributeValue > maximumShardsPerAttributeValue) {
|
|
|
return debug ? debugNoTooManyCopies(
|
|
|
shardCount,
|
|
|
awarenessAttribute,
|
|
|
node.node().getAttributes().get(awarenessAttribute),
|
|
|
- numberOfAttributes,
|
|
|
- StreamSupport.stream(nodesPerAttribute.keys().spliterator(), false).map(c -> c.value).sorted().collect(toList()),
|
|
|
- fullValues == null ? null : fullValues.stream().sorted().collect(toList()),
|
|
|
- currentNodeCount,
|
|
|
- maximumNodeCount)
|
|
|
+ valueCount,
|
|
|
+ actualAttributeValues.stream().sorted().collect(toList()),
|
|
|
+ forcedValues == null ? null : forcedValues.stream().sorted().collect(toList()),
|
|
|
+ shardsForTargetAttributeValue,
|
|
|
+ maximumShardsPerAttributeValue)
|
|
|
: Decision.NO;
|
|
|
}
|
|
|
}
|
|
@@ -211,8 +208,8 @@ public class AwarenessAllocationDecider extends AllocationDecider {
|
|
|
int numberOfAttributes,
|
|
|
List<String> realAttributes,
|
|
|
List<String> forcedAttributes,
|
|
|
- int currentNodeCount,
|
|
|
- int maximumNodeCount) {
|
|
|
+ int actualShardCount,
|
|
|
+ int maximumShardCount) {
|
|
|
return Decision.single(Decision.Type.NO, NAME,
|
|
|
"there are [%d] copies of this shard and [%d] values for attribute [%s] (%s from nodes in the cluster and %s) so there " +
|
|
|
"may be at most [%d] copies of this shard allocated to nodes with each value, but (including this copy) there " +
|
|
@@ -222,8 +219,8 @@ public class AwarenessAllocationDecider extends AllocationDecider {
|
|
|
attributeName,
|
|
|
realAttributes,
|
|
|
forcedAttributes == null ? "no forced awareness" : forcedAttributes + " from forced awareness",
|
|
|
- maximumNodeCount,
|
|
|
- currentNodeCount,
|
|
|
+ maximumShardCount,
|
|
|
+ actualShardCount,
|
|
|
attributeName,
|
|
|
attributeValue);
|
|
|
}
|