
Allow re-allocation of replica shards on nodes during shutdown replacement (#79171)

This commit allows replica shards that have existing data on disk to be re-allocated to the target
of a "REPLACE" type node shutdown. Prior to this if the target node of a shutdown were to restart,
the replicas would not be allowed to be allocated even if their data existed on disk.

Relates to #70338 as a follow-up to #76247
Lee Hinman
commit dbb1e49e56

+ 15 - 0
server/src/main/java/org/elasticsearch/cluster/routing/allocation/decider/AllocationDecider.java

@@ -122,4 +122,19 @@ public abstract class AllocationDecider {
     public Decision canForceAllocateDuringReplace(ShardRouting shardRouting, RoutingNode node, RoutingAllocation allocation) {
         return Decision.YES;
     }
+
+    /**
+     * Returns a {@link Decision} indicating whether the given replica shard can be
+     * allocated to the given node when a retention lease already exists on the
+     * node (meaning the shard has previously been allocated there).
+     *
+     * This method does not itself check whether a retention lease exists;
+     * that is the responsibility of the caller.
+     *
+     * It defaults to the same value as {@code canAllocate}.
+     */
+    public Decision canAllocateReplicaWhenThereIsRetentionLease(ShardRouting shardRouting, RoutingNode node,
+                                                                RoutingAllocation allocation) {
+        return canAllocate(shardRouting, node, allocation);
+    }
 }
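
The new method is an extension point: any decider extending AllocationDecider can override it to treat "this node already holds a copy" differently from a fresh allocation. A minimal sketch of such an override (hypothetical, not part of this commit; the class name is invented):

import org.elasticsearch.cluster.routing.RoutingNode;
import org.elasticsearch.cluster.routing.ShardRouting;
import org.elasticsearch.cluster.routing.allocation.RoutingAllocation;
import org.elasticsearch.cluster.routing.allocation.decider.AllocationDecider;
import org.elasticsearch.cluster.routing.allocation.decider.Decision;

public class ReuseExistingCopyDecider extends AllocationDecider {
    @Override
    public Decision canAllocateReplicaWhenThereIsRetentionLease(ShardRouting shardRouting, RoutingNode node,
                                                                RoutingAllocation allocation) {
        // The caller has already established that a retention lease exists for
        // this node, i.e. data for the replica is already on disk there.
        return Decision.single(Decision.Type.YES, "reuse_existing_copy",
            "node [%s] already holds on-disk data for this replica", node.nodeId());
    }
}

AllocationDeciders (next hunk) then aggregates this method across all registered deciders.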

+ 22 - 0
server/src/main/java/org/elasticsearch/cluster/routing/allocation/decider/AllocationDeciders.java

@@ -231,6 +231,28 @@ public class AllocationDeciders extends AllocationDecider {
         return ret;
     }
 
+    @Override
+    public Decision canAllocateReplicaWhenThereIsRetentionLease(ShardRouting shardRouting, RoutingNode node, RoutingAllocation allocation) {
+        if (allocation.shouldIgnoreShardForNode(shardRouting.shardId(), node.nodeId())) {
+            return Decision.NO;
+        }
+        Decision.Multi ret = new Decision.Multi();
+        for (AllocationDecider allocationDecider : allocations) {
+            Decision decision = allocationDecider.canAllocateReplicaWhenThereIsRetentionLease(shardRouting, node, allocation);
+            // short-circuit if a NO is returned.
+            if (decision.type() == Decision.Type.NO) {
+                if (allocation.debugDecision() == false) {
+                    return Decision.NO;
+                } else {
+                    ret.add(decision);
+                }
+            } else {
+                addDecision(ret, decision, allocation);
+            }
+        }
+        return ret;
+    }
+
     private void addDecision(Decision.Multi ret, Decision decision, RoutingAllocation allocation) {
         // We never add ALWAYS decisions and only add YES decisions when requested by debug mode (since Multi default is YES).
         if (decision != Decision.ALWAYS
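
As with the other aggregations in AllocationDeciders, the loop short-circuits on the first NO unless debug is enabled, and otherwise folds the per-decider results into a Decision.Multi, whose combined type is NO if any decider says NO, THROTTLE if any throttles, and YES otherwise. A small self-contained illustration of that combining behavior (assumed semantics, not part of the commit; run with -ea for the assert):

import org.elasticsearch.cluster.routing.allocation.decider.Decision;

class MultiDecisionDemo {
    public static void main(String[] args) {
        Decision.Multi combined = new Decision.Multi();
        combined.add(Decision.YES);
        combined.add(Decision.THROTTLE);
        // THROTTLE outranks YES; a single NO would dominate both.
        assert combined.type() == Decision.Type.THROTTLE;
        System.out.println(combined.type());
    }
}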

+ 11 - 0
server/src/main/java/org/elasticsearch/cluster/routing/allocation/decider/NodeReplacementAllocationDecider.java

@@ -97,6 +97,17 @@ public class NodeReplacementAllocationDecider extends AllocationDecider {
         }
     }
 
+    @Override
+    public Decision canAllocateReplicaWhenThereIsRetentionLease(ShardRouting shardRouting, RoutingNode node, RoutingAllocation allocation) {
+        if (isReplacementTargetName(allocation, node.node().getName())) {
+            return Decision.single(Decision.Type.YES, NAME,
+                "node [%s] is a node replacement target and can have a previously allocated replica re-allocated to it",
+                node.nodeId());
+        } else {
+            return canAllocate(shardRouting, node, allocation);
+        }
+    }
+
     /**
      * Returns true if there are any node replacements ongoing in the cluster
      */
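
The override hinges on isReplacementTargetName, which this hunk does not show. A plausible reading (an assumption, not the committed implementation; it presumes Metadata#nodeShutdowns() exposes the shutdown entries):

import org.elasticsearch.cluster.metadata.SingleNodeShutdownMetadata;
import org.elasticsearch.cluster.routing.allocation.RoutingAllocation;

class ReplacementTargetSketch {
    // True if any REPLACE-type shutdown names this node as its target.
    static boolean isReplacementTargetName(RoutingAllocation allocation, String nodeName) {
        return allocation.metadata().nodeShutdowns().values().stream()
            .filter(s -> s.getType() == SingleNodeShutdownMetadata.Type.REPLACE)
            .anyMatch(s -> nodeName.equals(s.getTargetNodeName()));
    }
}

Returning an explicit YES here, rather than deferring to canAllocate, is what lets a replica with on-disk data land on the replacement target even while the replacement is still considered ongoing.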

+ 16 - 6
server/src/main/java/org/elasticsearch/gateway/ReplicaShardAllocator.java

@@ -188,7 +188,8 @@ public abstract class ReplicaShardAllocator extends BaseGatewayShardAllocator {
         } else if (matchingNodes.getNodeWithHighestMatch() != null) {
             RoutingNode nodeWithHighestMatch = allocation.routingNodes().node(matchingNodes.getNodeWithHighestMatch().getId());
             // we only check on THROTTLE since we checked before on NO
-            Decision decision = allocation.deciders().canAllocate(unassignedShard, nodeWithHighestMatch, allocation);
+            Decision decision = allocation.deciders().canAllocateReplicaWhenThereIsRetentionLease(unassignedShard,
+                nodeWithHighestMatch, allocation);
             if (decision.type() == Decision.Type.THROTTLE) {
                 logger.debug("[{}][{}]: throttling allocation [{}] to [{}] in order to reuse its unallocated persistent store",
                     unassignedShard.index(), unassignedShard.id(), unassignedShard, nodeWithHighestMatch.node());
@@ -245,7 +246,7 @@ public abstract class ReplicaShardAllocator extends BaseGatewayShardAllocator {
             }
             // if we can't allocate it on a node, ignore it, for example, this handles
             // cases for only allocating a replica after a primary
-            Decision decision = allocation.deciders().canAllocate(shard, node, allocation);
+            Decision decision = allocation.deciders().canAllocateReplicaWhenThereIsRetentionLease(shard, node, allocation);
             if (decision.type() == Decision.Type.YES && madeDecision.type() != Decision.Type.YES) {
                 if (explain) {
                     madeDecision = decision;
@@ -317,10 +318,17 @@ public abstract class ReplicaShardAllocator extends BaseGatewayShardAllocator {
                 continue;
             }
 
-            // check if we can allocate on that node...
-            // we only check for NO, since if this node is THROTTLING and it has enough "same data"
-            // then we will try and assign it next time
-            Decision decision = allocation.deciders().canAllocate(shard, node, allocation);
+            // Check whether we have existing data for the replica
+            final long retainingSeqNoForReplica = primaryStore.getPeerRecoveryRetentionLeaseRetainingSeqNo(discoNode);
+            final Decision decision;
+            if (retainingSeqNoForReplica == -1) {
+                // There is no existing replica data on the node
+                decision = allocation.deciders().canAllocate(shard, node, allocation);
+            } else {
+                // There is existing replica data on the node
+                decision = allocation.deciders().canAllocateReplicaWhenThereIsRetentionLease(shard, node, allocation);
+            }
+
             MatchingNode matchingNode = null;
             if (explain) {
                 matchingNode = computeMatchingNode(primaryNode, primaryStore, discoNode, storeFilesMetadata);
@@ -328,6 +336,8 @@ public abstract class ReplicaShardAllocator extends BaseGatewayShardAllocator {
                 nodeDecisions.put(node.nodeId(), new NodeAllocationResult(discoNode, shardStoreInfo, decision));
             }
 
+            // we only check for NO, since if this node is THROTTLING and it has enough "same data"
+            // then we will try and assign it next time
             if (decision.type() == Decision.Type.NO) {
                 continue;
             }
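
The branch above is the core of the change: getPeerRecoveryRetentionLeaseRetainingSeqNo returning -1 means the primary holds no peer-recovery retention lease for this node, so no prior copy exists and the ordinary canAllocate path applies; any other value means replica data is already on disk and the new, more permissive hook is consulted. Restated as a standalone helper (a sketch mirroring the diff, not code from the commit):

import org.elasticsearch.cluster.routing.RoutingNode;
import org.elasticsearch.cluster.routing.ShardRouting;
import org.elasticsearch.cluster.routing.allocation.RoutingAllocation;
import org.elasticsearch.cluster.routing.allocation.decider.Decision;

class ReplicaDecisionSketch {
    // Choose the decider entry point based on whether the primary retains
    // history for this node; a retaining seq no of -1 means "no lease".
    static Decision decideCanAllocate(RoutingAllocation allocation, ShardRouting shard,
                                      RoutingNode node, long retainingSeqNoForReplica) {
        if (retainingSeqNoForReplica == -1) {
            return allocation.deciders().canAllocate(shard, node, allocation);
        }
        return allocation.deciders().canAllocateReplicaWhenThereIsRetentionLease(shard, node, allocation);
    }
}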

+ 42 - 0
x-pack/plugin/shutdown/src/internalClusterTest/java/org/elasticsearch/xpack/shutdown/NodeShutdownShardsIT.java

@@ -439,6 +439,48 @@ public class NodeShutdownShardsIT extends ESIntegTestCase {
         });
     }
 
+    public void testReallocationForReplicaDuringNodeReplace() throws Exception {
+        final String nodeA = internalCluster().startNode();
+        final String nodeAId = getNodeId(nodeA);
+        createIndex("myindex", Settings.builder().put("index.number_of_shards", 1).put("index.number_of_replicas", 1).build());
+        ensureYellow("myindex");
+
+        // Start a second node, so the replica will be on nodeB
+        final String nodeB = internalCluster().startNode();
+        ensureGreen("myindex");
+
+        final String nodeC = internalCluster().startNode();
+
+        // Register a replace for nodeA, with nodeC as the target
+        PutShutdownNodeAction.Request shutdownRequest = new PutShutdownNodeAction.Request(
+            nodeAId,
+            SingleNodeShutdownMetadata.Type.REPLACE,
+            "testing",
+            null,
+            nodeC
+        );
+        client().execute(PutShutdownNodeAction.INSTANCE, shutdownRequest).get();
+
+        // Wait for the node replace shutdown to be complete
+        assertBusy(() -> {
+            GetShutdownStatusAction.Response shutdownStatus = client().execute(
+                GetShutdownStatusAction.INSTANCE,
+                new GetShutdownStatusAction.Request(nodeAId)
+            ).get();
+            assertThat(shutdownStatus.getShutdownStatuses().get(0).migrationStatus().getStatus(), equalTo(COMPLETE));
+        });
+
+        // Remove nodeA from the cluster (it's been terminated)
+        internalCluster().stopNode(nodeA);
+
+        // Restart nodeC: the replica on nodeB will be promoted to primary, and
+        // when nodeC comes back up, the replica should be assigned to it
+        internalCluster().restartNode(nodeC);
+
+        // All shards for the index should be allocated
+        ensureGreen("myindex");
+    }
+
     private void indexRandomData() throws Exception {
         int numDocs = scaledRandomIntBetween(100, 1000);
         IndexRequestBuilder[] builders = new IndexRequestBuilder[numDocs];