소스 검색

Avoid NPE when disassociateDeadNodes is executed for a node present in the desired balance (#91659)

Ievgen Degtiarenko 2 년 전
부모
커밋
4a392e2952

+ 6 - 0
docs/changelog/91659.yaml

@@ -0,0 +1,6 @@
+pr: 91659
+summary: Avoid NPE when disassociateDeadNodes is executed for a node present in the
+  desired balance
+area: Allocation
+type: bug
+issues: []

+ 13 - 7
server/src/main/java/org/elasticsearch/cluster/routing/allocation/allocator/DesiredBalanceReconciler.java

@@ -415,13 +415,17 @@ public class DesiredBalanceReconciler {
     ) {
         for (final var nodeId : desiredNodeIds) {
             // TODO consider ignored nodes here too?
-            if (nodeId.equals(shardRouting.currentNodeId()) == false) {
-                final var currentNode = routingNodes.node(nodeId);
-                final var decision = canAllocateDecider.apply(shardRouting, currentNode);
-                logger.trace("relocate {} to {}: {}", shardRouting, nodeId, decision);
-                if (decision.type() == Decision.Type.YES) {
-                    return currentNode.node();
-                }
+            if (nodeId.equals(shardRouting.currentNodeId())) {
+                continue;
+            }
+            final var node = routingNodes.node(nodeId);
+            if (node == null) { // node left the cluster while reconciliation is still in progress
+                continue;
+            }
+            final var decision = canAllocateDecider.apply(shardRouting, node);
+            logger.trace("relocate {} to {}: {}", shardRouting, nodeId, decision);
+            if (decision.type() == Decision.Type.YES) {
+                return node.node();
             }
         }
 
@@ -429,10 +433,12 @@ public class DesiredBalanceReconciler {
     }
 
     private Decision decideCanAllocate(ShardRouting shardRouting, RoutingNode target) {
+        assert target != null : "Target node is not found";
         return allocation.deciders().canAllocate(shardRouting, target, allocation);
     }
 
     private Decision decideCanForceAllocateForVacate(ShardRouting shardRouting, RoutingNode target) {
+        assert target != null : "Target node is not found";
         return allocation.deciders().canForceAllocateDuringReplace(shardRouting, target, allocation);
     }
 }

+ 69 - 24
server/src/test/java/org/elasticsearch/cluster/routing/allocation/allocator/DesiredBalanceReconcilerTests.java

@@ -21,6 +21,7 @@ import org.elasticsearch.cluster.metadata.SingleNodeShutdownMetadata;
 import org.elasticsearch.cluster.node.DiscoveryNode;
 import org.elasticsearch.cluster.node.DiscoveryNodeRole;
 import org.elasticsearch.cluster.node.DiscoveryNodes;
+import org.elasticsearch.cluster.routing.IndexRoutingTable;
 import org.elasticsearch.cluster.routing.IndexShardRoutingTable;
 import org.elasticsearch.cluster.routing.RecoverySource;
 import org.elasticsearch.cluster.routing.RoutingChangesObserver;
@@ -78,25 +79,22 @@ import static org.elasticsearch.cluster.ESAllocationTestCase.startInitializingSh
 import static org.elasticsearch.cluster.metadata.IndexMetadata.SETTING_INDEX_VERSION_CREATED;
 import static org.elasticsearch.cluster.metadata.IndexMetadata.SETTING_NUMBER_OF_REPLICAS;
 import static org.elasticsearch.cluster.metadata.IndexMetadata.SETTING_NUMBER_OF_SHARDS;
+import static org.elasticsearch.cluster.routing.ShardRoutingState.STARTED;
+import static org.elasticsearch.cluster.routing.TestShardRouting.newShardRouting;
 import static org.elasticsearch.cluster.routing.allocation.decider.ThrottlingAllocationDecider.CLUSTER_ROUTING_ALLOCATION_NODE_CONCURRENT_INCOMING_RECOVERIES_SETTING;
 import static org.elasticsearch.cluster.routing.allocation.decider.ThrottlingAllocationDecider.CLUSTER_ROUTING_ALLOCATION_NODE_CONCURRENT_OUTGOING_RECOVERIES_SETTING;
 import static org.elasticsearch.cluster.routing.allocation.decider.ThrottlingAllocationDecider.CLUSTER_ROUTING_ALLOCATION_NODE_INITIAL_PRIMARIES_RECOVERIES_SETTING;
 import static org.hamcrest.Matchers.equalTo;
 import static org.hamcrest.Matchers.hasSize;
+import static org.hamcrest.Matchers.notNullValue;
+import static org.hamcrest.Matchers.nullValue;
 import static org.hamcrest.Matchers.oneOf;
 
 public class DesiredBalanceReconcilerTests extends ESTestCase {
 
     public void testNoChangesOnEmptyDesiredBalance() {
         final var clusterState = DesiredBalanceComputerTests.createInitialClusterState(3);
-        final var routingAllocation = new RoutingAllocation(
-            new AllocationDeciders(List.of()),
-            clusterState.mutableRoutingNodes(),
-            clusterState,
-            ClusterInfo.EMPTY,
-            SnapshotShardSizeInfo.EMPTY,
-            0L
-        );
+        final var routingAllocation = createRoutingAllocationFrom(clusterState);
 
         reconcile(routingAllocation, new DesiredBalance(1, Map.of()));
         assertFalse(routingAllocation.routingNodesChanged());
@@ -319,10 +317,6 @@ public class DesiredBalanceReconcilerTests extends ESTestCase {
 
         final var stateWithInitializingPrimaries = startInitializingShardsAndReroute(allocationService, clusterState);
         for (final var indexRoutingTable : stateWithInitializingPrimaries.routingTable()) {
-            for (int i = 0; i < indexRoutingTable.size(); i++) {
-                final var indexShardRoutingTable = indexRoutingTable.shard(i);
-            }
-
             for (int i = 0; i < indexRoutingTable.size(); i++) {
                 final var indexShardRoutingTable = indexRoutingTable.shard(i);
                 assertTrue(indexShardRoutingTable.primaryShard().initializing());
@@ -1021,6 +1015,49 @@ public class DesiredBalanceReconcilerTests extends ESTestCase {
         assertThat(reroutedState.getRoutingNodes().node("node-1").shardsWithState(ShardRoutingState.RELOCATING), hasSize(1));
     }
 
+    public void testDoNotRebalanceToTheNodeThatNoLongerExists() {
+
+        var indexMetadata = IndexMetadata.builder("index-1")
+            .settings(
+                Settings.builder()
+                    .put(SETTING_NUMBER_OF_SHARDS, 1)
+                    .put(SETTING_NUMBER_OF_REPLICAS, 0)
+                    .put(SETTING_INDEX_VERSION_CREATED.getKey(), Version.CURRENT)
+            )
+            .system(randomBoolean())
+            .build();
+        final var index = indexMetadata.getIndex();
+        final var shardId = new ShardId(index, 0);
+
+        final var clusterState = ClusterState.builder(ClusterName.DEFAULT)
+            .nodes(
+                DiscoveryNodes.builder()
+                    // data-node-1 left the cluster
+                    .localNodeId("data-node-2")
+                    .masterNodeId("data-node-2")
+                    .add(new DiscoveryNode("data-node-2", buildNewFakeTransportAddress(), Version.CURRENT))
+            )
+            .metadata(Metadata.builder().put(indexMetadata, true))
+            .routingTable(
+                RoutingTable.builder()
+                    .add(IndexRoutingTable.builder(index).addShard(newShardRouting(shardId, "data-node-2", true, STARTED)))
+            )
+            .build();
+
+        final var allocation = createRoutingAllocationFrom(clusterState);
+        final var balance = new DesiredBalance(
+            1,
+            Map.of(shardId, new ShardAssignment(Set.of("data-node-1"), 1, 0, 0)) // shard is assigned to the node that has left
+        );
+
+        reconcile(allocation, balance);
+
+        assertThat(allocation.routingNodes().node("data-node-1"), nullValue());
+        assertThat(allocation.routingNodes().node("data-node-2"), notNullValue());
+        // the shard stays on its current node until the desired balance is recalculated
+        assertThat(allocation.routingNodes().node("data-node-2").getByShardId(shardId), notNullValue());
+    }
+
     private static void reconcile(RoutingAllocation routingAllocation, DesiredBalance desiredBalance) {
         new DesiredBalanceReconciler(desiredBalance, routingAllocation, new NodeAllocationOrdering()).run();
     }
@@ -1037,6 +1074,17 @@ public class DesiredBalanceReconcilerTests extends ESTestCase {
         );
     }
 
+    private static RoutingAllocation createRoutingAllocationFrom(ClusterState clusterState) {
+        return new RoutingAllocation(
+            new AllocationDeciders(List.of()),
+            clusterState.mutableRoutingNodes(),
+            clusterState,
+            ClusterInfo.EMPTY,
+            SnapshotShardSizeInfo.EMPTY,
+            0L
+        );
+    }
+
     private static AllocationService createTestAllocationService(
         Consumer<RoutingAllocation> allocationConsumer,
         ClusterInfoService clusterInfoService,
@@ -1116,19 +1164,16 @@ public class DesiredBalanceReconcilerTests extends ESTestCase {
     private static DiscoveryNodes discoveryNodes(int nodeCount) {
         final var discoveryNodes = DiscoveryNodes.builder();
         for (var i = 0; i < nodeCount; i++) {
-            final var transportAddress = buildNewFakeTransportAddress();
-            final var discoveryNode = new DiscoveryNode(
-                "node-" + i,
-                "node-" + i,
-                UUIDs.randomBase64UUID(random()),
-                transportAddress.address().getHostString(),
-                transportAddress.getAddress(),
-                transportAddress,
-                Map.of(),
-                Set.of(DiscoveryNodeRole.MASTER_ROLE, DiscoveryNodeRole.DATA_ROLE),
-                Version.CURRENT
+            discoveryNodes.add(
+                new DiscoveryNode(
+                    "node-" + i,
+                    "node-" + i,
+                    buildNewFakeTransportAddress(),
+                    Map.of(),
+                    Set.of(DiscoveryNodeRole.MASTER_ROLE, DiscoveryNodeRole.DATA_ROLE),
+                    Version.CURRENT
+                )
             );
-            discoveryNodes.add(discoveryNode);
         }
         discoveryNodes.masterNodeId("node-0").localNodeId("node-0");
         return discoveryNodes.build();

+ 1 - 6
server/src/test/java/org/elasticsearch/cluster/routing/allocation/allocator/DesiredBalanceShardsAllocatorTests.java

@@ -33,7 +33,6 @@ import org.elasticsearch.cluster.routing.allocation.decider.AllocationDeciders;
 import org.elasticsearch.cluster.service.ClusterApplierService;
 import org.elasticsearch.cluster.service.ClusterService;
 import org.elasticsearch.cluster.service.FakeThreadPoolMasterService;
-import org.elasticsearch.common.UUIDs;
 import org.elasticsearch.common.settings.ClusterSettings;
 import org.elasticsearch.common.settings.Settings;
 import org.elasticsearch.common.util.concurrent.DeterministicTaskQueue;
@@ -377,14 +376,10 @@ public class DesiredBalanceShardsAllocatorTests extends ESTestCase {
     }
 
     private static DiscoveryNode createDiscoveryNode(String nodeId) {
-        var transportAddress = buildNewFakeTransportAddress();
         return new DiscoveryNode(
             nodeId,
             nodeId,
-            UUIDs.randomBase64UUID(random()),
-            transportAddress.address().getHostString(),
-            transportAddress.getAddress(),
-            transportAddress,
+            buildNewFakeTransportAddress(),
             Map.of(),
             Set.of(DiscoveryNodeRole.MASTER_ROLE, DiscoveryNodeRole.DATA_ROLE),
             Version.CURRENT