|
@@ -334,14 +334,13 @@ public class BalancedShardsAllocator extends AbstractComponent implements Shards
|
|
|
return new NodeSorter(nodesArray(), weight, this);
|
|
|
}
|
|
|
|
|
|
- private boolean initialize(RoutingNodes routing) {
|
|
|
+ private boolean initialize(RoutingNodes routing, List<MutableShardRouting> unassigned) {
|
|
|
if (logger.isTraceEnabled()) {
|
|
|
logger.trace("Start distributing Shards");
|
|
|
}
|
|
|
-
|
|
|
indices.addAll(allocation.routingTable().indicesRouting().keySet());
|
|
|
buildModelFromAssigned(routing.shards(assignedFilter));
|
|
|
- return allocateUnassigned(allocation.routingNodes().unassigned(), allocation.routingNodes().ignoredUnassigned());
|
|
|
+ return allocateUnassigned(unassigned, routing.ignoredUnassigned());
|
|
|
}
|
|
|
|
|
|
/**
|
|
@@ -366,8 +365,8 @@ public class BalancedShardsAllocator extends AbstractComponent implements Shards
|
|
|
if (logger.isTraceEnabled()) {
|
|
|
logger.trace("Start balancing cluster");
|
|
|
}
|
|
|
-
|
|
|
- boolean changed = initialize(allocation.routingNodes());
|
|
|
+ final TransactionalList<MutableShardRouting> unassigned = new TransactionalList<MutableShardRouting>(allocation.routingNodes().unassigned());
|
|
|
+ boolean changed = initialize(allocation.routingNodes(), unassigned);
|
|
|
NodeSorter sorter = newNodeSorter();
|
|
|
if (nodes.size() > 1) { /* skip if we only have one node */
|
|
|
for (String index : buildWeightOrderedIndidces(Operation.BALANCE, sorter)) {
|
|
@@ -445,6 +444,7 @@ public class BalancedShardsAllocator extends AbstractComponent implements Shards
|
|
|
}
|
|
|
}
|
|
|
}
|
|
|
+ unassigned.commit();
|
|
|
return changed;
|
|
|
}
|
|
|
|
|
@@ -519,7 +519,8 @@ public class BalancedShardsAllocator extends AbstractComponent implements Shards
|
|
|
if (logger.isTraceEnabled()) {
|
|
|
logger.trace("Try moving shard [{}] from [{}]", shard, node);
|
|
|
}
|
|
|
- boolean changed = initialize(allocation.routingNodes());
|
|
|
+ final TransactionalList<MutableShardRouting> unassigned = new TransactionalList<MutableShardRouting>(allocation.routingNodes().unassigned());
|
|
|
+ boolean changed = initialize(allocation.routingNodes(), unassigned);
|
|
|
|
|
|
final ModelNode sourceNode = nodes.get(node.nodeId());
|
|
|
assert sourceNode != null;
|
|
@@ -533,6 +534,7 @@ public class BalancedShardsAllocator extends AbstractComponent implements Shards
|
|
|
* This is not guaranteed to be balanced after this operation we still try best effort to
|
|
|
* allocate on the minimal eligible node.
|
|
|
*/
|
|
|
+
|
|
|
for (ModelNode currentNode : nodes) {
|
|
|
if (currentNode.getNodeId().equals(node.nodeId())) {
|
|
|
continue;
|
|
@@ -549,10 +551,11 @@ public class BalancedShardsAllocator extends AbstractComponent implements Shards
|
|
|
if (logger.isTraceEnabled()) {
|
|
|
logger.trace("Moved shard [{}] to node [{}]", shard, currentNode.getNodeId());
|
|
|
}
|
|
|
- return true;
|
|
|
+ changed = true;
|
|
|
+ break;
|
|
|
}
|
|
|
}
|
|
|
-
|
|
|
+ unassigned.commit();
|
|
|
return changed;
|
|
|
}
|
|
|
|
|
@@ -1039,4 +1042,37 @@ public class BalancedShardsAllocator extends AbstractComponent implements Shards
|
|
|
return weights[weights.length - 1] - weights[0];
|
|
|
}
|
|
|
}
|
|
|
+
|
|
|
+ /**
|
|
|
+ * A list that makes a full copy of the original list and applies all
|
|
|
+ * modification to the copied list once {@link TransactionalList#commit()}
|
|
|
+ * is called.
|
|
|
+ *
|
|
|
+ */
|
|
|
+ @SuppressWarnings("serial")
|
|
|
+ private static final class TransactionalList<T> extends ArrayList<T> {
|
|
|
+
|
|
|
+ private final List<T> originalList;
|
|
|
+ private List<T> assertingList; // only with assert
|
|
|
+
|
|
|
+ TransactionalList(List<T> originalList) {
|
|
|
+ super(originalList);
|
|
|
+ assert copyAsseringList(originalList);
|
|
|
+ this.originalList = originalList;
|
|
|
+ }
|
|
|
+
|
|
|
+ private boolean copyAsseringList(List<T> orig) {
|
|
|
+ this.assertingList = new ArrayList<T>(orig);
|
|
|
+ return true;
|
|
|
+ }
|
|
|
+
|
|
|
+ public void commit() {
|
|
|
+ /* Ensure that the actual source list is not modified while
|
|
|
+ * the transaction is running */
|
|
|
+ assert assertingList.equals(originalList) : "The list was modified outside of the scope";
|
|
|
+ originalList.clear();
|
|
|
+ originalList.addAll(this);
|
|
|
+
|
|
|
+ }
|
|
|
+ }
|
|
|
}
|