Browse Source

Fix RebalanceOnlyWhenActiveAllocationDecider (#96025)

Currently the above decider has 2 issues:
* it does not allow rebalancing if the relocation target is initializing
* it does allow rebalancing when one of the shards is relocating and another is
unassigned

This commit fixes the above 2 issues.
Ievgen Degtiarenko 2 years ago
parent
commit
a02ffaf351

+ 5 - 0
docs/changelog/96025.yaml

@@ -0,0 +1,5 @@
+pr: 96025
+summary: Fix `RebalanceOnlyWhenActiveAllocationDecider`
+area: Allocation
+type: bug
+issues: []

+ 8 - 5
server/src/main/java/org/elasticsearch/cluster/routing/RoutingNodes.java

@@ -375,17 +375,20 @@ public class RoutingNodes implements Iterable<RoutingNode> {
     /**
      * Returns <code>true</code> iff all replicas are active for the given shard routing. Otherwise <code>false</code>
      */
-    public boolean allReplicasActive(ShardId shardId, Metadata metadata) {
+    public boolean allShardsActive(ShardId shardId, Metadata metadata) {
         final List<ShardRouting> shards = assignedShards(shardId);
-        if (shards.isEmpty() || shards.size() < metadata.getIndexSafe(shardId.getIndex()).getNumberOfReplicas() + 1) {
+        final int shardCopies = metadata.getIndexSafe(shardId.getIndex()).getNumberOfReplicas() + 1;
+        if (shards.size() < shardCopies) {
             return false; // if we are empty nothing is active if we have less than total at least one is unassigned
         }
+        int active = 0;
         for (ShardRouting shard : shards) {
-            if (shard.active() == false) {
-                return false;
+            if (shard.active()) {
+                active++;
             }
         }
-        return true;
+        assert active <= shardCopies;
+        return active == shardCopies;
     }
 
     @Override

+ 14 - 4
server/src/main/java/org/elasticsearch/cluster/routing/allocation/decider/RebalanceOnlyWhenActiveAllocationDecider.java

@@ -18,11 +18,21 @@ public class RebalanceOnlyWhenActiveAllocationDecider extends AllocationDecider
 
     public static final String NAME = "rebalance_only_when_active";
 
+    static final Decision YES_ALL_REPLICAS_ACTIVE = Decision.single(
+        Decision.Type.YES,
+        NAME,
+        "rebalancing is allowed as all copies of this shard are active"
+    );
+    static final Decision NO_SOME_REPLICAS_INACTIVE = Decision.single(
+        Decision.Type.NO,
+        NAME,
+        "rebalancing is not allowed until all copies of this shard are active"
+    );
+
     @Override
     public Decision canRebalance(ShardRouting shardRouting, RoutingAllocation allocation) {
-        if (allocation.routingNodes().allReplicasActive(shardRouting.shardId(), allocation.metadata()) == false) {
-            return allocation.decision(Decision.NO, NAME, "rebalancing is not allowed until all replicas in the cluster are active");
-        }
-        return allocation.decision(Decision.YES, NAME, "rebalancing is allowed as all replicas are active in the cluster");
+        return allocation.routingNodes().allShardsActive(shardRouting.shardId(), allocation.metadata())
+            ? YES_ALL_REPLICAS_ACTIVE
+            : NO_SOME_REPLICAS_INACTIVE;
     }
 }

+ 9 - 10
server/src/test/java/org/elasticsearch/cluster/routing/allocation/ClusterRebalanceRoutingTests.java

@@ -24,7 +24,6 @@ import org.elasticsearch.cluster.routing.RoutingTable;
 import org.elasticsearch.cluster.routing.ShardRouting;
 import org.elasticsearch.cluster.routing.UnassignedInfo;
 import org.elasticsearch.cluster.routing.allocation.decider.ClusterRebalanceAllocationDecider;
-import org.elasticsearch.common.collect.Iterators;
 import org.elasticsearch.common.settings.Settings;
 import org.elasticsearch.test.gateway.TestGatewayAllocator;
 
@@ -52,8 +51,11 @@ public class ClusterRebalanceRoutingTests extends ESAllocationTestCase {
         );
 
         Metadata metadata = Metadata.builder()
-            .put(IndexMetadata.builder("test1").settings(settings(Version.CURRENT)).numberOfShards(1).numberOfReplicas(1))
-            .put(IndexMetadata.builder("test2").settings(settings(Version.CURRENT)).numberOfShards(1).numberOfReplicas(1))
+            .put(IndexMetadata.builder("test1").settings(indexSettings(Version.CURRENT, 1, 1)))
+            .put(
+                IndexMetadata.builder("test2")
+                    .settings(indexSettings(Version.CURRENT, 1, 1).put("index.routing.allocation.include._id", "node1,node2"))
+            )
             .build();
 
         RoutingTable initialRoutingTable = RoutingTable.builder(TestShardRoutingRoleStrategies.DEFAULT_ROLE_ONLY)
@@ -86,7 +88,7 @@ public class ClusterRebalanceRoutingTests extends ESAllocationTestCase {
 
         for (int i = 0; i < clusterState.routingTable().index("test1").size(); i++) {
             assertThat(clusterState.routingTable().index("test1").shard(i).size(), equalTo(2));
-            // assertThat(clusterState.routingTable().index("test1").shard(i).primaryShard().state(), equalTo(STARTED));
+            assertThat(clusterState.routingTable().index("test1").shard(i).primaryShard().state(), equalTo(STARTED));
             assertThat(clusterState.routingTable().index("test1").shard(i).replicaShards().get(0).state(), equalTo(INITIALIZING));
         }
 
@@ -111,14 +113,11 @@ public class ClusterRebalanceRoutingTests extends ESAllocationTestCase {
             assertThat(clusterState.routingTable().index("test2").shard(i).replicaShards().get(0).state(), equalTo(UNASSIGNED));
         }
 
-        logger.info("now, start 2 more nodes, check that rebalancing will happen (for test1) because we set it to always");
-        clusterState = ClusterState.builder(clusterState)
-            .nodes(DiscoveryNodes.builder(clusterState.nodes()).add(newNode("node3")).add(newNode("node4")))
-            .build();
+        logger.info("now, start 1 more nodes, check that rebalancing will happen (for test1) because we set it to always");
+        clusterState = ClusterState.builder(clusterState).nodes(DiscoveryNodes.builder(clusterState.nodes()).add(newNode("node3"))).build();
         clusterState = strategy.reroute(clusterState, "reroute", ActionListener.noop());
-        RoutingNodes routingNodes = clusterState.getRoutingNodes();
 
-        final var newNodesIterator = Iterators.concat(routingNodes.node("node3").iterator(), routingNodes.node("node4").iterator());
+        final var newNodesIterator = clusterState.getRoutingNodes().node("node3").iterator();
         assertThat(newNodesIterator.next().shardId().getIndex().getName(), equalTo("test1"));
         assertFalse(newNodesIterator.hasNext());
     }

+ 5 - 8
server/src/test/java/org/elasticsearch/cluster/routing/allocation/RebalanceAfterActiveTests.java

@@ -33,8 +33,8 @@ import static org.elasticsearch.cluster.routing.ShardRoutingState.RELOCATING;
 import static org.elasticsearch.cluster.routing.ShardRoutingState.STARTED;
 import static org.elasticsearch.cluster.routing.ShardRoutingState.UNASSIGNED;
 import static org.hamcrest.Matchers.equalTo;
+import static org.hamcrest.Matchers.hasSize;
 import static org.hamcrest.Matchers.nullValue;
-import static org.hamcrest.Matchers.oneOf;
 
 public class RebalanceAfterActiveTests extends ESAllocationTestCase {
     private final Logger logger = LogManager.getLogger(RebalanceAfterActiveTests.class);
@@ -63,7 +63,7 @@ public class RebalanceAfterActiveTests extends ESAllocationTestCase {
         );
         logger.info("Building initial routing table");
 
-        var indexMetadata = IndexMetadata.builder("test").settings(settings(Version.CURRENT)).numberOfShards(5).numberOfReplicas(1).build();
+        var indexMetadata = IndexMetadata.builder("test").settings(indexSettings(Version.CURRENT, 5, 1)).build();
 
         ClusterState clusterState = ClusterState.builder(ClusterName.DEFAULT)
             .metadata(Metadata.builder().put(indexMetadata, false))
@@ -128,11 +128,8 @@ public class RebalanceAfterActiveTests extends ESAllocationTestCase {
         logger.info("start the replica shards, rebalancing should start");
         clusterState = startInitializingShardsAndReroute(strategy, clusterState);
 
-        // both primary and replica should not be rebalanced at once so 5 replicas should start moving
-        // unless we computed the balance where one of the indices already have both primary and replica on desired nodes
-        // in such case only 4 shards are immediately relocating
-        assertThat(shardsWithState(clusterState.getRoutingNodes(), STARTED).size(), oneOf(5, 6));
-        assertThat(shardsWithState(clusterState.getRoutingNodes(), RELOCATING).size(), oneOf(4, 5));
+        assertThat(shardsWithState(clusterState.getRoutingNodes(), STARTED), hasSize(2));
+        assertThat(shardsWithState(clusterState.getRoutingNodes(), RELOCATING), hasSize(8));
 
         logger.info("complete all relocations");
         clusterState = applyStartedShardsUntilNoChange(clusterState, strategy);
@@ -141,7 +138,7 @@ public class RebalanceAfterActiveTests extends ESAllocationTestCase {
         clusterState = startInitializingShardsAndReroute(strategy, clusterState);
         RoutingNodes routingNodes = clusterState.getRoutingNodes();
 
-        assertThat(shardsWithState(clusterState.getRoutingNodes(), STARTED).size(), equalTo(10));
+        assertThat(shardsWithState(clusterState.getRoutingNodes(), STARTED), hasSize(10));
         // make sure we have an even relocation
         for (RoutingNode routingNode : routingNodes) {
             assertThat(routingNode.size(), equalTo(1));

+ 118 - 0
server/src/test/java/org/elasticsearch/cluster/routing/allocation/decider/RebalanceOnlyWhenActiveAllocationDeciderTests.java

@@ -0,0 +1,118 @@
+/*
+ * Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
+ * or more contributor license agreements. Licensed under the Elastic License
+ * 2.0 and the Server Side Public License, v 1; you may not use this file except
+ * in compliance with, at your election, the Elastic License 2.0 or the Server
+ * Side Public License, v 1.
+ */
+
+package org.elasticsearch.cluster.routing.allocation.decider;
+
+import org.elasticsearch.Version;
+import org.elasticsearch.cluster.ClusterName;
+import org.elasticsearch.cluster.ClusterState;
+import org.elasticsearch.cluster.ESAllocationTestCase;
+import org.elasticsearch.cluster.metadata.IndexMetadata;
+import org.elasticsearch.cluster.metadata.Metadata;
+import org.elasticsearch.cluster.node.DiscoveryNodes;
+import org.elasticsearch.cluster.routing.IndexRoutingTable;
+import org.elasticsearch.cluster.routing.RoutingTable;
+import org.elasticsearch.cluster.routing.allocation.RoutingAllocation;
+import org.elasticsearch.index.Index;
+import org.elasticsearch.index.shard.ShardId;
+
+import java.util.List;
+
+import static org.elasticsearch.cluster.routing.ShardRoutingState.INITIALIZING;
+import static org.elasticsearch.cluster.routing.ShardRoutingState.RELOCATING;
+import static org.elasticsearch.cluster.routing.ShardRoutingState.STARTED;
+import static org.elasticsearch.cluster.routing.ShardRoutingState.UNASSIGNED;
+import static org.elasticsearch.cluster.routing.TestShardRouting.newShardRouting;
+import static org.elasticsearch.cluster.routing.allocation.decider.RebalanceOnlyWhenActiveAllocationDecider.NO_SOME_REPLICAS_INACTIVE;
+import static org.elasticsearch.cluster.routing.allocation.decider.RebalanceOnlyWhenActiveAllocationDecider.YES_ALL_REPLICAS_ACTIVE;
+import static org.hamcrest.Matchers.equalTo;
+
+public class RebalanceOnlyWhenActiveAllocationDeciderTests extends ESAllocationTestCase {
+
+    public void testAllowRebalanceWhenAllShardsActive() {
+
+        var index = new Index("test", "_na_");
+        var primary = newShardRouting(new ShardId(index, 0), "node-1", true, STARTED);
+        var replica = newShardRouting(new ShardId(index, 0), "node-2", false, STARTED);
+
+        var state = ClusterState.builder(ClusterName.DEFAULT)
+            .metadata(Metadata.builder().put(IndexMetadata.builder(index.getName()).settings(indexSettings(Version.CURRENT, 1, 1))))
+            .nodes(DiscoveryNodes.builder().add(newNode("node-1")).add(newNode("node-2")).add(newNode("node-3")))
+            .routingTable(RoutingTable.builder().add(IndexRoutingTable.builder(index).addShard(primary).addShard(replica)))
+            .build();
+
+        var allocation = createRoutingAllocation(state);
+        var decider = new RebalanceOnlyWhenActiveAllocationDecider();
+
+        assertThat(decider.canRebalance(primary, allocation), equalTo(YES_ALL_REPLICAS_ACTIVE));
+        assertThat(decider.canRebalance(replica, allocation), equalTo(YES_ALL_REPLICAS_ACTIVE));
+    }
+
+    public void testDoNotAllowRebalanceWhenSomeShardsAreNotActive() {
+
+        var index = new Index("test", "_na_");
+        var primary = newShardRouting(new ShardId(index, 0), "node-1", true, STARTED);
+        var replica = randomBoolean()
+            ? newShardRouting(new ShardId(index, 0), null, false, UNASSIGNED)
+            : newShardRouting(new ShardId(index, 0), "node-2", false, INITIALIZING);
+
+        var state = ClusterState.builder(ClusterName.DEFAULT)
+            .metadata(Metadata.builder().put(IndexMetadata.builder(index.getName()).settings(indexSettings(Version.CURRENT, 1, 1))))
+            .nodes(DiscoveryNodes.builder().add(newNode("node-1")).add(newNode("node-2")).add(newNode("node-3")))
+            .routingTable(RoutingTable.builder().add(IndexRoutingTable.builder(index).addShard(primary).addShard(replica)))
+            .build();
+
+        var allocation = createRoutingAllocation(state);
+        var decider = new RebalanceOnlyWhenActiveAllocationDecider();
+
+        assertThat(decider.canRebalance(primary, allocation), equalTo(NO_SOME_REPLICAS_INACTIVE));
+    }
+
+    public void testDoNotAllowRebalanceWhenSomeShardsAreNotActiveAndRebalancing() {
+
+        var index = new Index("test", "_na_");
+        var primary = newShardRouting(new ShardId(index, 0), "node-1", true, STARTED);
+        var replica1 = newShardRouting(new ShardId(index, 0), "node-2", "node-3", false, RELOCATING);
+        var replica2 = newShardRouting(new ShardId(index, 0), null, false, UNASSIGNED);
+
+        var state = ClusterState.builder(ClusterName.DEFAULT)
+            .metadata(Metadata.builder().put(IndexMetadata.builder(index.getName()).settings(indexSettings(Version.CURRENT, 1, 2))))
+            .nodes(DiscoveryNodes.builder().add(newNode("node-1")).add(newNode("node-2")).add(newNode("node-3")))
+            .routingTable(
+                RoutingTable.builder().add(IndexRoutingTable.builder(index).addShard(primary).addShard(replica1).addShard(replica2))
+            )
+            .build();
+
+        var allocation = createRoutingAllocation(state);
+        var decider = new RebalanceOnlyWhenActiveAllocationDecider();
+
+        assertThat(decider.canRebalance(primary, allocation), equalTo(NO_SOME_REPLICAS_INACTIVE));
+    }
+
+    public void testAllowConcurrentRebalance() {
+
+        var index = new Index("test", "_na_");
+        var primary = newShardRouting(new ShardId(index, 0), "node-1", true, STARTED);
+        var replica = newShardRouting(new ShardId(index, 0), "node-2", "node-3", false, RELOCATING);
+
+        var state = ClusterState.builder(ClusterName.DEFAULT)
+            .metadata(Metadata.builder().put(IndexMetadata.builder(index.getName()).settings(indexSettings(Version.CURRENT, 1, 1))))
+            .nodes(DiscoveryNodes.builder().add(newNode("node-1")).add(newNode("node-2")).add(newNode("node-3")))
+            .routingTable(RoutingTable.builder().add(IndexRoutingTable.builder(index).addShard(primary).addShard(replica)))
+            .build();
+
+        var allocation = createRoutingAllocation(state);
+        var decider = new RebalanceOnlyWhenActiveAllocationDecider();
+
+        assertThat(decider.canRebalance(primary, allocation), equalTo(YES_ALL_REPLICAS_ACTIVE));
+    }
+
+    private static RoutingAllocation createRoutingAllocation(ClusterState state) {
+        return new RoutingAllocation(new AllocationDeciders(List.of()), state, null, null, 0L);
+    }
+}