Kaynağa Gözat

Better handling of node ids from shutdown metadata (#128298)

Currently, the DiskThresholdMonitor code considers shutdown metadata to identify nodes that are being replaced. If the node-to-be-replaced (source) leaves the cluster before the corresponding shutdown metadata is removed from the cluster state, we can have a NPE. This PR adds a test for that and improves a bit code to handle node ids for source and target from shutdown metadata.

Fixes #100201
Lorenzo Dematté 5 ay önce
ebeveyn
işleme
e88bcaf4da

+ 6 - 0
docs/changelog/128298.yaml

@@ -0,0 +1,6 @@
+pr: 128298
+summary: Better handling of node ids from shutdown metadata (avoid NPE on already removed nodes)
+area: Infra/Node Lifecycle
+type: bug
+issues:
+ - 100201

+ 12 - 5
server/src/main/java/org/elasticsearch/cluster/routing/allocation/DiskThresholdMonitor.java

@@ -49,7 +49,6 @@ import java.util.HashMap;
 import java.util.HashSet;
 import java.util.List;
 import java.util.Map;
-import java.util.Objects;
 import java.util.Set;
 import java.util.concurrent.atomic.AtomicBoolean;
 import java.util.concurrent.atomic.AtomicLong;
@@ -376,9 +375,12 @@ public class DiskThresholdMonitor {
             }
 
             // Generate a map of node name to ID so we can use it to look up node replacement targets
-            final Map<String, String> nodeNameToId = state.getRoutingNodes()
+            final Map<String, List<String>> nodeNameToIds = state.getRoutingNodes()
                 .stream()
-                .collect(Collectors.toMap(rn -> rn.node().getName(), RoutingNode::nodeId, (s1, s2) -> s2));
+                .collect(Collectors.groupingBy(rn -> rn.node().getName(), Collectors.mapping(RoutingNode::nodeId, Collectors.toList())));
+
+            // Generate a set of the valid node IDs so we can use it to filter valid sources
+            final Set<String> routingNodeIds = state.getRoutingNodes().stream().map(RoutingNode::nodeId).collect(Collectors.toSet());
 
             // Calculate both the source node id and the target node id of a "replace" type shutdown
             final Set<String> nodesIdsPartOfReplacement = state.metadata()
@@ -387,8 +389,8 @@ public class DiskThresholdMonitor {
                 .values()
                 .stream()
                 .filter(meta -> meta.getType() == SingleNodeShutdownMetadata.Type.REPLACE)
-                .flatMap(meta -> Stream.of(meta.getNodeId(), nodeNameToId.get(meta.getTargetNodeName())))
-                .filter(Objects::nonNull)  // The REPLACE target node might not still be in RoutingNodes
+                .flatMap(meta -> Stream.concat(Stream.of(meta.getNodeId()), nodeIdsOrEmpty(meta, nodeNameToIds)))
+                .filter(routingNodeIds::contains) // The REPLACE source node might already have been removed from RoutingNodes
                 .collect(Collectors.toSet());
 
             // Generate a set of all the indices that exist on either the target or source of a node replacement
@@ -437,6 +439,11 @@ public class DiskThresholdMonitor {
         }
     }
 
+    private static Stream<String> nodeIdsOrEmpty(SingleNodeShutdownMetadata meta, Map<String, List<String>> nodeNameToIds) {
+        var ids = nodeNameToIds.get(meta.getTargetNodeName()); // The REPLACE target node might not still be in RoutingNodes
+        return ids == null ? Stream.empty() : ids.stream();
+    }
+
     // exposed for tests to override
     long sizeOfRelocatingShards(RoutingNode routingNode, DiskUsage diskUsage, ClusterInfo info, ClusterState reroutedClusterState) {
         return DiskThresholdDecider.sizeOfUnaccountedShards(

+ 70 - 0
server/src/test/java/org/elasticsearch/cluster/routing/allocation/DiskThresholdMonitorTests.java

@@ -1409,6 +1409,76 @@ public class DiskThresholdMonitorTests extends ESAllocationTestCase {
         assertNull(result2.v2());
     }
 
+    private void doTestSkipNodesNotInRoutingTable(boolean sourceNodeInTable, boolean targetNodeInTable) {
+        final var projectId = randomProjectIdOrDefault();
+        final Metadata.Builder metadataBuilder = Metadata.builder()
+            .put(
+                ProjectMetadata.builder(projectId)
+                    .put(IndexMetadata.builder("test").settings(settings(IndexVersion.current())).numberOfShards(1).numberOfReplicas(1))
+                    .build()
+            );
+
+        metadataBuilder.putCustom(
+            NodesShutdownMetadata.TYPE,
+            new NodesShutdownMetadata(
+                Collections.singletonMap(
+                    "node1",
+                    SingleNodeShutdownMetadata.builder()
+                        .setNodeId("node1")
+                        .setNodeEphemeralId("node1")
+                        .setReason("testing")
+                        .setType(SingleNodeShutdownMetadata.Type.REPLACE)
+                        .setTargetNodeName("node3")
+                        .setStartedAtMillis(randomNonNegativeLong())
+                        .build()
+                )
+            )
+        );
+
+        final Metadata metadata = metadataBuilder.build();
+        final RoutingTable routingTable = RoutingTable.builder(TestShardRoutingRoleStrategies.DEFAULT_ROLE_ONLY)
+            .addAsNew(metadata.getProject(projectId).index("test"))
+            .build();
+        DiscoveryNodes.Builder discoveryNodes = DiscoveryNodes.builder().add(newNormalNode("node2", "node2"));
+        // node1 which is replaced by node3 may or may not be in the cluster
+        if (sourceNodeInTable) {
+            discoveryNodes.add(newNormalNode("node1", "node1"));
+        }
+        // node3 which is to replace node1 may or may not be in the cluster
+        if (targetNodeInTable) {
+            discoveryNodes.add(newNormalNode("node3", "node3"));
+        }
+        final ClusterState clusterState = applyStartedShardsUntilNoChange(
+            ClusterState.builder(ClusterName.DEFAULT)
+                .metadata(metadata)
+                .routingTable(GlobalRoutingTable.builder().put(projectId, routingTable).build())
+                .nodes(discoveryNodes)
+                .build(),
+            createAllocationService(Settings.EMPTY)
+        );
+        final Index testIndex = routingTable.index("test").getIndex();
+
+        Map<String, DiskUsage> diskUsages = new HashMap<>();
+        diskUsages.put("node1", new DiskUsage("node1", "node1", "/foo/bar", 100, between(0, 4)));
+        diskUsages.put("node2", new DiskUsage("node2", "node2", "/foo/bar", 100, between(0, 4)));
+        final ClusterInfo clusterInfo = clusterInfo(diskUsages);
+        Tuple<Boolean, Set<Index>> result = runDiskThresholdMonitor(clusterState, clusterInfo);
+        assertTrue(result.v1()); // reroute on new nodes
+        assertThat(result.v2(), contains(testIndex));
+    }
+
+    public void testSkipReplaceSourceNodeNotInRoutingTable() {
+        doTestSkipNodesNotInRoutingTable(false, true);
+    }
+
+    public void testSkipReplaceTargetNodeNotInRoutingTable() {
+        doTestSkipNodesNotInRoutingTable(true, false);
+    }
+
+    public void testSkipReplaceSourceAndTargetNodesNotInRoutingTable() {
+        doTestSkipNodesNotInRoutingTable(false, false);
+    }
+
     // Runs a disk threshold monitor with a given cluster state and cluster info and returns whether a reroute should
     // happen and any indices that should be marked as read-only.
     private Tuple<Boolean, Set<Index>> runDiskThresholdMonitor(ClusterState clusterState, ClusterInfo clusterInfo) {