Browse Source

Merge pull request #16926 from ywelsch/fix/balancer-move

Speed up shard balancer by reusing shard model while moving shards that can no longer be allocated to a node
Yannick Welsch 9 years ago
parent
commit
675d940f01

+ 1 - 41
core/src/main/java/org/elasticsearch/cluster/routing/allocation/AllocationService.java

@@ -310,7 +310,7 @@ public class AllocationService extends AbstractComponent {
         }
 
         // move shards that no longer can be allocated
-        changed |= moveShards(allocation);
+        changed |= shardsAllocators.moveShards(allocation);
 
         // rebalance
         changed |= shardsAllocators.rebalance(allocation);
@@ -327,46 +327,6 @@ public class AllocationService extends AbstractComponent {
         }
     }
 
-    private boolean moveShards(RoutingAllocation allocation) {
-        boolean changed = false;
-
-        // create a copy of the shards interleaving between nodes, and check if they can remain
-        List<ShardRouting> shards = new ArrayList<>();
-        int index = 0;
-        boolean found = true;
-        final RoutingNodes routingNodes = allocation.routingNodes();
-        while (found) {
-            found = false;
-            for (RoutingNode routingNode : routingNodes) {
-                if (index >= routingNode.size()) {
-                    continue;
-                }
-                found = true;
-                shards.add(routingNode.get(index));
-            }
-            index++;
-        }
-        for (int i = 0; i < shards.size(); i++) {
-            ShardRouting shardRouting = shards.get(i);
-            // we can only move started shards...
-            if (!shardRouting.started()) {
-                continue;
-            }
-            final RoutingNode routingNode = routingNodes.node(shardRouting.currentNodeId());
-            Decision decision = allocation.deciders().canRemain(shardRouting, routingNode, allocation);
-            if (decision.type() == Decision.Type.NO) {
-                logger.debug("[{}][{}] allocated on [{}], but can no longer be allocated on it, moving...", shardRouting.index(), shardRouting.id(), routingNode.node());
-                boolean moved = shardsAllocators.move(shardRouting, routingNode, allocation);
-                if (!moved) {
-                    logger.debug("[{}][{}] can't move", shardRouting.index(), shardRouting.id());
-                } else {
-                    changed = true;
-                }
-            }
-        }
-        return changed;
-    }
-
     private boolean electPrimariesAndUnassignedDanglingReplicas(RoutingAllocation allocation) {
         boolean changed = false;
         final RoutingNodes routingNodes = allocation.routingNodes();

+ 79 - 40
core/src/main/java/org/elasticsearch/cluster/routing/allocation/allocator/BalancedShardsAllocator.java

@@ -42,6 +42,7 @@ import org.elasticsearch.common.settings.Setting;
 import org.elasticsearch.common.settings.Settings;
 import org.elasticsearch.gateway.PriorityComparator;
 
+import java.util.ArrayList;
 import java.util.Collection;
 import java.util.Collections;
 import java.util.Comparator;
@@ -49,6 +50,7 @@ import java.util.HashMap;
 import java.util.HashSet;
 import java.util.IdentityHashMap;
 import java.util.Iterator;
+import java.util.List;
 import java.util.Map;
 import java.util.Set;
 import java.util.function.Predicate;
@@ -119,9 +121,9 @@ public class BalancedShardsAllocator extends AbstractComponent implements Shards
     }
 
     @Override
-    public boolean move(ShardRouting shardRouting, RoutingNode node, RoutingAllocation allocation) {
+    public boolean moveShards(RoutingAllocation allocation) {
         final Balancer balancer = new Balancer(logger, allocation, weightFunction, threshold);
-        return balancer.move(shardRouting, node);
+        return balancer.moveShards();
     }
 
     /**
@@ -489,56 +491,93 @@ public class BalancedShardsAllocator extends AbstractComponent implements Shards
         }
 
         /**
-         * This function executes a move operation moving the given shard from
-         * the given node to the minimal eligible node with respect to the
-         * weight function. Iff the shard is moved the shard will be set to
+         * Move started shards that can not be allocated to a node anymore
+         *
+         * For each shard to be moved this function executes a move operation
+         * to the minimal eligible node with respect to the
+         * weight function. If a shard is moved the shard will be set to
          * {@link ShardRoutingState#RELOCATING} and a shadow instance of this
          * shard is created with an incremented version in the state
          * {@link ShardRoutingState#INITIALIZING}.
          *
-         * @return <code>true</code> iff the shard has successfully been moved.
+         * @return <code>true</code> if the allocation has changed, otherwise <code>false</code>
          */
-        public boolean move(ShardRouting shard, RoutingNode node ) {
-            if (nodes.isEmpty() || !shard.started()) {
-                /* with no nodes or a not started shard this is pointless */
+        public boolean moveShards() {
+            if (nodes.isEmpty()) {
+                /* with no nodes this is pointless */
                 return false;
             }
-            if (logger.isTraceEnabled()) {
-                logger.trace("Try moving shard [{}] from [{}]", shard, node);
+
+            // Create a copy of the started shards interleaving between nodes, and check if they can remain. In the presence of throttling
+            // shard movements, the goal of this iteration order is to achieve a fairer movement of shards from the nodes that are
+            // offloading the shards.
+            List<ShardRouting> shards = new ArrayList<>();
+            int index = 0;
+            boolean found = true;
+            while (found) {
+                found = false;
+                for (RoutingNode routingNode : routingNodes) {
+                    if (index >= routingNode.size()) {
+                        continue;
+                    }
+                    found = true;
+                    ShardRouting shardRouting = routingNode.get(index);
+                    // we can only move started shards...
+                    if (shardRouting.started()) {
+                        shards.add(shardRouting);
+                    }
+                }
+                index++;
             }
+            if (shards.isEmpty()) {
+                return false;
+            }
+
             final RoutingNodes.UnassignedShards unassigned = routingNodes.unassigned();
             boolean changed = initialize(routingNodes, unassigned);
-            if (!changed) {
-                final ModelNode sourceNode = nodes.get(node.nodeId());
-                assert sourceNode != null;
+            if (changed == false) {
                 final NodeSorter sorter = newNodeSorter();
-                sorter.reset(shard.getIndexName());
-                final ModelNode[] nodes = sorter.modelNodes;
-                assert sourceNode.containsShard(shard);
-                /*
-                 * the sorter holds the minimum weight node first for the shards index.
-                 * We now walk through the nodes until we find a node to allocate the shard.
-                 * This is not guaranteed to be balanced after this operation we still try best effort to
-                 * allocate on the minimal eligible node.
-                 */
-
-                for (ModelNode currentNode : nodes) {
-                    if (currentNode.getNodeId().equals(node.nodeId())) {
-                        continue;
-                    }
-                    RoutingNode target = currentNode.getRoutingNode(routingNodes);
-                    Decision allocationDecision = allocation.deciders().canAllocate(shard, target, allocation);
-                    Decision rebalanceDecision = allocation.deciders().canRebalance(shard, allocation);
-                    Decision decision = new Decision.Multi().add(allocationDecision).add(rebalanceDecision);
-                    if (decision.type() == Type.YES) { // TODO maybe we can respect throttling here too?
-                        sourceNode.removeShard(shard);
-                        ShardRouting targetRelocatingShard = routingNodes.relocate(shard, target.nodeId(), allocation.clusterInfo().getShardSize(shard, ShardRouting.UNAVAILABLE_EXPECTED_SHARD_SIZE));
-                        currentNode.addShard(targetRelocatingShard, decision);
-                        if (logger.isTraceEnabled()) {
-                            logger.trace("Moved shard [{}] to node [{}]", shard, currentNode.getNodeId());
+                final ModelNode[] modelNodes = sorter.modelNodes;
+                for (ShardRouting shardRouting : shards) {
+                    final ModelNode sourceNode = nodes.get(shardRouting.currentNodeId());
+                    assert sourceNode != null && sourceNode.containsShard(shardRouting);
+                    final RoutingNode routingNode = sourceNode.getRoutingNode(routingNodes);
+                    Decision decision = allocation.deciders().canRemain(shardRouting, routingNode, allocation);
+                    if (decision.type() == Decision.Type.NO) {
+                        logger.debug("[{}][{}] allocated on [{}], but can no longer be allocated on it, moving...", shardRouting.index(), shardRouting.id(), routingNode.node());
+                        sorter.reset(shardRouting.getIndexName());
+                        /*
+                         * the sorter holds the minimum weight node first for the shards index.
+                         * We now walk through the nodes until we find a node to allocate the shard.
+                         * This is not guaranteed to be balanced after this operation we still try best effort to
+                         * allocate on the minimal eligible node.
+                         */
+                        boolean moved = false;
+                        for (ModelNode currentNode : modelNodes) {
+                            if (currentNode == sourceNode) {
+                                continue;
+                            }
+                            RoutingNode target = currentNode.getRoutingNode(routingNodes);
+                            Decision allocationDecision = allocation.deciders().canAllocate(shardRouting, target, allocation);
+                            Decision rebalanceDecision = allocation.deciders().canRebalance(shardRouting, allocation);
+                            if (allocationDecision.type() == Type.YES && rebalanceDecision.type() == Type.YES) { // TODO maybe we can respect throttling here too?
+                                Decision sourceDecision = sourceNode.removeShard(shardRouting);
+                                ShardRouting targetRelocatingShard = routingNodes.relocate(shardRouting, target.nodeId(), allocation.clusterInfo().getShardSize(shardRouting, ShardRouting.UNAVAILABLE_EXPECTED_SHARD_SIZE));
+                                // re-add (now relocating shard) to source node
+                                sourceNode.addShard(shardRouting, sourceDecision);
+                                Decision targetDecision = new Decision.Multi().add(allocationDecision).add(rebalanceDecision);
+                                currentNode.addShard(targetRelocatingShard, targetDecision);
+                                if (logger.isTraceEnabled()) {
+                                    logger.trace("Moved shard [{}] to node [{}]", shardRouting, routingNode.node());
+                                }
+                                moved = true;
+                                changed = true;
+                                break;
+                            }
+                        }
+                        if (moved == false) {
+                            logger.debug("[{}][{}] can't move", shardRouting.index(), shardRouting.id());
                         }
-                        changed = true;
-                        break;
                     }
                 }
             }

+ 9 - 12
core/src/main/java/org/elasticsearch/cluster/routing/allocation/allocator/ShardsAllocator.java

@@ -19,7 +19,6 @@
 
 package org.elasticsearch.cluster.routing.allocation.allocator;
 
-import org.elasticsearch.cluster.routing.RoutingNode;
 import org.elasticsearch.cluster.routing.ShardRouting;
 import org.elasticsearch.cluster.routing.ShardRoutingState;
 import org.elasticsearch.cluster.routing.allocation.FailedRerouteAllocation;
@@ -36,22 +35,22 @@ import org.elasticsearch.cluster.routing.allocation.StartedRerouteAllocation;
 public interface ShardsAllocator {
 
     /**
-     * Applies changes on started nodes based on the implemented algorithm. For example if a 
-     * shard has changed to {@link ShardRoutingState#STARTED} from {@link ShardRoutingState#RELOCATING} 
+     * Applies changes on started nodes based on the implemented algorithm. For example if a
+     * shard has changed to {@link ShardRoutingState#STARTED} from {@link ShardRoutingState#RELOCATING}
      * this allocator might apply some cleanups on the node that used to hold the shard.
      * @param allocation all started {@link ShardRouting shards}
      */
     void applyStartedShards(StartedRerouteAllocation allocation);
 
     /**
-     * Applies changes on failed nodes based on the implemented algorithm. 
+     * Applies changes on failed nodes based on the implemented algorithm.
      * @param allocation all failed {@link ShardRouting shards}
      */
     void applyFailedShards(FailedRerouteAllocation allocation);
 
     /**
-     * Assign all unassigned shards to nodes 
-     * 
+     * Assign all unassigned shards to nodes
+     *
      * @param allocation current node allocation
      * @return <code>true</code> if the allocation has changed, otherwise <code>false</code>
      */
@@ -59,19 +58,17 @@ public interface ShardsAllocator {
 
     /**
      * Rebalancing number of shards on all nodes
-     *   
+     *
      * @param allocation current node allocation
      * @return <code>true</code> if the allocation has changed, otherwise <code>false</code>
      */
     boolean rebalance(RoutingAllocation allocation);
 
     /**
-     * Moves a shard from the given node to other node.
-     * 
-     * @param shardRouting the shard to move
-     * @param node A node containing the shard
+     * Move started shards that can not be allocated to a node anymore
+     *
      * @param allocation current node allocation
      * @return <code>true</code> if the allocation has changed, otherwise <code>false</code>
      */
-    boolean move(ShardRouting shardRouting, RoutingNode node, RoutingAllocation allocation);
+    boolean moveShards(RoutingAllocation allocation);
 }

+ 2 - 4
core/src/main/java/org/elasticsearch/cluster/routing/allocation/allocator/ShardsAllocators.java

@@ -19,8 +19,6 @@
 
 package org.elasticsearch.cluster.routing.allocation.allocator;
 
-import org.elasticsearch.cluster.routing.RoutingNode;
-import org.elasticsearch.cluster.routing.ShardRouting;
 import org.elasticsearch.cluster.routing.allocation.FailedRerouteAllocation;
 import org.elasticsearch.cluster.routing.allocation.RoutingAllocation;
 import org.elasticsearch.cluster.routing.allocation.StartedRerouteAllocation;
@@ -96,7 +94,7 @@ public class ShardsAllocators extends AbstractComponent implements ShardsAllocat
     }
 
     @Override
-    public boolean move(ShardRouting shardRouting, RoutingNode node, RoutingAllocation allocation) {
-        return allocator.move(shardRouting, node, allocation);
+    public boolean moveShards(RoutingAllocation allocation) {
+        return allocator.moveShards(allocation);
     }
 }

+ 1 - 1
core/src/test/java/org/elasticsearch/cluster/ClusterModuleTests.java

@@ -60,7 +60,7 @@ public class ClusterModuleTests extends ModuleTestCase {
             return false;
         }
         @Override
-        public boolean move(ShardRouting shardRouting, RoutingNode node, RoutingAllocation allocation) {
+        public boolean moveShards(RoutingAllocation allocation) {
             return false;
         }
     }

+ 1 - 1
core/src/test/java/org/elasticsearch/cluster/routing/allocation/BalanceConfigurationTests.java

@@ -320,7 +320,7 @@ public class BalanceConfigurationTests extends ESAllocationTestCase {
             }
 
             @Override
-            public boolean move(ShardRouting shardRouting, RoutingNode node, RoutingAllocation allocation) {
+            public boolean moveShards(RoutingAllocation allocation) {
                 return false;
             }