
Check for unassigned shards on node shutdown (#91297)

Co-authored-by: Ievgen Degtiarenko <ievgen.degtiarenko@gmail.com>
Nikola Grcevski, 2 years ago
Commit dd4fe36cb2

+ 6 - 0
docs/changelog/91297.yaml

@@ -0,0 +1,6 @@
+pr: 91297
+summary: Check for unassigned shards on node shutdown
+area: Infra/Core
+type: enhancement
+issues:
+ - 88635
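
In short, the change (implemented in TransportGetShutdownStatusAction below) treats a node shutdown as STALLED when an unassigned shard last lived on the node being shut down and is either a primary or a replica with no started copy on a node that is not itself shutting down. A self-contained toy sketch of that decision, using an illustrative ShardView record rather than real Elasticsearch classes:

import java.util.List;
import java.util.Objects;
import java.util.Set;

class StalledCheckSketch {
    // Illustrative stand-in for a shard routing entry; not an Elasticsearch class.
    record ShardView(String index, int shard, boolean primary, boolean started,
                     String currentNodeId, String lastAllocatedNodeId) {}

    static boolean isStalled(String shuttingDownNodeId, Set<String> shuttingDownNodes, List<ShardView> shards) {
        return shards.stream()
            .filter(s -> s.currentNodeId() == null)                                   // unassigned
            .filter(s -> Objects.equals(s.lastAllocatedNodeId(), shuttingDownNodeId)) // last lived on this node
            // a primary always stalls; a replica stalls only if no started copy exists outside the shutting-down set
            .anyMatch(s -> s.primary() || hasStartedCopyElsewhere(shards, s, shuttingDownNodes) == false);
    }

    static boolean hasStartedCopyElsewhere(List<ShardView> shards, ShardView target, Set<String> shuttingDownNodes) {
        return shards.stream()
            .filter(s -> s.index().equals(target.index()) && s.shard() == target.shard())
            .filter(ShardView::started)
            .anyMatch(s -> shuttingDownNodes.contains(s.currentNodeId()) == false);
    }

    public static void main(String[] args) {
        var shards = List.of(
            new ShardView("idx", 0, true, true, "nodeB", null),   // started primary on nodeB
            new ShardView("idx", 0, false, false, null, "nodeA")  // unassigned replica, last on nodeA
        );
        // The unassigned replica still has a started copy on nodeB, so shutting down nodeA is not stalled by it.
        System.out.println(isStalled("nodeA", Set.of("nodeA"), shards)); // false
    }
}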

+ 18 - 0
test/framework/src/main/java/org/elasticsearch/test/gateway/TestGatewayAllocator.java

@@ -8,9 +8,12 @@
 
 package org.elasticsearch.test.gateway;
 
+import org.apache.logging.log4j.LogManager;
+import org.apache.logging.log4j.Logger;
 import org.elasticsearch.cluster.node.DiscoveryNode;
 import org.elasticsearch.cluster.node.DiscoveryNodes;
 import org.elasticsearch.cluster.routing.ShardRouting;
+import org.elasticsearch.cluster.routing.allocation.AllocateUnassignedDecision;
 import org.elasticsearch.cluster.routing.allocation.FailedShard;
 import org.elasticsearch.cluster.routing.allocation.RoutingAllocation;
 import org.elasticsearch.gateway.AsyncShardFetch;
@@ -45,6 +48,8 @@ import java.util.stream.Collectors;
  */
 public class TestGatewayAllocator extends GatewayAllocator {
 
+    private static final Logger LOGGER = LogManager.getLogger(TestGatewayAllocator.class);
+
     Map<String /* node id */, Map<ShardId, ShardRouting>> knownAllocations = new HashMap<>();
     DiscoveryNodes currentNodes = DiscoveryNodes.EMPTY_NODES;
 
@@ -126,6 +131,19 @@ public class TestGatewayAllocator extends GatewayAllocator {
         innerAllocatedUnassigned(allocation, primaryShardAllocator, replicaShardAllocator, shardRouting, unassignedAllocationHandler);
     }
 
+    @Override
+    public AllocateUnassignedDecision explainUnassignedShardAllocation(ShardRouting unassignedShard, RoutingAllocation routingAllocation) {
+        assert unassignedShard.unassigned();
+        assert routingAllocation.debugDecision();
+        if (unassignedShard.primary()) {
+            assert primaryShardAllocator != null;
+            return primaryShardAllocator.makeAllocationDecision(unassignedShard, routingAllocation, LOGGER);
+        } else {
+            assert replicaShardAllocator != null;
+            return replicaShardAllocator.makeAllocationDecision(unassignedShard, routingAllocation, LOGGER);
+        }
+    }
+
     /**
      * manually add a specific shard to the allocations the gateway keeps track of
      */

+ 36 - 0
x-pack/plugin/shutdown/src/internalClusterTest/java/org/elasticsearch/xpack/shutdown/NodeShutdownShardsIT.java

@@ -11,6 +11,7 @@ import org.elasticsearch.Build;
 import org.elasticsearch.action.admin.cluster.allocation.ClusterAllocationExplainResponse;
 import org.elasticsearch.action.admin.cluster.node.info.NodeInfo;
 import org.elasticsearch.action.admin.cluster.node.info.NodesInfoResponse;
+import org.elasticsearch.action.admin.cluster.settings.ClusterUpdateSettingsRequest;
 import org.elasticsearch.action.admin.indices.settings.get.GetSettingsRequest;
 import org.elasticsearch.action.index.IndexRequestBuilder;
 import org.elasticsearch.cluster.ClusterState;
@@ -410,6 +411,41 @@ public class NodeShutdownShardsIT extends ESIntegTestCase {
         ensureGreen("myindex");
     }
 
+    public void testNodeShutdownWithUnassignedShards() throws Exception {
+        final String nodeA = internalCluster().startNode();
+        final String nodeAId = getNodeId(nodeA);
+
+        createIndex(
+            "index",
+            Settings.builder().put(IndexMetadata.SETTING_NUMBER_OF_SHARDS, 1).put(IndexMetadata.SETTING_NUMBER_OF_REPLICAS, 1).build()
+        );
+
+        ensureYellow("index");
+
+        // Start a second node, so the replica will be on nodeB
+        final String nodeB = internalCluster().startNode();
+        final String nodeBId = getNodeId(nodeB);
+        ensureGreen("index");
+
+        client().admin()
+            .cluster()
+            .updateSettings(
+                new ClusterUpdateSettingsRequest().persistentSettings(Settings.builder().put("cluster.routing.allocation.enable", "none"))
+            );
+
+        assertThat(client().admin().indices().prepareFlush("index").get().getSuccessfulShards(), equalTo(2));
+        assertThat(client().admin().indices().prepareRefresh("index").get().getSuccessfulShards(), equalTo(2));
+
+        internalCluster().restartNode(nodeA);
+        internalCluster().restartNode(nodeB);
+
+        assertThat(client().admin().cluster().prepareHealth("index").get().getUnassignedShards(), equalTo(1));
+
+        putNodeShutdown(nodeAId, SingleNodeShutdownMetadata.Type.REMOVE, null);
+
+        assertBusy(() -> assertNodeShutdownStatus(nodeAId, STALLED));
+    }
+
     private void indexRandomData(String index) throws Exception {
         int numDocs = scaledRandomIntBetween(100, 1000);
         IndexRequestBuilder[] builders = new IndexRequestBuilder[numDocs];

+ 57 - 21
x-pack/plugin/shutdown/src/main/java/org/elasticsearch/xpack/shutdown/TransportGetShutdownStatusAction.java

@@ -41,13 +41,16 @@ import org.elasticsearch.transport.TransportService;
 
 import java.util.ArrayList;
 import java.util.Arrays;
+import java.util.Iterator;
 import java.util.List;
 import java.util.Map;
 import java.util.Objects;
 import java.util.Optional;
 import java.util.Set;
 import java.util.concurrent.atomic.AtomicInteger;
+import java.util.function.UnaryOperator;
 import java.util.stream.Collectors;
+import java.util.stream.Stream;
 
 import static org.elasticsearch.cluster.metadata.ShutdownShardMigrationStatus.NODE_ALLOCATION_DECISION_KEY;
 import static org.elasticsearch.core.Strings.format;
@@ -195,7 +198,48 @@ public class TransportGetShutdownStatusAction extends TransportMasterNodeAction<
             return new ShutdownShardMigrationStatus(SingleNodeShutdownMetadata.Status.COMPLETE, 0);
         }
 
-        // First, check if there are any shards currently on this node, and if there are any relocating shards
+        final RoutingAllocation allocation = new RoutingAllocation(
+            allocationDeciders,
+            currentState,
+            clusterInfoService.getClusterInfo(),
+            snapshotsInfoService.snapshotShardSizes(),
+            System.nanoTime()
+        );
+        allocation.setDebugMode(RoutingAllocation.DebugMode.EXCLUDE_YES_DECISIONS);
+
+        // We also need the set of node IDs which are currently shutting down.
+        Set<String> shuttingDownNodes = currentState.metadata().nodeShutdowns().keySet();
+
+        // Check if we have any unassigned primary shards that have this nodeId as their lastAllocatedNodeId
+        var unassignedShards = Stream.iterate(
+            currentState.getRoutingNodes().unassigned().iterator(),
+            Iterator::hasNext,
+            UnaryOperator.identity()
+        )
+            .map(Iterator::next)
+            .filter(s -> Objects.equals(s.unassignedInfo().getLastAllocatedNodeId(), nodeId))
+            .filter(s -> s.primary() || hasShardCopyOnAnotherNode(currentState, s, shuttingDownNodes) == false)
+            .toList();
+
+        if (unassignedShards.isEmpty() == false) {
+            var shardRouting = unassignedShards.get(0);
+            ShardAllocationDecision decision = allocationService.explainShardAllocation(shardRouting, allocation);
+
+            return new ShutdownShardMigrationStatus(
+                SingleNodeShutdownMetadata.Status.STALLED,
+                unassignedShards.size(),
+                format(
+                    "shard [%s] [%s] of index [%s] is unassigned, see [%s] for details or use the cluster allocation explain API",
+                    shardRouting.shardId().getId(),
+                    shardRouting.primary() ? "primary" : "replica",
+                    shardRouting.index().getName(),
+                    NODE_ALLOCATION_DECISION_KEY
+                ),
+                decision
+            );
+        }
+
+        // Check if there are any shards currently on this node, and if there are any relocating shards
         int startedShards = currentState.getRoutingNodes().node(nodeId).numberOfShardsWithState(ShardRoutingState.STARTED);
         int relocatingShards = currentState.getRoutingNodes().node(nodeId).numberOfShardsWithState(ShardRoutingState.RELOCATING);
         int initializingShards = currentState.getRoutingNodes().node(nodeId).numberOfShardsWithState(ShardRoutingState.INITIALIZING);
@@ -217,18 +261,6 @@ public class TransportGetShutdownStatusAction extends TransportMasterNodeAction<
         }
 
         // If there's no relocating shards and shards still on this node, we need to figure out why
-        final RoutingAllocation allocation = new RoutingAllocation(
-            allocationDeciders,
-            currentState,
-            clusterInfoService.getClusterInfo(),
-            snapshotsInfoService.snapshotShardSizes(),
-            System.nanoTime()
-        );
-        allocation.setDebugMode(RoutingAllocation.DebugMode.EXCLUDE_YES_DECISIONS);
-
-        // We also need the set of node IDs which are currently shutting down.
-        Set<String> shuttingDownNodes = currentState.metadata().nodeShutdowns().keySet();
-
         AtomicInteger shardsToIgnoreForFinalStatus = new AtomicInteger(0);
 
         // Explain shard allocations until we find one that can't move, then stop (as `findFirst` short-circuits)
@@ -249,14 +281,7 @@ public class TransportGetShutdownStatusAction extends TransportMasterNodeAction<
             .filter(pair -> pair.v2().getMoveDecision().getAllocationDecision().equals(AllocationDecision.YES) == false)
             // If the shard that can't move is on every node in the cluster, we shouldn't be `STALLED` on it.
             .filter(pair -> {
-                final boolean hasShardCopyOnOtherNode = currentState.routingTable()
-                    .allShards(pair.v1().index().getName())
-                    .stream()
-                    .filter(shardRouting -> shardRouting.id() == pair.v1().id())
-                    // If any shards are both 1) `STARTED` and 2) are not on a node that's shutting down, we have at least one copy
-                    // of this shard safely on a node that's not shutting down, so we don't want to report `STALLED` because of this shard.
-                    .filter(ShardRouting::started)
-                    .anyMatch(routing -> shuttingDownNodes.contains(routing.currentNodeId()) == false);
+                final boolean hasShardCopyOnOtherNode = hasShardCopyOnAnotherNode(currentState, pair.v1(), shuttingDownNodes);
                 if (hasShardCopyOnOtherNode) {
                     shardsToIgnoreForFinalStatus.incrementAndGet();
                 }
@@ -305,6 +330,17 @@ public class TransportGetShutdownStatusAction extends TransportMasterNodeAction<
         }
     }
 
+    private static boolean hasShardCopyOnAnotherNode(ClusterState clusterState, ShardRouting shardRouting, Set<String> shuttingDownNodes) {
+        return clusterState.routingTable()
+            .allShards(shardRouting.index().getName())
+            .stream()
+            .filter(sr -> sr.id() == shardRouting.id())
+            // If any shards are both 1) `STARTED` and 2) are not on a node that's shutting down, we have at least one copy
+            // of this shard safely on a node that's not shutting down, so we don't want to report `STALLED` because of this shard.
+            .filter(ShardRouting::started)
+            .anyMatch(routing -> shuttingDownNodes.contains(routing.currentNodeId()) == false);
+    }
+
     @Override
     protected ClusterBlockException checkBlock(GetShutdownStatusAction.Request request, ClusterState state) {
         return state.blocks().globalBlockedException(ClusterBlockLevel.METADATA_WRITE);
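
The Stream.iterate(iterator, Iterator::hasNext, UnaryOperator.identity()).map(Iterator::next) idiom above walks the unassigned-shard iterator lazily as a stream. A minimal standalone sketch of the same idiom over a plain list iterator (names here are illustrative only):

import java.util.Iterator;
import java.util.List;
import java.util.function.UnaryOperator;
import java.util.stream.Stream;

class IteratorAsStreamDemo {
    public static void main(String[] args) {
        Iterator<String> it = List.of("shard-0", "shard-1", "shard-2").iterator();

        // The three-argument Stream.iterate re-checks hasNext() before emitting each element,
        // and map(Iterator::next) advances the iterator, so the iterator is consumed lazily
        // without copying it into an intermediate collection first.
        List<String> filtered = Stream.iterate(it, Iterator::hasNext, UnaryOperator.identity())
            .map(Iterator::next)
            .filter(s -> s.endsWith("1") == false)
            .toList();

        System.out.println(filtered); // [shard-0, shard-2]
    }
}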

+ 120 - 0
x-pack/plugin/shutdown/src/test/java/org/elasticsearch/xpack/shutdown/TransportGetShutdownStatusActionTests.java

@@ -24,6 +24,7 @@ import org.elasticsearch.cluster.routing.RoutingTable;
 import org.elasticsearch.cluster.routing.ShardRouting;
 import org.elasticsearch.cluster.routing.ShardRoutingState;
 import org.elasticsearch.cluster.routing.TestShardRouting;
+import org.elasticsearch.cluster.routing.UnassignedInfo;
 import org.elasticsearch.cluster.routing.allocation.AllocationService;
 import org.elasticsearch.cluster.routing.allocation.RoutingAllocation;
 import org.elasticsearch.cluster.routing.allocation.allocator.BalancedShardsAllocator;
@@ -34,12 +35,14 @@ import org.elasticsearch.cluster.routing.allocation.decider.NodeReplacementAlloc
 import org.elasticsearch.cluster.routing.allocation.decider.NodeShutdownAllocationDecider;
 import org.elasticsearch.common.settings.Settings;
 import org.elasticsearch.common.transport.TransportAddress;
+import org.elasticsearch.gateway.GatewayAllocator;
 import org.elasticsearch.index.Index;
 import org.elasticsearch.index.shard.ShardId;
 import org.elasticsearch.node.Node;
 import org.elasticsearch.snapshots.SnapshotShardSizeInfo;
 import org.elasticsearch.snapshots.SnapshotsInfoService;
 import org.elasticsearch.test.ESTestCase;
+import org.elasticsearch.test.gateway.TestGatewayAllocator;
 import org.hamcrest.Matcher;
 import org.junit.Before;
 
@@ -47,6 +50,7 @@ import java.util.Collections;
 import java.util.HashMap;
 import java.util.List;
 import java.util.Map;
+import java.util.Set;
 import java.util.concurrent.atomic.AtomicReference;
 
 import static org.hamcrest.Matchers.allOf;
@@ -54,6 +58,9 @@ import static org.hamcrest.Matchers.containsString;
 import static org.hamcrest.Matchers.equalTo;
 import static org.hamcrest.Matchers.is;
 import static org.hamcrest.Matchers.nullValue;
+import static org.mockito.ArgumentMatchers.anyInt;
+import static org.mockito.Mockito.doAnswer;
+import static org.mockito.Mockito.spy;
 
 public class TransportGetShutdownStatusActionTests extends ESTestCase {
     public static final String SHUTTING_DOWN_NODE_ID = "node1";
@@ -122,6 +129,7 @@ public class TransportGetShutdownStatusActionTests extends ESTestCase {
             clusterInfoService,
             snapshotsInfoService
         );
+        allocationService.setExistingShardsAllocators(Map.of(GatewayAllocator.ALLOCATOR_NAME, new TestGatewayAllocator()));
     }
 
     /**
@@ -349,6 +357,60 @@ public class TransportGetShutdownStatusActionTests extends ESTestCase {
         );
     }
 
+    /**
+     * Ensure we can detect stalled migrations when we have unassigned shards that had the shutting down node as their last known
+     * node id
+     */
+    public void testStalledUnassigned() {
+        Index index = new Index(randomAlphaOfLength(5), randomAlphaOfLengthBetween(1, 20));
+        IndexMetadata imd = spy(generateIndexMetadata(index, 3, 0));
+        // make sure the TestGatewayAllocator stays in sync always, avoid flaky tests
+        doAnswer(i -> {
+            if ((Integer) i.getArgument(0) < 2) {
+                return Set.of(LIVE_NODE_ID);
+            }
+            return Set.of(SHUTTING_DOWN_NODE_ID);
+        }).when(imd).inSyncAllocationIds(anyInt());
+
+        var shard0 = TestShardRouting.newShardRouting(new ShardId(index, 0), LIVE_NODE_ID, true, ShardRoutingState.STARTED);
+        var shard1 = TestShardRouting.newShardRouting(new ShardId(index, 1), LIVE_NODE_ID, true, ShardRoutingState.STARTED);
+
+        // we should stall the node if we find an unassigned shard with lastAllocatedNodeId matching the shutting down node
+        var unassigned = makeUnassignedShard(index, 2, SHUTTING_DOWN_NODE_ID, true);
+
+        assertShardMigration(
+            getUnassignedShutdownStatus(index, imd, shard0, shard1, unassigned),
+            SingleNodeShutdownMetadata.Status.STALLED,
+            1,
+            allOf(containsString(index.getName()), containsString("[2] [primary]"))
+        );
+
+        // if the shard is unassigned, but it's not a primary on this node, we shouldn't stall
+        var shard2 = TestShardRouting.newShardRouting(new ShardId(index, 2), LIVE_NODE_ID, true, ShardRoutingState.STARTED);
+        var unassignedReplica = makeUnassignedShard(index, 2, SHUTTING_DOWN_NODE_ID, false);
+
+        var s = getUnassignedShutdownStatus(index, imd, shard0, shard1, shard2, unassignedReplica);
+        assertShardMigration(s, SingleNodeShutdownMetadata.Status.COMPLETE, 0, nullValue());
+
+        // check if we correctly count all of the unassigned shards
+        var unassigned3 = makeUnassignedShard(index, 3, SHUTTING_DOWN_NODE_ID, true);
+
+        assertShardMigration(
+            getUnassignedShutdownStatus(index, imd, shard0, shard1, unassigned3, unassigned),
+            SingleNodeShutdownMetadata.Status.STALLED,
+            2,
+            allOf(containsString(index.getName()), containsString("[2] [primary]"))
+        );
+
+        // check if we correctly walk all of the unassigned shards, shard 2 replica, shard 3 primary
+        assertShardMigration(
+            getUnassignedShutdownStatus(index, imd, shard0, shard1, shard2, unassignedReplica, unassigned3),
+            SingleNodeShutdownMetadata.Status.STALLED,
+            1,
+            allOf(containsString(index.getName()), containsString("[3] [primary]"))
+        );
+    }
+
     public void testNotStalledIfAllShardsHaveACopyOnAnotherNode() {
         Index index = new Index(randomAlphaOfLength(5), randomAlphaOfLengthBetween(1, 20));
         IndexMetadata imd = generateIndexMetadata(index, 3, 0);
@@ -571,4 +633,62 @@ public class TransportGetShutdownStatusActionTests extends ESTestCase {
             .routingTable(indexRoutingTable)
             .build();
     }
+
+    private UnassignedInfo makeUnassignedInfo(String nodeId) {
+        return new UnassignedInfo(
+            UnassignedInfo.Reason.ALLOCATION_FAILED,
+            "testing",
+            null,
+            1,
+            System.nanoTime(),
+            System.currentTimeMillis(),
+            false,
+            UnassignedInfo.AllocationStatus.NO_ATTEMPT,
+            Collections.emptySet(),
+            nodeId
+        );
+    }
+
+    private ShardRouting makeUnassignedShard(Index index, int shardId, String nodeId, boolean primary) {
+        var unassignedInfo = makeUnassignedInfo(nodeId);
+
+        return TestShardRouting.newShardRouting(
+            new ShardId(index, shardId),
+            null,
+            null,
+            primary,
+            ShardRoutingState.UNASSIGNED,
+            unassignedInfo
+        );
+    }
+
+    private ShutdownShardMigrationStatus getUnassignedShutdownStatus(Index index, IndexMetadata imd, ShardRouting... shards) {
+        var indexRoutingTableBuilder = IndexRoutingTable.builder(index);
+
+        for (var routing : shards) {
+            indexRoutingTableBuilder.addShard(routing);
+        }
+
+        var indexRoutingTable = indexRoutingTableBuilder.build();
+
+        // Force a decision of NO for all moves and new allocations, simulating a decider that's stuck
+        canAllocate.set((r, n, a) -> Decision.NO);
+        // And the remain decider simulates NodeShutdownAllocationDecider
+        canRemain.set((r, n, a) -> n.nodeId().equals(SHUTTING_DOWN_NODE_ID) ? Decision.NO : Decision.YES);
+
+        RoutingTable.Builder routingTable = RoutingTable.builder();
+        routingTable.add(indexRoutingTable);
+        ClusterState state = createTestClusterState(routingTable.build(), List.of(imd), SingleNodeShutdownMetadata.Type.REMOVE);
+
+        return TransportGetShutdownStatusAction.shardMigrationStatus(
+            state,
+            SHUTTING_DOWN_NODE_ID,
+            SingleNodeShutdownMetadata.Type.REMOVE,
+            true,
+            clusterInfoService,
+            snapshotsInfoService,
+            allocationService,
+            allocationDeciders
+        );
+    }
 }
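
The spy/doAnswer stubbing above overrides IndexMetadata#inSyncAllocationIds(int) per shard id while leaving the rest of the real object intact. A minimal standalone sketch of that pattern, using a plain List as a stand-in for the spied object (requires Mockito on the classpath):

import static org.mockito.ArgumentMatchers.anyInt;
import static org.mockito.Mockito.doAnswer;
import static org.mockito.Mockito.spy;

import java.util.ArrayList;
import java.util.List;

class SpyDoAnswerDemo {
    public static void main(String[] args) {
        List<String> spied = spy(new ArrayList<>(List.of("a", "b", "c")));

        // doAnswer(...).when(spy).method(...) stubs without invoking the real method,
        // and the Answer can branch on the argument, as the test above does per shard id.
        doAnswer(invocation -> {
            int i = invocation.getArgument(0);
            return i < 2 ? "live-node" : "shutting-down-node";
        }).when(spied).get(anyInt());

        System.out.println(spied.get(0)); // live-node
        System.out.println(spied.get(2)); // shutting-down-node
        System.out.println(spied.size()); // 3 -- unstubbed calls still hit the real list
    }
}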