Explorar o código

Add more `addTemporaryStateListener` utils (#125648)

We often call `addTemporaryStateListener` with the `ClusterService` of a
random node, or the currently elected master. This commit adds utilities
for this common pattern.
David Turner hai 6 meses
pai
achega
b858ec0399

+ 0 - 1
server/src/internalClusterTest/java/org/elasticsearch/action/support/master/TransportMasterNodeActionIT.java

@@ -214,7 +214,6 @@ public class TransportMasterNodeActionIT extends ESIntegTestCase {
      */
     private static String ensureSufficientMasterEligibleNodes() {
         final var votingConfigSizeListener = ClusterServiceUtils.addTemporaryStateListener(
-            internalCluster().getAnyMasterNodeInstance(ClusterService.class),
             cs -> 5 <= cs.coordinationMetadata().getLastCommittedConfiguration().getNodeIds().size()
         );
 

+ 0 - 3
server/src/internalClusterTest/java/org/elasticsearch/snapshots/ConcurrentSnapshotsIT.java

@@ -26,7 +26,6 @@ import org.elasticsearch.action.support.master.AcknowledgedResponse;
 import org.elasticsearch.cluster.SnapshotDeletionsInProgress;
 import org.elasticsearch.cluster.SnapshotsInProgress;
 import org.elasticsearch.cluster.metadata.MetadataDeleteIndexService;
-import org.elasticsearch.cluster.service.ClusterService;
 import org.elasticsearch.common.Strings;
 import org.elasticsearch.common.settings.Settings;
 import org.elasticsearch.common.util.concurrent.ListenableFuture;
@@ -2182,7 +2181,6 @@ public class ConcurrentSnapshotsIT extends AbstractSnapshotIntegTestCase {
             // ensure each snapshot has really started before moving on to the next one
             safeAwait(
                 ClusterServiceUtils.addTemporaryStateListener(
-                    internalCluster().getInstance(ClusterService.class),
                     cs -> SnapshotsInProgress.get(cs)
                         .forRepo(repoName)
                         .stream()
@@ -2202,7 +2200,6 @@ public class ConcurrentSnapshotsIT extends AbstractSnapshotIntegTestCase {
         final var indexRecreatedListener = ClusterServiceUtils
             // wait until the snapshot has entered finalization
             .addTemporaryStateListener(
-                internalCluster().getInstance(ClusterService.class),
                 cs -> SnapshotsInProgress.get(cs)
                     .forRepo(repoName)
                     .stream()

+ 1 - 7
server/src/internalClusterTest/java/org/elasticsearch/snapshots/CorruptedBlobStoreRepositoryIT.java

@@ -19,7 +19,6 @@ import org.elasticsearch.cluster.ClusterState;
 import org.elasticsearch.cluster.SnapshotsInProgress;
 import org.elasticsearch.cluster.metadata.Metadata;
 import org.elasticsearch.cluster.metadata.RepositoriesMetadata;
-import org.elasticsearch.cluster.service.ClusterService;
 import org.elasticsearch.common.bytes.BytesReference;
 import org.elasticsearch.common.settings.Settings;
 import org.elasticsearch.common.unit.ByteSizeUnit;
@@ -848,12 +847,7 @@ public class CorruptedBlobStoreRepositoryIT extends AbstractSnapshotIntegTestCas
                 clusterAdmin().prepareCloneSnapshot(TEST_REQUEST_TIMEOUT, "test-repo", "test-snap-1", "test-snap-2")
                     .setIndices("test-idx-1")
                     .get();
-                safeAwait(
-                    ClusterServiceUtils.addTemporaryStateListener(
-                        internalCluster().getInstance(ClusterService.class),
-                        cs -> SnapshotsInProgress.get(cs).isEmpty()
-                    )
-                );
+                safeAwait(ClusterServiceUtils.addTemporaryStateListener(cs -> SnapshotsInProgress.get(cs).isEmpty()));
                 assertThat(
                     clusterAdmin().prepareGetSnapshots(TEST_REQUEST_TIMEOUT, "test-repo")
                         .setSnapshots("test-snap-2")

+ 1 - 6
server/src/internalClusterTest/java/org/elasticsearch/snapshots/SharedClusterSnapshotRestoreIT.java

@@ -1475,12 +1475,7 @@ public class SharedClusterSnapshotRestoreIT extends AbstractSnapshotIntegTestCas
                 assertNotNull(indexShard);
                 final var primaryTerm = indexShard.getOperationPrimaryTerm();
                 indexShard.failShard("simulated", new ElasticsearchException("simulated"));
-                safeAwait(
-                    ClusterServiceUtils.addTemporaryStateListener(
-                        internalCluster().getInstance(ClusterService.class),
-                        cs -> cs.metadata().index(indexName).primaryTerm(0) > primaryTerm
-                    )
-                );
+                safeAwait(ClusterServiceUtils.addTemporaryStateListener(cs -> cs.metadata().index(indexName).primaryTerm(0) > primaryTerm));
                 ensureGreen(indexName);
             }
         } finally {

+ 16 - 23
server/src/internalClusterTest/java/org/elasticsearch/snapshots/SnapshotsServiceIT.java

@@ -16,7 +16,6 @@ import org.elasticsearch.action.support.SubscribableListener;
 import org.elasticsearch.action.support.master.AcknowledgedResponse;
 import org.elasticsearch.cluster.SnapshotDeletionsInProgress;
 import org.elasticsearch.cluster.metadata.IndexMetadata;
-import org.elasticsearch.cluster.service.ClusterService;
 import org.elasticsearch.cluster.service.MasterService;
 import org.elasticsearch.common.settings.Settings;
 import org.elasticsearch.snapshots.mockstore.MockRepository;
@@ -170,22 +169,19 @@ public class SnapshotsServiceIT extends AbstractSnapshotIntegTestCase {
      */
     private SubscribableListener<Void> createSnapshotDeletionListener(String repositoryName) {
         AtomicBoolean deleteHasStarted = new AtomicBoolean(false);
-        return ClusterServiceUtils.addTemporaryStateListener(
-            internalCluster().getCurrentMasterNodeInstance(ClusterService.class),
-            state -> {
-                SnapshotDeletionsInProgress deletionsInProgress = (SnapshotDeletionsInProgress) state.getCustoms()
-                    .get(SnapshotDeletionsInProgress.TYPE);
-                if (deletionsInProgress == null) {
-                    return false;
-                }
-                if (deleteHasStarted.get() == false) {
-                    deleteHasStarted.set(deletionsInProgress.hasExecutingDeletion(repositoryName));
-                    return false;
-                } else {
-                    return deletionsInProgress.hasExecutingDeletion(repositoryName) == false;
-                }
+        return ClusterServiceUtils.addMasterTemporaryStateListener(state -> {
+            SnapshotDeletionsInProgress deletionsInProgress = (SnapshotDeletionsInProgress) state.getCustoms()
+                .get(SnapshotDeletionsInProgress.TYPE);
+            if (deletionsInProgress == null) {
+                return false;
             }
-        );
+            if (deleteHasStarted.get() == false) {
+                deleteHasStarted.set(deletionsInProgress.hasExecutingDeletion(repositoryName));
+                return false;
+            } else {
+                return deletionsInProgress.hasExecutingDeletion(repositoryName) == false;
+            }
+        });
     }
 
     public void testRerouteWhenShardSnapshotsCompleted() throws Exception {
@@ -209,13 +205,10 @@ public class SnapshotsServiceIT extends AbstractSnapshotIntegTestCase {
                 .put(IndexMetadata.INDEX_ROUTING_EXCLUDE_GROUP_PREFIX + "._name", originalNode)
         );
 
-        final var shardMovedListener = ClusterServiceUtils.addTemporaryStateListener(
-            internalCluster().getCurrentMasterNodeInstance(ClusterService.class),
-            state -> {
-                final var primaryShard = state.routingTable().index(indexName).shard(0).primaryShard();
-                return primaryShard.started() && originalNode.equals(state.nodes().get(primaryShard.currentNodeId()).getName()) == false;
-            }
-        );
+        final var shardMovedListener = ClusterServiceUtils.addMasterTemporaryStateListener(state -> {
+            final var primaryShard = state.routingTable().index(indexName).shard(0).primaryShard();
+            return primaryShard.started() && originalNode.equals(state.nodes().get(primaryShard.currentNodeId()).getName()) == false;
+        });
         assertFalse(shardMovedListener.isDone());
 
         unblockAllDataNodes(repoName);

+ 40 - 0
test/framework/src/main/java/org/elasticsearch/test/ClusterServiceUtils.java

@@ -263,6 +263,15 @@ public class ClusterServiceUtils {
         );
     }
 
+    /**
+     * Creates a {@link ClusterStateListener} which subscribes to the given {@link ClusterService} and waits for it to apply a cluster state
+     * that satisfies {@code predicate}, at which point it unsubscribes itself.
+     *
+     * @return A {@link SubscribableListener} which is completed when the first cluster state matching {@code predicate} is applied by the
+     *         given {@code clusterService}. If the current cluster state already matches {@code predicate} then the returned listener is
+     *         already complete. If no matching cluster state is seen within {@link ESTestCase#SAFE_AWAIT_TIMEOUT} then the listener is
+     *         completed exceptionally on the scheduler thread that belongs to {@code clusterService}.
+     */
     public static SubscribableListener<Void> addTemporaryStateListener(ClusterService clusterService, Predicate<ClusterState> predicate) {
         final var listener = new SubscribableListener<Void>();
         final ClusterStateListener clusterStateListener = new ClusterStateListener() {
@@ -291,4 +300,35 @@ public class ClusterServiceUtils {
         }
         return listener;
     }
+
+    /**
+     * Creates a {@link ClusterStateListener} which subscribes to the {@link ClusterService} of one of the nodes in the
+     * {@link ESIntegTestCase#internalCluster()}. When the chosen {@link ClusterService} applies a state that satisfies {@code predicate}
+     * the listener unsubscribes itself.
+     *
+     * @return A {@link SubscribableListener} which is completed when the first cluster state matching {@code predicate} is applied by the
+     *         {@link ClusterService} belonging to one of the nodes in the {@link ESIntegTestCase#internalCluster()}. If the current cluster
+     *         state already matches {@code predicate} then the returned listener is already complete. If no matching cluster state is seen
+     *         within {@link ESTestCase#SAFE_AWAIT_TIMEOUT} then the listener is completed exceptionally on the scheduler thread that
+     *         belongs to the chosen node's {@link ClusterService}.
+     */
+    public static SubscribableListener<Void> addTemporaryStateListener(Predicate<ClusterState> predicate) {
+        return addTemporaryStateListener(ESIntegTestCase.internalCluster().clusterService(), predicate);
+    }
+
+    /**
+     * Creates a {@link ClusterStateListener} which subscribes to the {@link ClusterService} of the current elected master node in the
+     * {@link ESIntegTestCase#internalCluster()}. When this node's {@link ClusterService} applies a state that satisfies {@code predicate}
+     * the listener unsubscribes itself.
+     *
+     * @return A {@link SubscribableListener} which is completed when the first cluster state matching {@code predicate} is applied by the
+     *         {@link ClusterService} belonging to the node that was the elected master node in the
+     *         {@link ESIntegTestCase#internalCluster()} when this method was first called. If the current cluster state already matches
+     *         {@code predicate} then the returned listener is already complete. If no matching cluster state is seen within
+     *         {@link ESTestCase#SAFE_AWAIT_TIMEOUT} then the listener is completed exceptionally on the scheduler thread that belongs to
+     *         the elected master node's {@link ClusterService}.
+     */
+    public static SubscribableListener<Void> addMasterTemporaryStateListener(Predicate<ClusterState> predicate) {
+        return addTemporaryStateListener(ESIntegTestCase.internalCluster().getCurrentMasterNodeInstance(ClusterService.class), predicate);
+    }
 }