1
0
Эх сурвалжийг харах

Weaken node-replacement decider during reconciliation (#95070)

The node-replacement allocation decider requires shard movements to
follow a specific route, from source to replacement target. However
during the shutdown there may be other changes in the system that make
the replacement target unsuitable for the final destination of the
shard. Having simulated the move of the shard onto the replacement
target we are free to simulate its movement elsewhere, but today this
causes the reconciler to get stuck: it cannot move the shard to its
desired location because of the ongoing replacement, and it will not
move the shard onto the replacement target because that's not its
desired location.

This commit weakens this decider during reconciliation, which allows
the reconciler to skip the intermediate target node and move the shard
straight to its desired location.
David Turner 2 жил өмнө
parent
commit
6d006f4b47

+ 5 - 0
docs/changelog/95070.yaml

@@ -0,0 +1,5 @@
+pr: 95070
+summary: Weaken node-replacement decider during reconciliation
+area: Allocation
+type: bug
+issues: []

+ 19 - 0
server/src/main/java/org/elasticsearch/cluster/routing/allocation/RoutingAllocation.java

@@ -24,6 +24,7 @@ import org.elasticsearch.cluster.routing.ShardRouting;
 import org.elasticsearch.cluster.routing.allocation.decider.AllocationDeciders;
 import org.elasticsearch.cluster.routing.allocation.decider.Decision;
 import org.elasticsearch.core.Nullable;
+import org.elasticsearch.core.Releasable;
 import org.elasticsearch.index.shard.ShardId;
 import org.elasticsearch.snapshots.RestoreService.RestoreInProgressUpdater;
 import org.elasticsearch.snapshots.SnapshotShardSizeInfo;
@@ -64,6 +65,7 @@ public class RoutingAllocation {
 
     private final long currentNanoTime;
     private final boolean isSimulating;
+    private boolean isReconciling;
 
     private final IndexMetadataUpdater indexMetadataUpdater = new IndexMetadataUpdater();
     private final RoutingNodesChangedObserver nodesChangedObserver = new RoutingNodesChangedObserver();
@@ -401,6 +403,23 @@ public class RoutingAllocation {
         return isSimulating;
     }
 
+    /**
+     * @return {@code true} if this allocation computation is trying to reconcile towards a previously-computed allocation and therefore
+     *                      path-dependent allocation blockers should be ignored.
+     */
+    public boolean isReconciling() {
+        return isReconciling;
+    }
+
+    /**
+     * Set the {@link #isReconciling} flag, and return a {@link Releasable} which clears it again.
+     */
+    public Releasable withReconcilingFlag() {
+        assert isReconciling == false : "already reconciling";
+        isReconciling = true;
+        return () -> isReconciling = false;
+    }
+
     public void setSimulatedClusterInfo(ClusterInfo clusterInfo) {
         assert isSimulating : "Should be called only while simulating";
         this.clusterInfo = clusterInfo;

+ 26 - 24
server/src/main/java/org/elasticsearch/cluster/routing/allocation/allocator/DesiredBalanceReconciler.java

@@ -60,37 +60,39 @@ public class DesiredBalanceReconciler {
     }
 
     void run() {
+        try (var ignored = allocation.withReconcilingFlag()) {
 
-        logger.debug("Reconciling desired balance for [{}]", desiredBalance.lastConvergedIndex());
+            logger.debug("Reconciling desired balance for [{}]", desiredBalance.lastConvergedIndex());
 
-        if (routingNodes.size() == 0) {
-            // no data nodes, so fail allocation to report red health
-            failAllocationOfNewPrimaries(allocation);
-            logger.trace("no nodes available, nothing to reconcile");
-            return;
-        }
+            if (routingNodes.size() == 0) {
+                // no data nodes, so fail allocation to report red health
+                failAllocationOfNewPrimaries(allocation);
+                logger.trace("no nodes available, nothing to reconcile");
+                return;
+            }
 
-        if (desiredBalance.assignments().isEmpty()) {
-            // no desired state yet but it is on its way and we'll reroute again when it is ready
-            logger.trace("desired balance is empty, nothing to reconcile");
-            return;
-        }
+            if (desiredBalance.assignments().isEmpty()) {
+                // no desired state yet but it is on its way and we'll reroute again when it is ready
+                logger.trace("desired balance is empty, nothing to reconcile");
+                return;
+            }
 
-        // compute next moves towards current desired balance:
+            // compute next moves towards current desired balance:
 
-        // 1. allocate unassigned shards first
-        logger.trace("Reconciler#allocateUnassigned");
-        allocateUnassigned();
-        assert allocateUnassignedInvariant();
+            // 1. allocate unassigned shards first
+            logger.trace("Reconciler#allocateUnassigned");
+            allocateUnassigned();
+            assert allocateUnassignedInvariant();
 
-        // 2. move any shards that cannot remain where they are
-        logger.trace("Reconciler#moveShards");
-        moveShards();
-        // 3. move any other shards that are desired elsewhere
-        logger.trace("Reconciler#balance");
-        balance();
+            // 2. move any shards that cannot remain where they are
+            logger.trace("Reconciler#moveShards");
+            moveShards();
+            // 3. move any other shards that are desired elsewhere
+            logger.trace("Reconciler#balance");
+            balance();
 
-        logger.debug("Reconciliation is complete");
+            logger.debug("Reconciliation is complete");
+        }
     }
 
     private boolean allocateUnassignedInvariant() {

+ 25 - 12
server/src/main/java/org/elasticsearch/cluster/routing/allocation/decider/NodeReplacementAllocationDecider.java

@@ -21,16 +21,20 @@ public class NodeReplacementAllocationDecider extends AllocationDecider {
 
     public static final String NAME = "node_replacement";
 
-    static final Decision NO_REPLACEMENTS = Decision.single(
+    static final Decision YES__RECONCILING = Decision.single(Decision.Type.YES, NAME, "this decider is ignored during reconciliation");
+
+    static final Decision YES__NO_REPLACEMENTS = Decision.single(Decision.Type.YES, NAME, "there are no ongoing node replacements");
+
+    static final Decision YES__NO_APPLICABLE_REPLACEMENTS = Decision.single(
         Decision.Type.YES,
         NAME,
-        "neither the source nor target node are part of an ongoing node replacement (no replacements)"
+        "none of the ongoing node replacements relate to the allocation of this shard"
     );
 
     @Override
     public Decision canAllocate(ShardRouting shardRouting, RoutingNode node, RoutingAllocation allocation) {
         if (replacementOngoing(allocation) == false) {
-            return NO_REPLACEMENTS;
+            return YES__NO_REPLACEMENTS;
         } else if (replacementFromSourceToTarget(allocation, shardRouting.currentNodeId(), node.node().getName())) {
             return Decision.single(
                 Decision.Type.YES,
@@ -40,6 +44,12 @@ public class NodeReplacementAllocationDecider extends AllocationDecider {
                 node.nodeId()
             );
         } else if (isReplacementSource(allocation, shardRouting.currentNodeId())) {
+            if (allocation.isReconciling()) {
+                // We permit moving shards off the source node during reconciliation so that they can go onto their desired nodes even if
+                // the desired node is different from the replacement target.
+                return YES__RECONCILING;
+            }
+
             return Decision.single(
                 Decision.Type.NO,
                 NAME,
@@ -57,6 +67,13 @@ public class NodeReplacementAllocationDecider extends AllocationDecider {
                 shardRouting.currentNodeId()
             );
         } else if (isReplacementTargetName(allocation, node.node().getName())) {
+            if (allocation.isReconciling() && shardRouting.unassigned() == false) {
+                // We permit moving _existing_ shards onto the target during reconciliation so that they stay out of the way of other shards
+                // moving off the source node. But we don't allow any unassigned shards to be assigned to the target since this could
+                // prevent the node from being vacated.
+                return YES__RECONCILING;
+            }
+
             final SingleNodeShutdownMetadata shutdown = allocation.replacementTargetShutdowns().get(node.node().getName());
             return Decision.single(
                 Decision.Type.NO,
@@ -68,14 +85,14 @@ public class NodeReplacementAllocationDecider extends AllocationDecider {
                 shardRouting.currentNodeId()
             );
         } else {
-            return Decision.single(Decision.Type.YES, NAME, "neither the source nor target node are part of an ongoing node replacement");
+            return YES__NO_APPLICABLE_REPLACEMENTS;
         }
     }
 
     @Override
     public Decision canRemain(IndexMetadata indexMetadata, ShardRouting shardRouting, RoutingNode node, RoutingAllocation allocation) {
         if (replacementOngoing(allocation) == false) {
-            return NO_REPLACEMENTS;
+            return YES__NO_REPLACEMENTS;
         } else if (isReplacementSource(allocation, node.nodeId())) {
             return Decision.single(
                 Decision.Type.NO,
@@ -85,14 +102,14 @@ public class NodeReplacementAllocationDecider extends AllocationDecider {
                 getReplacementName(allocation, node.nodeId())
             );
         } else {
-            return Decision.single(Decision.Type.YES, NAME, "node [%s] is not being replaced", node.nodeId());
+            return YES__NO_APPLICABLE_REPLACEMENTS;
         }
     }
 
     @Override
     public Decision shouldAutoExpandToNode(IndexMetadata indexMetadata, DiscoveryNode node, RoutingAllocation allocation) {
         if (replacementOngoing(allocation) == false) {
-            return NO_REPLACEMENTS;
+            return YES__NO_REPLACEMENTS;
         } else if (isReplacementTargetName(allocation, node.getName())) {
             final SingleNodeShutdownMetadata shutdown = allocation.replacementTargetShutdowns().get(node.getName());
             return Decision.single(
@@ -112,11 +129,7 @@ public class NodeReplacementAllocationDecider extends AllocationDecider {
                 getReplacementName(allocation, node.getId())
             );
         } else {
-            return Decision.single(
-                Decision.Type.YES,
-                NAME,
-                "node is not part of a node replacement, so shards may be auto expanded onto it"
-            );
+            return YES__NO_APPLICABLE_REPLACEMENTS;
         }
     }
 

+ 6 - 7
server/src/test/java/org/elasticsearch/cluster/routing/allocation/decider/NodeReplacementAllocationDeciderTests.java

@@ -40,7 +40,6 @@ import java.util.Collections;
 import java.util.HashMap;
 
 import static org.elasticsearch.common.settings.ClusterSettings.createBuiltInClusterSettings;
-import static org.hamcrest.Matchers.containsString;
 import static org.hamcrest.Matchers.equalTo;
 
 public class NodeReplacementAllocationDeciderTests extends ESAllocationTestCase {
@@ -55,7 +54,7 @@ public class NodeReplacementAllocationDeciderTests extends ESAllocationTestCase
         ShardRouting.Role.DEFAULT
     );
     private final ClusterSettings clusterSettings = createBuiltInClusterSettings();
-    private NodeReplacementAllocationDecider decider = new NodeReplacementAllocationDecider();
+    private final NodeReplacementAllocationDecider decider = new NodeReplacementAllocationDecider();
     private final AllocationDeciders allocationDeciders = new AllocationDeciders(
         Arrays.asList(
             decider,
@@ -98,11 +97,11 @@ public class NodeReplacementAllocationDeciderTests extends ESAllocationTestCase
 
         Decision decision = decider.canAllocate(shard, routingNode, allocation);
         assertThat(decision.type(), equalTo(Decision.Type.YES));
-        assertThat(decision.getExplanation(), equalTo(NodeReplacementAllocationDecider.NO_REPLACEMENTS.getExplanation()));
+        assertThat(decision.getExplanation(), equalTo(NodeReplacementAllocationDecider.YES__NO_REPLACEMENTS.getExplanation()));
 
         decision = decider.canRemain(null, shard, routingNode, allocation);
         assertThat(decision.type(), equalTo(Decision.Type.YES));
-        assertThat(decision.getExplanation(), equalTo(NodeReplacementAllocationDecider.NO_REPLACEMENTS.getExplanation()));
+        assertThat(decision.getExplanation(), equalTo(NodeReplacementAllocationDecider.YES__NO_REPLACEMENTS.getExplanation()));
     }
 
     public void testCanForceAllocate() {
@@ -174,13 +173,13 @@ public class NodeReplacementAllocationDeciderTests extends ESAllocationTestCase
 
         decision = decider.canRemain(indexMetadata, shard, routingNode, allocation);
         assertThat(decision.type(), equalTo(Decision.Type.YES));
-        assertThat(decision.getExplanation(), equalTo("node [" + NODE_B.getId() + "] is not being replaced"));
+        assertEquals(NodeReplacementAllocationDecider.YES__NO_APPLICABLE_REPLACEMENTS, decision);
 
         routingNode = RoutingNodesHelper.routingNode(NODE_C.getId(), NODE_C, shard);
 
         decision = decider.canRemain(indexMetadata, shard, routingNode, allocation);
         assertThat(decision.type(), equalTo(Decision.Type.YES));
-        assertThat(decision.getExplanation(), equalTo("node [" + NODE_C.getId() + "] is not being replaced"));
+        assertEquals(NodeReplacementAllocationDecider.YES__NO_APPLICABLE_REPLACEMENTS, decision);
     }
 
     public void testCanAllocateToNeitherSourceNorTarget() {
@@ -225,7 +224,7 @@ public class NodeReplacementAllocationDeciderTests extends ESAllocationTestCase
 
         decision = decider.canAllocate(testShard, routingNode, allocation);
         assertThat(decision.getExplanation(), decision.type(), equalTo(Decision.Type.YES));
-        assertThat(decision.getExplanation(), containsString("neither the source nor target node are part of an ongoing node replacement"));
+        assertEquals(NodeReplacementAllocationDecider.YES__NO_APPLICABLE_REPLACEMENTS, decision);
     }
 
     private ClusterState prepareState(ClusterState initialState, String sourceNodeId, String targetNodeName) {

+ 2 - 3
x-pack/plugin/shutdown/src/internalClusterTest/java/org/elasticsearch/xpack/shutdown/DesiredBalanceShutdownIT.java

@@ -41,7 +41,7 @@ public class DesiredBalanceShutdownIT extends ESIntegTestCase {
         createIndex(
             INDEX,
             Settings.builder()
-                .put(IndexMetadata.SETTING_NUMBER_OF_SHARDS, 1)
+                .put(IndexMetadata.SETTING_NUMBER_OF_SHARDS, between(1, 5))
                 .put(IndexMetadata.SETTING_NUMBER_OF_REPLICAS, 0)
                 .put(IndexMetadata.INDEX_ROUTING_REQUIRE_GROUP_PREFIX + "._name", oldNodeName)
                 .build()
@@ -89,7 +89,6 @@ public class DesiredBalanceShutdownIT extends ESIntegTestCase {
                     .stream()
                     .allMatch(s -> s.overallStatus() == SingleNodeShutdownMetadata.Status.COMPLETE)
             );
-        });
+        }, 120, TimeUnit.SECONDS);
     }
-
 }