|
@@ -29,13 +29,12 @@ import org.elasticsearch.cluster.routing.RoutingNode;
|
|
|
import org.elasticsearch.cluster.routing.RoutingNodes;
|
|
|
import org.elasticsearch.cluster.routing.ShardRouting;
|
|
|
import org.elasticsearch.cluster.routing.ShardRoutingState;
|
|
|
-import org.elasticsearch.cluster.routing.UnassignedInfo;
|
|
|
+import org.elasticsearch.cluster.routing.UnassignedInfo.AllocationStatus;
|
|
|
import org.elasticsearch.cluster.routing.allocation.AllocateUnassignedDecision;
|
|
|
+import org.elasticsearch.cluster.routing.allocation.AllocationDecision;
|
|
|
import org.elasticsearch.cluster.routing.allocation.MoveDecision;
|
|
|
-import org.elasticsearch.cluster.routing.allocation.NodeRebalanceResult;
|
|
|
-import org.elasticsearch.cluster.routing.allocation.RebalanceDecision;
|
|
|
-import org.elasticsearch.cluster.routing.allocation.RoutingAllocation;
|
|
|
import org.elasticsearch.cluster.routing.allocation.NodeAllocationResult;
|
|
|
+import org.elasticsearch.cluster.routing.allocation.RoutingAllocation;
|
|
|
import org.elasticsearch.cluster.routing.allocation.decider.AllocationDeciders;
|
|
|
import org.elasticsearch.cluster.routing.allocation.decider.Decision;
|
|
|
import org.elasticsearch.cluster.routing.allocation.decider.Decision.Type;
|
|
@@ -49,12 +48,14 @@ import org.elasticsearch.common.settings.Setting.Property;
|
|
|
import org.elasticsearch.common.settings.Settings;
|
|
|
import org.elasticsearch.gateway.PriorityComparator;
|
|
|
|
|
|
+import java.util.ArrayList;
|
|
|
import java.util.Collections;
|
|
|
import java.util.Comparator;
|
|
|
import java.util.HashMap;
|
|
|
import java.util.HashSet;
|
|
|
import java.util.IdentityHashMap;
|
|
|
import java.util.Iterator;
|
|
|
+import java.util.List;
|
|
|
import java.util.Map;
|
|
|
import java.util.Set;
|
|
|
|
|
@@ -134,7 +135,7 @@ public class BalancedShardsAllocator extends AbstractComponent implements Shards
|
|
|
* from the cluster allocation explain API to explain possible rebalancing decisions for a single
|
|
|
* shard.
|
|
|
*/
|
|
|
- public RebalanceDecision decideRebalance(final ShardRouting shard, final RoutingAllocation allocation) {
|
|
|
+ public MoveDecision decideRebalance(final ShardRouting shard, final RoutingAllocation allocation) {
|
|
|
assert allocation.debugDecision() : "debugDecision should be set in explain mode";
|
|
|
return new Balancer(logger, allocation, weightFunction, threshold).decideRebalance(shard);
|
|
|
}
|
|
@@ -334,24 +335,14 @@ public class BalancedShardsAllocator extends AbstractComponent implements Shards
|
|
|
* optimally balanced cluster. This method is invoked from the cluster allocation
|
|
|
* explain API only.
|
|
|
*/
|
|
|
- private RebalanceDecision decideRebalance(final ShardRouting shard) {
|
|
|
+ private MoveDecision decideRebalance(final ShardRouting shard) {
|
|
|
if (shard.started() == false) {
|
|
|
// cannot rebalance a shard that isn't started
|
|
|
- return RebalanceDecision.NOT_TAKEN;
|
|
|
+ return MoveDecision.NOT_TAKEN;
|
|
|
}
|
|
|
|
|
|
Decision canRebalance = allocation.deciders().canRebalance(shard, allocation);
|
|
|
|
|
|
- if (allocation.hasPendingAsyncFetch()) {
|
|
|
- return new RebalanceDecision(
|
|
|
- canRebalance,
|
|
|
- Type.NO,
|
|
|
- "cannot rebalance due to in-flight shard store fetches, otherwise allocation may prematurely rebalance a shard to " +
|
|
|
- "a node that is soon to receive another shard assignment upon completion of the shard store fetch, " +
|
|
|
- "rendering the cluster imbalanced again"
|
|
|
- );
|
|
|
- }
|
|
|
-
|
|
|
sorter.reset(shard.getIndexName());
|
|
|
ModelNode[] modelNodes = sorter.modelNodes;
|
|
|
final String currentNodeId = shard.currentNodeId();
|
|
@@ -369,12 +360,14 @@ public class BalancedShardsAllocator extends AbstractComponent implements Shards
|
|
|
final float currentWeight = sorter.weight(currentNode);
|
|
|
final AllocationDeciders deciders = allocation.deciders();
|
|
|
final String idxName = shard.getIndexName();
|
|
|
- Map<String, NodeRebalanceResult> nodeDecisions = new HashMap<>(modelNodes.length - 1);
|
|
|
Type rebalanceDecisionType = Type.NO;
|
|
|
- String assignedNodeId = null;
|
|
|
+ ModelNode assignedNode = null;
|
|
|
+ List<Tuple<ModelNode, Decision>> betterBalanceNodes = new ArrayList<>();
|
|
|
+ List<Tuple<ModelNode, Decision>> sameBalanceNodes = new ArrayList<>();
|
|
|
+ List<Tuple<ModelNode, Decision>> worseBalanceNodes = new ArrayList<>();
|
|
|
for (ModelNode node : modelNodes) {
|
|
|
if (node == currentNode) {
|
|
|
- continue; // skip over node we're currently allocated to it
|
|
|
+ continue; // skip over node we're currently allocated to
|
|
|
}
|
|
|
final Decision canAllocate = deciders.canAllocate(shard, node.getRoutingNode(), allocation);
|
|
|
// the current weight of the node in the cluster, as computed by the weight function;
|
|
@@ -387,22 +380,21 @@ public class BalancedShardsAllocator extends AbstractComponent implements Shards
|
|
|
// can make the balance of the cluster better, so we check for that here
|
|
|
final boolean betterWeightThanCurrent = nodeWeight <= currentWeight;
|
|
|
boolean rebalanceConditionsMet = false;
|
|
|
- boolean deltaAboveThreshold = false;
|
|
|
- float weightWithShardAdded = Float.POSITIVE_INFINITY;
|
|
|
if (betterWeightThanCurrent) {
|
|
|
// get the delta between the weights of the node we are checking and the node that holds the shard
|
|
|
- final float currentDelta = absDelta(nodeWeight, currentWeight);
|
|
|
+ float currentDelta = absDelta(nodeWeight, currentWeight);
|
|
|
// checks if the weight delta is above a certain threshold; if it is not above a certain threshold,
|
|
|
// then even though the node we are examining has a better weight and may make the cluster balance
|
|
|
// more even, it doesn't make sense to execute the heavyweight operation of relocating a shard unless
|
|
|
// the gains make it worth it, as defined by the threshold
|
|
|
- deltaAboveThreshold = lessThan(currentDelta, threshold) == false;
|
|
|
+ boolean deltaAboveThreshold = lessThan(currentDelta, threshold) == false;
|
|
|
// simulate the weight of the node if we were to relocate the shard to it
|
|
|
- weightWithShardAdded = weight.weightShardAdded(this, node, idxName);
|
|
|
+ float weightWithShardAdded = weight.weightShardAdded(this, node, idxName);
|
|
|
// calculate the delta of the weights of the two nodes if we were to add the shard to the
|
|
|
// node in question and move it away from the node that currently holds it.
|
|
|
- final float proposedDelta = weightWithShardAdded - weight.weightShardRemoved(this, currentNode, idxName);
|
|
|
- rebalanceConditionsMet = deltaAboveThreshold && proposedDelta < currentDelta;
|
|
|
+ float proposedDelta = weightWithShardAdded - weight.weightShardRemoved(this, currentNode, idxName);
|
|
|
+ boolean betterWeightWithShardAdded = proposedDelta < currentDelta;
|
|
|
+ rebalanceConditionsMet = deltaAboveThreshold && betterWeightWithShardAdded;
|
|
|
// if the simulated weight delta with the shard moved away is better than the weight delta
|
|
|
// with the shard remaining on the current node, and we are allowed to allocate to the
|
|
|
// node in question, then allow the rebalance
|
|
@@ -410,26 +402,47 @@ public class BalancedShardsAllocator extends AbstractComponent implements Shards
|
|
|
// rebalance to the node, only will get overwritten if the decision here is to
|
|
|
// THROTTLE and we get a decision with YES on another node
|
|
|
rebalanceDecisionType = canAllocate.type();
|
|
|
- assignedNodeId = node.getNodeId();
|
|
|
+ assignedNode = node;
|
|
|
}
|
|
|
}
|
|
|
- nodeDecisions.put(node.getNodeId(), new NodeRebalanceResult(
|
|
|
- rebalanceConditionsMet ? canAllocate.type() : Type.NO,
|
|
|
- canAllocate,
|
|
|
- betterWeightThanCurrent,
|
|
|
- deltaAboveThreshold,
|
|
|
- nodeWeight,
|
|
|
- weightWithShardAdded)
|
|
|
- );
|
|
|
+ Tuple<ModelNode, Decision> nodeResult = Tuple.tuple(node, canAllocate);
|
|
|
+ if (rebalanceConditionsMet) {
|
|
|
+ betterBalanceNodes.add(nodeResult);
|
|
|
+ } else if (betterWeightThanCurrent) {
|
|
|
+ sameBalanceNodes.add(nodeResult);
|
|
|
+ } else {
|
|
|
+ worseBalanceNodes.add(nodeResult);
|
|
|
+ }
|
|
|
}
|
|
|
|
|
|
+ int weightRanking = 0;
|
|
|
+ List<NodeAllocationResult> nodeDecisions = new ArrayList<>(modelNodes.length - 1);
|
|
|
+ for (Tuple<ModelNode, Decision> result : betterBalanceNodes) {
|
|
|
+ nodeDecisions.add(new NodeAllocationResult(
|
|
|
+ result.v1().routingNode.node(), AllocationDecision.fromDecisionType(result.v2().type()), result.v2(), ++weightRanking)
|
|
|
+ );
|
|
|
+ }
|
|
|
+ int currentNodeWeightRanking = ++weightRanking;
|
|
|
+ for (Tuple<ModelNode, Decision> result : sameBalanceNodes) {
|
|
|
+ AllocationDecision nodeDecision = result.v2().type() == Type.NO ? AllocationDecision.NO : AllocationDecision.WORSE_BALANCE;
|
|
|
+ nodeDecisions.add(new NodeAllocationResult(
|
|
|
+ result.v1().routingNode.node(), nodeDecision, result.v2(), currentNodeWeightRanking)
|
|
|
+ );
|
|
|
+ }
|
|
|
+ for (Tuple<ModelNode, Decision> result : worseBalanceNodes) {
|
|
|
+ AllocationDecision nodeDecision = result.v2().type() == Type.NO ? AllocationDecision.NO : AllocationDecision.WORSE_BALANCE;
|
|
|
+ nodeDecisions.add(new NodeAllocationResult(
|
|
|
+ result.v1().routingNode.node(), nodeDecision, result.v2(), ++weightRanking)
|
|
|
+ );
|
|
|
+ }
|
|
|
|
|
|
- if (canRebalance.type() != Type.YES) {
|
|
|
- return new RebalanceDecision(canRebalance, canRebalance.type(), "rebalancing is not allowed", null,
|
|
|
- nodeDecisions, currentWeight);
|
|
|
+ if (canRebalance.type() != Type.YES || allocation.hasPendingAsyncFetch()) {
|
|
|
+ AllocationDecision allocationDecision = allocation.hasPendingAsyncFetch() ? AllocationDecision.FETCH_PENDING :
|
|
|
+ AllocationDecision.fromDecisionType(canRebalance.type());
|
|
|
+ return MoveDecision.cannotRebalance(canRebalance, allocationDecision, currentNodeWeightRanking, nodeDecisions);
|
|
|
} else {
|
|
|
- return RebalanceDecision.decision(canRebalance, rebalanceDecisionType, assignedNodeId,
|
|
|
- nodeDecisions, currentWeight, threshold);
|
|
|
+ return MoveDecision.rebalance(canRebalance, AllocationDecision.fromDecisionType(rebalanceDecisionType),
|
|
|
+ assignedNode != null ? assignedNode.routingNode.node() : null, currentNodeWeightRanking, nodeDecisions);
|
|
|
}
|
|
|
}
|
|
|
|
|
@@ -632,9 +645,9 @@ public class BalancedShardsAllocator extends AbstractComponent implements Shards
|
|
|
for (Iterator<ShardRouting> it = allocation.routingNodes().nodeInterleavedShardIterator(); it.hasNext(); ) {
|
|
|
ShardRouting shardRouting = it.next();
|
|
|
final MoveDecision moveDecision = makeMoveDecision(shardRouting);
|
|
|
- if (moveDecision.move()) {
|
|
|
+ if (moveDecision.isDecisionTaken() && moveDecision.forceMove()) {
|
|
|
final ModelNode sourceNode = nodes.get(shardRouting.currentNodeId());
|
|
|
- final ModelNode targetNode = nodes.get(moveDecision.getAssignedNodeId());
|
|
|
+ final ModelNode targetNode = nodes.get(moveDecision.getTargetNode().getId());
|
|
|
sourceNode.removeShard(shardRouting);
|
|
|
Tuple<ShardRouting, ShardRouting> relocatingShards = routingNodes.relocateShard(shardRouting, targetNode.getNodeId(),
|
|
|
allocation.clusterInfo().getShardSize(shardRouting, ShardRouting.UNAVAILABLE_EXPECTED_SHARD_SIZE), allocation.changes());
|
|
@@ -642,7 +655,7 @@ public class BalancedShardsAllocator extends AbstractComponent implements Shards
|
|
|
if (logger.isTraceEnabled()) {
|
|
|
logger.trace("Moved shard [{}] to node [{}]", shardRouting, targetNode.getRoutingNode());
|
|
|
}
|
|
|
- } else if (moveDecision.cannotRemain()) {
|
|
|
+ } else if (moveDecision.isDecisionTaken() && moveDecision.canRemain() == false) {
|
|
|
logger.trace("[{}][{}] can't move", shardRouting.index(), shardRouting.id());
|
|
|
}
|
|
|
}
|
|
@@ -654,11 +667,11 @@ public class BalancedShardsAllocator extends AbstractComponent implements Shards
|
|
|
* 1. If the shard is not started, no decision will be taken and {@link MoveDecision#isDecisionTaken()} will return false.
|
|
|
* 2. If the shard is allowed to remain on its current node, no attempt will be made to move the shard and
|
|
|
* {@link MoveDecision#canRemainDecision} will have a decision type of YES. All other fields in the object will be null.
|
|
|
- * 3. If the shard is not allowed to remain on its current node, then {@link MoveDecision#finalDecision} will be populated
|
|
|
- * with the decision of moving to another node. If {@link MoveDecision#finalDecision} returns YES, then
|
|
|
- * {@link MoveDecision#assignedNodeId} will return a non-null value, otherwise the assignedNodeId will be null.
|
|
|
+ * 3. If the shard is not allowed to remain on its current node, then {@link MoveDecision#getAllocationDecision()} will be
|
|
|
+ * populated with the decision of moving to another node. If {@link MoveDecision#forceMove()} ()} returns {@code true}, then
|
|
|
+ * {@link MoveDecision#targetNode} will return a non-null value, otherwise the assignedNodeId will be null.
|
|
|
* 4. If the method is invoked in explain mode (e.g. from the cluster allocation explain APIs), then
|
|
|
- * {@link MoveDecision#finalExplanation} and {@link MoveDecision#nodeDecisions} will have non-null values.
|
|
|
+ * {@link MoveDecision#nodeDecisions} will have a non-null value.
|
|
|
*/
|
|
|
public MoveDecision makeMoveDecision(final ShardRouting shardRouting) {
|
|
|
if (shardRouting.started() == false) {
|
|
@@ -672,7 +685,7 @@ public class BalancedShardsAllocator extends AbstractComponent implements Shards
|
|
|
RoutingNode routingNode = sourceNode.getRoutingNode();
|
|
|
Decision canRemain = allocation.deciders().canRemain(shardRouting, routingNode, allocation);
|
|
|
if (canRemain.type() != Decision.Type.NO) {
|
|
|
- return MoveDecision.stay(canRemain, explain);
|
|
|
+ return MoveDecision.stay(canRemain);
|
|
|
}
|
|
|
|
|
|
sorter.reset(shardRouting.getIndexName());
|
|
@@ -684,14 +697,16 @@ public class BalancedShardsAllocator extends AbstractComponent implements Shards
|
|
|
*/
|
|
|
Type bestDecision = Type.NO;
|
|
|
RoutingNode targetNode = null;
|
|
|
- final Map<String, NodeAllocationResult> nodeExplanationMap = explain ? new HashMap<>() : null;
|
|
|
+ final List<NodeAllocationResult> nodeExplanationMap = explain ? new ArrayList<>() : null;
|
|
|
+ int weightRanking = 0;
|
|
|
for (ModelNode currentNode : sorter.modelNodes) {
|
|
|
if (currentNode != sourceNode) {
|
|
|
RoutingNode target = currentNode.getRoutingNode();
|
|
|
// don't use canRebalance as we want hard filtering rules to apply. See #17698
|
|
|
Decision allocationDecision = allocation.deciders().canAllocate(shardRouting, target, allocation);
|
|
|
if (explain) {
|
|
|
- nodeExplanationMap.put(currentNode.getNodeId(), new NodeAllocationResult(allocationDecision, sorter.weight(currentNode)));
|
|
|
+ nodeExplanationMap.add(new NodeAllocationResult(
|
|
|
+ currentNode.getRoutingNode().node(), allocationDecision, ++weightRanking));
|
|
|
}
|
|
|
// TODO maybe we can respect throttling here too?
|
|
|
if (allocationDecision.type().higherThan(bestDecision)) {
|
|
@@ -708,8 +723,8 @@ public class BalancedShardsAllocator extends AbstractComponent implements Shards
|
|
|
}
|
|
|
}
|
|
|
|
|
|
- return MoveDecision.decision(canRemain, bestDecision, explain, shardRouting.currentNodeId(),
|
|
|
- targetNode != null ? targetNode.nodeId() : null, nodeExplanationMap);
|
|
|
+ return MoveDecision.cannotRemain(canRemain, AllocationDecision.fromDecisionType(bestDecision),
|
|
|
+ targetNode != null ? targetNode.node() : null, nodeExplanationMap);
|
|
|
}
|
|
|
|
|
|
/**
|
|
@@ -793,11 +808,11 @@ public class BalancedShardsAllocator extends AbstractComponent implements Shards
|
|
|
for (int i = 0; i < primaryLength; i++) {
|
|
|
ShardRouting shard = primary[i];
|
|
|
AllocateUnassignedDecision allocationDecision = decideAllocateUnassigned(shard, throttledNodes);
|
|
|
- final Type decisionType = allocationDecision.getFinalDecisionType();
|
|
|
- final String assignedNodeId = allocationDecision.getAssignedNodeId();
|
|
|
+ final String assignedNodeId = allocationDecision.getTargetNode() != null ?
|
|
|
+ allocationDecision.getTargetNode().getId() : null;
|
|
|
final ModelNode minNode = assignedNodeId != null ? nodes.get(assignedNodeId) : null;
|
|
|
|
|
|
- if (decisionType == Type.YES) {
|
|
|
+ if (allocationDecision.getAllocationDecision() == AllocationDecision.YES) {
|
|
|
if (logger.isTraceEnabled()) {
|
|
|
logger.trace("Assigned shard [{}] to [{}]", shard, minNode.getNodeId());
|
|
|
}
|
|
@@ -816,12 +831,13 @@ public class BalancedShardsAllocator extends AbstractComponent implements Shards
|
|
|
} else {
|
|
|
// did *not* receive a YES decision
|
|
|
if (logger.isTraceEnabled()) {
|
|
|
- logger.trace("No eligible node found to assign shard [{}] decision [{}]", shard, decisionType);
|
|
|
+ logger.trace("No eligible node found to assign shard [{}] allocation_status [{}]", shard,
|
|
|
+ allocationDecision.getAllocationStatus());
|
|
|
}
|
|
|
|
|
|
if (minNode != null) {
|
|
|
// throttle decision scenario
|
|
|
- assert decisionType == Type.THROTTLE;
|
|
|
+ assert allocationDecision.getAllocationStatus() == AllocationStatus.DECIDERS_THROTTLED;
|
|
|
final long shardSize = DiskThresholdDecider.getExpectedShardSize(shard, allocation,
|
|
|
ShardRouting.UNAVAILABLE_EXPECTED_SHARD_SIZE);
|
|
|
minNode.addShard(shard.initialize(minNode.getNodeId(), null, shardSize));
|
|
@@ -829,23 +845,22 @@ public class BalancedShardsAllocator extends AbstractComponent implements Shards
|
|
|
final Decision.Type nodeLevelDecision = deciders.canAllocate(node, allocation).type();
|
|
|
if (nodeLevelDecision != Type.YES) {
|
|
|
if (logger.isTraceEnabled()) {
|
|
|
- logger.trace("Can not allocate on node [{}] remove from round decision [{}]", node, decisionType);
|
|
|
+ logger.trace("Can not allocate on node [{}] remove from round decision [{}]", node,
|
|
|
+ allocationDecision.getAllocationStatus());
|
|
|
}
|
|
|
assert nodeLevelDecision == Type.NO;
|
|
|
throttledNodes.add(minNode);
|
|
|
}
|
|
|
} else {
|
|
|
- assert decisionType == Type.NO;
|
|
|
if (logger.isTraceEnabled()) {
|
|
|
logger.trace("No Node found to assign shard [{}]", shard);
|
|
|
}
|
|
|
}
|
|
|
|
|
|
- UnassignedInfo.AllocationStatus allocationStatus = UnassignedInfo.AllocationStatus.fromDecision(decisionType);
|
|
|
- unassigned.ignoreShard(shard, allocationStatus, allocation.changes());
|
|
|
+ unassigned.ignoreShard(shard, allocationDecision.getAllocationStatus(), allocation.changes());
|
|
|
if (!shard.primary()) { // we could not allocate it and we are a replica - check if we can ignore the other replicas
|
|
|
while(i < primaryLength-1 && comparator.compare(primary[i], primary[i+1]) == 0) {
|
|
|
- unassigned.ignoreShard(primary[++i], allocationStatus, allocation.changes());
|
|
|
+ unassigned.ignoreShard(primary[++i], allocationDecision.getAllocationStatus(), allocation.changes());
|
|
|
}
|
|
|
}
|
|
|
}
|
|
@@ -871,25 +886,26 @@ public class BalancedShardsAllocator extends AbstractComponent implements Shards
|
|
|
return AllocateUnassignedDecision.NOT_TAKEN;
|
|
|
}
|
|
|
|
|
|
+ final boolean explain = allocation.debugDecision();
|
|
|
Decision shardLevelDecision = allocation.deciders().canAllocate(shard, allocation);
|
|
|
- if (shardLevelDecision.type() == Type.NO) {
|
|
|
+ if (shardLevelDecision.type() == Type.NO && explain == false) {
|
|
|
// NO decision for allocating the shard, irrespective of any particular node, so exit early
|
|
|
- return AllocateUnassignedDecision.no(shardLevelDecision, explain("cannot allocate shard in its current state"));
|
|
|
+ return AllocateUnassignedDecision.no(AllocationStatus.DECIDERS_NO, null);
|
|
|
}
|
|
|
|
|
|
/* find an node with minimal weight we can allocate on*/
|
|
|
float minWeight = Float.POSITIVE_INFINITY;
|
|
|
ModelNode minNode = null;
|
|
|
Decision decision = null;
|
|
|
- final boolean explain = allocation.debugDecision();
|
|
|
if (throttledNodes.size() >= nodes.size() && explain == false) {
|
|
|
// all nodes are throttled, so we know we won't be able to allocate this round,
|
|
|
// so if we are not in explain mode, short circuit
|
|
|
- return AllocateUnassignedDecision.no(UnassignedInfo.AllocationStatus.DECIDERS_NO, null);
|
|
|
+ return AllocateUnassignedDecision.no(AllocationStatus.DECIDERS_NO, null);
|
|
|
}
|
|
|
/* Don't iterate over an identity hashset here the
|
|
|
* iteration order is different for each run and makes testing hard */
|
|
|
Map<String, NodeAllocationResult> nodeExplanationMap = explain ? new HashMap<>() : null;
|
|
|
+ List<Tuple<String, Float>> nodeWeights = explain ? new ArrayList<>() : null;
|
|
|
for (ModelNode node : nodes.values()) {
|
|
|
if ((throttledNodes.contains(node) || node.containsShard(shard)) && explain == false) {
|
|
|
// decision is NO without needing to check anything further, so short circuit
|
|
@@ -905,7 +921,9 @@ public class BalancedShardsAllocator extends AbstractComponent implements Shards
|
|
|
|
|
|
Decision currentDecision = allocation.deciders().canAllocate(shard, node.getRoutingNode(), allocation);
|
|
|
if (explain) {
|
|
|
- nodeExplanationMap.put(node.getNodeId(), new NodeAllocationResult(currentDecision, currentWeight));
|
|
|
+ nodeExplanationMap.put(node.getNodeId(),
|
|
|
+ new NodeAllocationResult(node.getRoutingNode().node(), currentDecision, 0));
|
|
|
+ nodeWeights.add(Tuple.tuple(node.getNodeId(), currentWeight));
|
|
|
}
|
|
|
if (currentDecision.type() == Type.YES || currentDecision.type() == Type.THROTTLE) {
|
|
|
final boolean updateMinNode;
|
|
@@ -946,23 +964,24 @@ public class BalancedShardsAllocator extends AbstractComponent implements Shards
|
|
|
// decision was not set and a node was not assigned, so treat it as a NO decision
|
|
|
decision = Decision.NO;
|
|
|
}
|
|
|
+ List<NodeAllocationResult> nodeDecisions = null;
|
|
|
+ if (explain) {
|
|
|
+ nodeDecisions = new ArrayList<>();
|
|
|
+ // fill in the correct weight ranking, once we've been through all nodes
|
|
|
+ nodeWeights.sort((nodeWeight1, nodeWeight2) -> Float.compare(nodeWeight1.v2(), nodeWeight2.v2()));
|
|
|
+ int weightRanking = 0;
|
|
|
+ for (Tuple<String, Float> nodeWeight : nodeWeights) {
|
|
|
+ NodeAllocationResult current = nodeExplanationMap.get(nodeWeight.v1());
|
|
|
+ nodeDecisions.add(new NodeAllocationResult(current.getNode(), current.getCanAllocateDecision(), ++weightRanking));
|
|
|
+ }
|
|
|
+ }
|
|
|
return AllocateUnassignedDecision.fromDecision(
|
|
|
decision,
|
|
|
- minNode != null ? minNode.getNodeId() : null,
|
|
|
- explain,
|
|
|
- nodeExplanationMap
|
|
|
+ minNode != null ? minNode.routingNode.node() : null,
|
|
|
+ nodeDecisions
|
|
|
);
|
|
|
}
|
|
|
|
|
|
- // provide an explanation, if in explain mode
|
|
|
- private String explain(String explanation) {
|
|
|
- if (allocation.debugDecision()) {
|
|
|
- return explanation;
|
|
|
- } else {
|
|
|
- return null;
|
|
|
- }
|
|
|
- }
|
|
|
-
|
|
|
/**
|
|
|
* Tries to find a relocation from the max node to the minimal node for an arbitrary shard of the given index on the
|
|
|
* balance model. Iff this method returns a <code>true</code> the relocation has already been executed on the
|
|
@@ -1223,5 +1242,4 @@ public class BalancedShardsAllocator extends AbstractComponent implements Shards
|
|
|
return weights[weights.length - 1] - weights[0];
|
|
|
}
|
|
|
}
|
|
|
-
|
|
|
}
|