1
0
Эх сурвалжийг харах

Ensure reroute listeners are completed in tests (#94726)

Today the desired balance allocation service used in most tests will
compute the balance and reconcile in a single update, but it does not
complete its listeners. Mostly this does not matter because we use
`ActionListener.noop()` as the reroute listener almost everywhere.
However the `CoordinatorTests` will fire a reroute after state recovery
and rely on its listeners being completed to reset the flags in case a
subsequent state recovery is needed.

This commit ensures that the listener is completed by the allocator
created for use in tests.

Closes #91449
David Turner 2 жил өмнө
parent
commit
614d7eefec

+ 5 - 0
server/src/main/java/org/elasticsearch/cluster/routing/allocation/allocator/DesiredBalanceShardsAllocator.java

@@ -315,6 +315,11 @@ public class DesiredBalanceShardsAllocator implements ShardsAllocator {
         }
     }
 
+    // only for tests - in production, this happens after reconciliation
+    protected final void completeToLastConvergedIndex() {
+        queue.complete(currentDesiredBalance.lastConvergedIndex());
+    }
+
     private void recordTime(CounterMetric metric, Runnable action) {
         final long started = threadPool.relativeTimeInMillis();
         try {

+ 1 - 3
server/src/test/java/org/elasticsearch/cluster/coordination/CoordinatorTests.java

@@ -44,7 +44,6 @@ import org.elasticsearch.transport.TransportService;
 import org.elasticsearch.xcontent.ToXContent;
 
 import java.io.IOException;
-import java.util.Arrays;
 import java.util.Collections;
 import java.util.HashSet;
 import java.util.Iterator;
@@ -721,7 +720,6 @@ public class CoordinatorTests extends AbstractCoordinatorTestCase {
      * Old leader is initiating an election at the same time, and wins election. It becomes leader again, but as it previously
      * successfully completed state recovery, is never reset to a state where state recovery can be retried.
      */
-    @AwaitsFix(bugUrl = "https://github.com/elastic/elasticsearch/issues/91449")
     public void testStateRecoveryResetAfterPreviousLeadership() {
         try (Cluster cluster = new Cluster(3)) {
             cluster.runRandomly();
@@ -732,7 +730,7 @@ public class CoordinatorTests extends AbstractCoordinatorTestCase {
             final ClusterNode follower2 = cluster.getAnyNodeExcept(leader, follower1);
 
             // restart follower1 and follower2
-            for (ClusterNode clusterNode : Arrays.asList(follower1, follower2)) {
+            for (ClusterNode clusterNode : List.of(follower1, follower2)) {
                 clusterNode.close();
                 cluster.clusterNodes.forEach(cn -> cluster.deterministicTaskQueue.scheduleNow(cn.onNode(new Runnable() {
                     @Override

+ 11 - 2
server/src/test/java/org/elasticsearch/cluster/routing/allocation/allocator/ClusterAllocationSimulationTests.java

@@ -12,6 +12,7 @@ import org.apache.logging.log4j.LogManager;
 import org.apache.logging.log4j.Logger;
 import org.apache.lucene.util.SetOnce;
 import org.elasticsearch.Version;
+import org.elasticsearch.action.ActionListener;
 import org.elasticsearch.cluster.ClusterInfo;
 import org.elasticsearch.cluster.ClusterInfoService;
 import org.elasticsearch.cluster.ClusterState;
@@ -28,6 +29,7 @@ import org.elasticsearch.cluster.routing.RoutingTable;
 import org.elasticsearch.cluster.routing.ShardRouting;
 import org.elasticsearch.cluster.routing.allocation.AllocationService;
 import org.elasticsearch.cluster.routing.allocation.DataTier;
+import org.elasticsearch.cluster.routing.allocation.RoutingAllocation;
 import org.elasticsearch.cluster.service.ClusterApplierService;
 import org.elasticsearch.cluster.service.ClusterService;
 import org.elasticsearch.cluster.service.MasterService;
@@ -289,7 +291,7 @@ public class ClusterAllocationSimulationTests extends ESAllocationTestCase {
             clusterInfoService.addIndex(shardSizeByIndex.getKey(), shardSizeByIndex.getValue());
         }
 
-        final var tuple = createNewAllocationService(threadPool, clusterService, clusterInfoService);
+        final var tuple = createNewAllocationService(threadPool, deterministicTaskQueue::runAllTasks, clusterService, clusterInfoService);
         final var allocationService = tuple.getKey();
 
         final var initializingPrimaries = allocationService.executeWithRoutingAllocation(
@@ -472,6 +474,7 @@ public class ClusterAllocationSimulationTests extends ESAllocationTestCase {
 
     private Map.Entry<MockAllocationService, ShardsAllocator> createNewAllocationService(
         ThreadPool threadPool,
+        Runnable runAllTasks,
         ClusterService clusterService,
         ClusterInfoService clusterInfoService
     ) {
@@ -486,7 +489,13 @@ public class ClusterAllocationSimulationTests extends ESAllocationTestCase {
             clusterService,
             (clusterState, routingAllocationAction) -> strategyRef.get()
                 .executeWithRoutingAllocation(clusterState, "reconcile-desired-balance", routingAllocationAction)
-        );
+        ) {
+            @Override
+            public void allocate(RoutingAllocation allocation, ActionListener<Void> listener) {
+                super.allocate(allocation, listener);
+                runAllTasks.run();
+            }
+        };
         var strategy = new MockAllocationService(
             randomAllocationDeciders(Settings.EMPTY, new ClusterSettings(Settings.EMPTY, ClusterSettings.BUILT_IN_CLUSTER_SETTINGS)),
             new TestGatewayAllocator(),

+ 11 - 5
test/framework/src/main/java/org/elasticsearch/cluster/ESAllocationTestCase.java

@@ -10,6 +10,7 @@ package org.elasticsearch.cluster;
 
 import org.elasticsearch.Version;
 import org.elasticsearch.action.ActionListener;
+import org.elasticsearch.action.support.PlainActionFuture;
 import org.elasticsearch.cluster.metadata.IndexMetadata;
 import org.elasticsearch.cluster.metadata.Metadata;
 import org.elasticsearch.cluster.node.DiscoveryNode;
@@ -166,6 +167,8 @@ public abstract class ESAllocationTestCase extends ESTestCase {
                 lastAllocation = allocation;
                 super.allocate(allocation, listener);
                 queue.runAllTasks();
+                completeToLastConvergedIndex();
+                queue.runAllTasks();
             }
 
             @Override
@@ -309,11 +312,14 @@ public abstract class ESAllocationTestCase extends ESTestCase {
         ClusterState clusterState,
         List<ShardRouting> initializingShards
     ) {
-        return allocationService.reroute(
-            allocationService.applyStartedShards(clusterState, initializingShards),
-            "reroute after starting",
-            ActionListener.noop()
-        );
+        return reroute(allocationService, allocationService.applyStartedShards(clusterState, initializingShards));
+    }
+
+    public static ClusterState reroute(AllocationService allocationService, ClusterState clusterState) {
+        final var listener = new PlainActionFuture<Void>();
+        final var result = allocationService.reroute(clusterState, "test reroute", listener);
+        listener.result(); // ensures it completed successfully
+        return result;
     }
 
     public static class TestAllocateDecision extends AllocationDecider {