Browse Source

Respect gateway allocations in DB computer (#91690)

If one of the gateway allocators initializes a shard somewhere then the
spawned desired balance computation should start from a state with that
shard initializing. Today it doesn't, but this commit fixes that.

Closes #91451
Closes #91613
David Turner 2 years ago
parent
commit
cf0b1af418

+ 0 - 1
server/src/internalClusterTest/java/org/elasticsearch/gateway/ReplicaShardAllocatorIT.java

@@ -244,7 +244,6 @@ public class ReplicaShardAllocatorIT extends ESIntegTestCase {
         transportServiceOnPrimary.clearAllRules();
     }
 
-    @AwaitsFix(bugUrl = "https://github.com/elastic/elasticsearch/issues/91613")
     public void testFullClusterRestartPerformNoopRecovery() throws Exception {
         int numOfReplicas = randomIntBetween(1, 2);
         internalCluster().ensureAtLeastNumDataNodes(numOfReplicas + 2);

+ 0 - 1
server/src/internalClusterTest/java/org/elasticsearch/gateway/ReplicaShardAllocatorSyncIdIT.java

@@ -225,7 +225,6 @@ public class ReplicaShardAllocatorSyncIdIT extends ESIntegTestCase {
         transportServiceOnPrimary.clearAllRules();
     }
 
-    @AwaitsFix(bugUrl = "https://github.com/elastic/elasticsearch/issues/91451")
     public void testFullClusterRestartPerformNoopRecovery() throws Exception {
         int numOfReplicas = randomIntBetween(1, 2);
         internalCluster().ensureAtLeastNumDataNodes(numOfReplicas + 2);

+ 11 - 1
server/src/main/java/org/elasticsearch/cluster/routing/allocation/RoutingAllocation.java

@@ -407,7 +407,17 @@ public class RoutingAllocation {
     }
 
     public RoutingAllocation immutableClone() {
-        return new RoutingAllocation(deciders, clusterState, clusterInfo, shardSizeInfo, currentNanoTime);
+        return new RoutingAllocation(
+            deciders,
+            routingNodesChanged()
+                ? ClusterState.builder(clusterState)
+                    .routingTable(RoutingTable.of(clusterState.routingTable().version(), routingNodes))
+                    .build()
+                : clusterState,
+            clusterInfo,
+            shardSizeInfo,
+            currentNanoTime
+        );
     }
 
     public RoutingAllocation mutableCloneForSimulation() {

+ 5 - 0
server/src/main/java/org/elasticsearch/cluster/routing/allocation/allocator/DesiredBalanceShardsAllocator.java

@@ -251,6 +251,11 @@ public class DesiredBalanceShardsAllocator implements ShardsAllocator {
             assert MasterService.isPublishFailureException(e) : e;
             onNoLongerMaster();
         }
+
+        @Override
+        public String toString() {
+            return "ReconcileDesiredBalanceTask[lastConvergedIndex=" + desiredBalance.lastConvergedIndex() + "]";
+        }
     }
 
     private final class ReconcileDesiredBalanceExecutor implements ClusterStateTaskExecutor<ReconcileDesiredBalanceTask> {

+ 6 - 0
server/src/test/java/org/elasticsearch/cluster/routing/allocation/FailedShardsRoutingTests.java

@@ -151,6 +151,12 @@ public class FailedShardsRoutingTests extends ESAllocationTestCase {
         // check promotion of replica to primary
         assertThat(clusterState.getRoutingNodes().node(origReplicaNodeId).iterator().next().state(), equalTo(STARTED));
         assertThat(clusterState.routingTable().index("test").shard(0).primaryShard().currentNodeId(), equalTo(origReplicaNodeId));
+
+        if (clusterState.routingTable().index("test").shard(0).replicaShards().get(0).assignedToNode() == false) {
+            // desired node may be ignored due to previous failure - try again if so
+            clusterState = startShardsAndReroute(allocation, clusterState);
+        }
+
         assertThat(
             clusterState.routingTable().index("test").shard(0).replicaShards().get(0).currentNodeId(),
             anyOf(equalTo(origPrimaryNodeId), equalTo("node3"))

+ 46 - 0
server/src/test/java/org/elasticsearch/cluster/routing/allocation/allocator/DesiredBalanceComputerTests.java

@@ -327,6 +327,52 @@ public class DesiredBalanceComputerTests extends ESTestCase {
         );
     }
 
+    public void testRespectsAssignmentByGatewayAllocators() {
+        var desiredBalanceComputer = createDesiredBalanceComputer();
+        var clusterState = createInitialClusterState(3);
+        var index = clusterState.metadata().index(TEST_INDEX).getIndex();
+
+        final var routingAllocation = new RoutingAllocation(
+            new AllocationDeciders(List.of()),
+            clusterState.mutableRoutingNodes(),
+            clusterState,
+            ClusterInfo.EMPTY,
+            SnapshotShardSizeInfo.EMPTY,
+            0L
+        );
+        for (var iterator = routingAllocation.routingNodes().unassigned().iterator(); iterator.hasNext();) {
+            var shardRouting = iterator.next();
+            if (shardRouting.shardId().id() == 0 && shardRouting.primary()) {
+                routingAllocation.routingNodes()
+                    .startShard(
+                        logger,
+                        iterator.initialize("node-2", null, 0L, routingAllocation.changes()),
+                        routingAllocation.changes(),
+                        0L
+                    );
+                break;
+            }
+        }
+
+        var desiredBalance = desiredBalanceComputer.compute(
+            DesiredBalance.INITIAL,
+            DesiredBalanceInput.create(randomNonNegativeLong(), routingAllocation),
+            queue(),
+            input -> true
+        );
+
+        assertDesiredAssignments(
+            desiredBalance,
+            Map.of(
+                new ShardId(index, 0),
+                new ShardAssignment(Set.of("node-2", "node-1"), 2, 0, 0),
+                new ShardId(index, 1),
+                new ShardAssignment(Set.of("node-0", "node-1"), 2, 0, 0)
+            )
+        );
+
+    }
+
     public void testSimulatesAchievingDesiredBalanceBeforeDelegating() {
 
         var allocateCalled = new AtomicBoolean();

+ 31 - 17
server/src/test/java/org/elasticsearch/cluster/routing/allocation/allocator/DesiredBalanceShardsAllocatorTests.java

@@ -59,18 +59,18 @@ import static org.elasticsearch.cluster.metadata.IndexMetadata.SETTING_INDEX_VER
 import static org.elasticsearch.cluster.metadata.IndexMetadata.SETTING_NUMBER_OF_REPLICAS;
 import static org.elasticsearch.cluster.metadata.IndexMetadata.SETTING_NUMBER_OF_SHARDS;
 import static org.hamcrest.Matchers.equalTo;
+import static org.hamcrest.Matchers.hasItem;
 
 public class DesiredBalanceShardsAllocatorTests extends ESTestCase {
 
+    private static final String LOCAL_NODE_ID = "node-1";
+    private static final String OTHER_NODE_ID = "node-2";
+
     public void testGatewayAllocatorPreemptsAllocation() {
+        final var nodeId = randomFrom(LOCAL_NODE_ID, OTHER_NODE_ID);
         testAllocate(
-            (allocation, unassignedAllocationHandler) -> unassignedAllocationHandler.initialize(
-                allocation.nodes().getLocalNodeId(),
-                null,
-                0L,
-                allocation.changes()
-            ),
-            routingTable -> assertTrue(routingTable.index("test-index").shard(0).primaryShard().assignedToNode())
+            (allocation, unassignedAllocationHandler) -> unassignedAllocationHandler.initialize(nodeId, null, 0L, allocation.changes()),
+            routingTable -> assertEquals(nodeId, routingTable.index("test-index").shard(0).primaryShard().currentNodeId())
         );
     }
 
@@ -106,9 +106,10 @@ public class DesiredBalanceShardsAllocatorTests extends ESTestCase {
         var deterministicTaskQueue = new DeterministicTaskQueue();
         var threadPool = deterministicTaskQueue.getThreadPool();
 
-        var localNode = createDiscoveryNode("node-1");
+        var localNode = createDiscoveryNode(LOCAL_NODE_ID);
+        var otherNode = createDiscoveryNode(OTHER_NODE_ID);
         var initialState = ClusterState.builder(new ClusterName(ClusterServiceUtils.class.getSimpleName()))
-            .nodes(DiscoveryNodes.builder().add(localNode).localNodeId(localNode.getId()).masterNodeId(localNode.getId()))
+            .nodes(DiscoveryNodes.builder().add(localNode).add(otherNode).localNodeId(localNode.getId()).masterNodeId(localNode.getId()))
             .blocks(ClusterBlocks.EMPTY_CLUSTER_BLOCK)
             .build();
 
@@ -116,8 +117,8 @@ public class DesiredBalanceShardsAllocatorTests extends ESTestCase {
         var clusterService = new ClusterService(
             Settings.EMPTY,
             clusterSettings,
-            new FakeThreadPoolMasterService("node-1", "test", threadPool, deterministicTaskQueue::scheduleNow),
-            new ClusterApplierService("node-1", Settings.EMPTY, clusterSettings, threadPool) {
+            new FakeThreadPoolMasterService(LOCAL_NODE_ID, "test", threadPool, deterministicTaskQueue::scheduleNow),
+            new ClusterApplierService(LOCAL_NODE_ID, Settings.EMPTY, clusterSettings, threadPool) {
                 @Override
                 protected PrioritizedEsThreadPoolExecutor createThreadPoolExecutor() {
                     return deterministicTaskQueue.getPrioritizedEsThreadPoolExecutor();
@@ -139,10 +140,13 @@ public class DesiredBalanceShardsAllocatorTests extends ESTestCase {
             }
         };
 
-        var allocationService = createAllocationService(
-            new DesiredBalanceShardsAllocator(createShardsAllocator(), threadPool, clusterService, reconcileAction),
-            createGatewayAllocator(allocateUnassigned)
+        final var desiredBalanceShardsAllocator = new DesiredBalanceShardsAllocator(
+            createShardsAllocator(),
+            threadPool,
+            clusterService,
+            reconcileAction
         );
+        var allocationService = createAllocationService(desiredBalanceShardsAllocator, createGatewayAllocator(allocateUnassigned));
         allocationServiceRef.set(allocationService);
 
         var listenerCalled = new AtomicBoolean(false);
@@ -173,7 +177,17 @@ public class DesiredBalanceShardsAllocatorTests extends ESTestCase {
 
         try {
             assertTrue(listenerCalled.get());
-            verifier.accept(clusterService.state().routingTable());
+            final var routingTable = clusterService.state().routingTable();
+            verifier.accept(routingTable);
+            final var desiredBalance = desiredBalanceShardsAllocator.getDesiredBalance();
+            for (final var indexRoutingTable : routingTable) {
+                for (int shardId = 0; shardId < indexRoutingTable.size(); shardId++) {
+                    final var shardRoutingTable = indexRoutingTable.shard(shardId);
+                    for (final var assignedShard : shardRoutingTable.assignedShards()) {
+                        assertThat(desiredBalance.getAssignment(assignedShard.shardId()).nodeIds(), hasItem(assignedShard.currentNodeId()));
+                    }
+                }
+            }
         } finally {
             clusterService.close();
         }
@@ -283,8 +297,8 @@ public class DesiredBalanceShardsAllocatorTests extends ESTestCase {
         var newMasterElected = new CountDownLatch(1);
         var clusterStateUpdatesExecuted = new CountDownLatch(1);
 
-        var node1 = createDiscoveryNode("node-1");
-        var node2 = createDiscoveryNode("node-2");
+        var node1 = createDiscoveryNode(LOCAL_NODE_ID);
+        var node2 = createDiscoveryNode(OTHER_NODE_ID);
         var initial = ClusterState.builder(ClusterName.DEFAULT)
             .nodes(DiscoveryNodes.builder().add(node1).add(node2).localNodeId(node1.getId()).masterNodeId(node1.getId()))
             .build();