Bläddra i källkod

Deprecate timed `PlainActionFuture#get` (#110859)

The timed `PlainActionFuture#get` method is almost exclusively used in
tests, and never in a place where it really matters that it supplies a
`PlainActionFuture<T>` rather than an `ActionListener<T>`. Moreover some
of its usages wait for longer than `SAFE_AWAIT_TIMEOUT`, typically 30s,
but not for any good reason. We have several suitable test utility
methods for achieving the same thing, and these utilities automatically
deal with exceptions/interrupts/timeouts appropriately. This commit
replaces all the test usages with more suitable test utilities.

The only production usages are in the CCR module. While it would be good
to remove this blocking code, for now this commit just introduces a new
CCR-only utility for this behaviour. That way we can deprecate the
widely-available `PlainActionFuture#get` method and remove it in a
follow-up.
David Turner 1 år sedan
förälder
incheckning
b15b76b869
39 ändrade filer med 326 tillägg och 354 borttagningar
  1. 7 7
      modules/reindex/src/test/java/org/elasticsearch/reindex/ClientScrollableHitSourceTests.java
  2. 3 11
      modules/repository-s3/src/internalClusterTest/java/org/elasticsearch/repositories/s3/S3RepositoryThirdPartyTests.java
  3. 2 7
      server/src/internalClusterTest/java/org/elasticsearch/cluster/coordination/InitialClusterStateIT.java
  4. 9 2
      server/src/internalClusterTest/java/org/elasticsearch/snapshots/CorruptedBlobStoreRepositoryIT.java
  5. 3 3
      server/src/internalClusterTest/java/org/elasticsearch/snapshots/RepositoriesIT.java
  6. 8 15
      server/src/internalClusterTest/java/org/elasticsearch/snapshots/SnapshotShutdownIT.java
  7. 1 0
      server/src/main/java/org/elasticsearch/action/support/PlainActionFuture.java
  8. 2 6
      server/src/test/java/org/elasticsearch/action/admin/cluster/allocation/TransportDeleteDesiredBalanceActionTests.java
  9. 16 16
      server/src/test/java/org/elasticsearch/action/admin/cluster/allocation/TransportGetDesiredBalanceActionTests.java
  10. 4 8
      server/src/test/java/org/elasticsearch/action/admin/cluster/node/tasks/TransportTasksActionTests.java
  11. 2 7
      server/src/test/java/org/elasticsearch/action/admin/indices/resolve/TransportResolveClusterActionTests.java
  12. 8 10
      server/src/test/java/org/elasticsearch/action/admin/indices/validate/query/TransportValidateQueryActionTests.java
  13. 3 8
      server/src/test/java/org/elasticsearch/action/fieldcaps/TransportFieldCapabilitiesActionTests.java
  14. 5 17
      server/src/test/java/org/elasticsearch/action/support/broadcast/node/TransportBroadcastByNodeActionTests.java
  15. 19 22
      server/src/test/java/org/elasticsearch/action/support/broadcast/unpromotable/TransportBroadcastUnpromotableActionTests.java
  16. 9 13
      server/src/test/java/org/elasticsearch/action/support/replication/TransportWriteActionTests.java
  17. 9 9
      server/src/test/java/org/elasticsearch/cluster/NodeConnectionsServiceTests.java
  18. 8 13
      server/src/test/java/org/elasticsearch/cluster/coordination/NodeJoinExecutorTests.java
  19. 4 6
      server/src/test/java/org/elasticsearch/cluster/routing/BatchedRerouteServiceTests.java
  20. 1 2
      server/src/test/java/org/elasticsearch/cluster/routing/allocation/allocator/DesiredBalanceShardsAllocatorTests.java
  21. 11 5
      server/src/test/java/org/elasticsearch/common/blobstore/fs/FsBlobContainerTests.java
  22. 14 16
      server/src/test/java/org/elasticsearch/index/CompositeIndexEventListenerTests.java
  23. 2 6
      server/src/test/java/org/elasticsearch/index/engine/InternalEngineTests.java
  24. 11 8
      server/src/test/java/org/elasticsearch/index/shard/IndexShardOperationPermitsTests.java
  25. 1 1
      server/src/test/java/org/elasticsearch/index/shard/IndexShardTests.java
  26. 15 15
      server/src/test/java/org/elasticsearch/indices/recovery/RecoverySourceHandlerTests.java
  27. 4 5
      server/src/test/java/org/elasticsearch/snapshots/InternalSnapshotsInfoServiceTests.java
  28. 4 6
      server/src/test/java/org/elasticsearch/transport/ClusterConnectionManagerTests.java
  29. 4 6
      server/src/test/java/org/elasticsearch/transport/RemoteClusterAwareClientTests.java
  30. 4 6
      server/src/test/java/org/elasticsearch/transport/RemoteClusterClientTests.java
  31. 3 10
      test/framework/src/main/java/org/elasticsearch/action/support/ActionTestUtils.java
  32. 14 14
      test/framework/src/main/java/org/elasticsearch/index/engine/EngineTestCase.java
  33. 16 14
      test/framework/src/main/java/org/elasticsearch/index/shard/IndexShardTestCase.java
  34. 1 5
      test/framework/src/main/java/org/elasticsearch/snapshots/AbstractSnapshotIntegTestCase.java
  35. 5 7
      test/framework/src/main/java/org/elasticsearch/test/ClusterServiceUtils.java
  36. 38 5
      test/framework/src/main/java/org/elasticsearch/test/ESTestCase.java
  37. 12 18
      test/framework/src/main/java/org/elasticsearch/transport/AbstractSimpleTransportTestCase.java
  38. 31 17
      x-pack/plugin/ccr/src/main/java/org/elasticsearch/xpack/ccr/repository/CcrRepository.java
  39. 13 8
      x-pack/plugin/monitoring/src/test/java/org/elasticsearch/xpack/monitoring/action/TransportMonitoringBulkActionTests.java

+ 7 - 7
modules/reindex/src/test/java/org/elasticsearch/reindex/ClientScrollableHitSourceTests.java

@@ -19,7 +19,6 @@ import org.elasticsearch.action.search.SearchResponse;
 import org.elasticsearch.action.search.SearchScrollRequest;
 import org.elasticsearch.action.search.TransportSearchAction;
 import org.elasticsearch.action.search.TransportSearchScrollAction;
-import org.elasticsearch.action.support.PlainActionFuture;
 import org.elasticsearch.client.internal.ParentTaskAssigningClient;
 import org.elasticsearch.client.internal.support.AbstractClient;
 import org.elasticsearch.common.bytes.BytesArray;
@@ -42,12 +41,14 @@ import java.util.concurrent.ArrayBlockingQueue;
 import java.util.concurrent.BlockingQueue;
 import java.util.concurrent.TimeUnit;
 import java.util.concurrent.atomic.AtomicInteger;
+import java.util.concurrent.atomic.AtomicReference;
 import java.util.function.Consumer;
 import java.util.function.Function;
 import java.util.stream.IntStream;
 
 import static org.apache.lucene.tests.util.TestUtil.randomSimpleString;
 import static org.elasticsearch.core.TimeValue.timeValueSeconds;
+import static org.hamcrest.Matchers.instanceOf;
 
 public class ClientScrollableHitSourceTests extends ESTestCase {
 
@@ -73,12 +74,11 @@ public class ClientScrollableHitSourceTests extends ESTestCase {
         dotestBasicsWithRetry(retries, 0, retries, e -> fail());
     }
 
-    public void testRetryFail() {
-        int retries = randomInt(10);
-        expectThrows(
-            EsRejectedExecutionException.class,
-            () -> PlainActionFuture.get(f -> dotestBasicsWithRetry(retries, retries + 1, retries + 1, f::onFailure), 0, TimeUnit.SECONDS)
-        );
+    public void testRetryFail() throws InterruptedException {
+        final int retries = randomInt(10);
+        final var exceptionRef = new AtomicReference<Exception>();
+        dotestBasicsWithRetry(retries, retries + 1, retries + 1, exceptionRef::set);
+        assertThat(exceptionRef.get(), instanceOf(EsRejectedExecutionException.class));
     }
 
     private void dotestBasicsWithRetry(int retries, int minFailures, int maxFailures, Consumer<Exception> failureHandler)

+ 3 - 11
modules/repository-s3/src/internalClusterTest/java/org/elasticsearch/repositories/s3/S3RepositoryThirdPartyTests.java

@@ -15,7 +15,6 @@ import com.amazonaws.services.s3.model.MultipartUpload;
 import com.carrotsearch.randomizedtesting.annotations.ThreadLeakFilters;
 import com.carrotsearch.randomizedtesting.annotations.ThreadLeakScope;
 
-import org.elasticsearch.action.support.PlainActionFuture;
 import org.elasticsearch.action.support.master.AcknowledgedResponse;
 import org.elasticsearch.cluster.service.ClusterService;
 import org.elasticsearch.common.blobstore.OptionalBytesReference;
@@ -167,19 +166,12 @@ public class S3RepositoryThirdPartyTests extends AbstractThirdPartyRepositoryTes
 
                 class TestHarness {
                     boolean tryCompareAndSet(BytesReference expected, BytesReference updated) {
-                        return PlainActionFuture.<Boolean, RuntimeException>get(
-                            future -> blobContainer.compareAndSetRegister(randomPurpose(), "key", expected, updated, future),
-                            10,
-                            TimeUnit.SECONDS
-                        );
+                        return safeAwait(l -> blobContainer.compareAndSetRegister(randomPurpose(), "key", expected, updated, l));
                     }
 
                     BytesReference readRegister() {
-                        return PlainActionFuture.get(
-                            future -> blobContainer.getRegister(randomPurpose(), "key", future.map(OptionalBytesReference::bytesReference)),
-                            10,
-                            TimeUnit.SECONDS
-                        );
+                        final OptionalBytesReference result = safeAwait(l -> blobContainer.getRegister(randomPurpose(), "key", l));
+                        return result.bytesReference();
                     }
 
                     List<MultipartUpload> listMultipartUploads() {

+ 2 - 7
server/src/internalClusterTest/java/org/elasticsearch/cluster/coordination/InitialClusterStateIT.java

@@ -11,7 +11,6 @@ package org.elasticsearch.cluster.coordination;
 import org.elasticsearch.action.admin.cluster.stats.ClusterStatsRequest;
 import org.elasticsearch.action.admin.cluster.stats.ClusterStatsResponse;
 import org.elasticsearch.action.admin.cluster.stats.TransportClusterStatsAction;
-import org.elasticsearch.action.support.PlainActionFuture;
 import org.elasticsearch.cluster.metadata.Metadata;
 import org.elasticsearch.cluster.service.ClusterService;
 import org.elasticsearch.common.settings.Settings;
@@ -19,8 +18,6 @@ import org.elasticsearch.core.TimeValue;
 import org.elasticsearch.test.ESIntegTestCase;
 import org.elasticsearch.test.InternalTestCluster;
 
-import java.util.concurrent.TimeUnit;
-
 import static org.elasticsearch.node.Node.INITIAL_STATE_TIMEOUT_SETTING;
 
 @ESIntegTestCase.ClusterScope(numDataNodes = 0, autoManageMasterNodes = false)
@@ -40,10 +37,8 @@ public class InitialClusterStateIT extends ESIntegTestCase {
             assertEquals(expectCommitted, metadata.clusterUUIDCommitted());
             assertEquals(expectedValue, metadata.clusterUUID());
 
-            final ClusterStatsResponse response = PlainActionFuture.get(
-                fut -> client(nodeName).execute(TransportClusterStatsAction.TYPE, new ClusterStatsRequest(), fut),
-                10,
-                TimeUnit.SECONDS
+            final ClusterStatsResponse response = safeAwait(
+                listener -> client(nodeName).execute(TransportClusterStatsAction.TYPE, new ClusterStatsRequest(), listener)
             );
             assertEquals(expectedValue, response.getClusterUUID());
         }

+ 9 - 2
server/src/internalClusterTest/java/org/elasticsearch/snapshots/CorruptedBlobStoreRepositoryIT.java

@@ -21,6 +21,7 @@ import org.elasticsearch.cluster.metadata.RepositoriesMetadata;
 import org.elasticsearch.common.bytes.BytesReference;
 import org.elasticsearch.common.settings.Settings;
 import org.elasticsearch.common.unit.ByteSizeUnit;
+import org.elasticsearch.common.util.concurrent.EsExecutors;
 import org.elasticsearch.core.IOUtils;
 import org.elasticsearch.core.Strings;
 import org.elasticsearch.index.IndexVersion;
@@ -381,7 +382,10 @@ public class CorruptedBlobStoreRepositoryIT extends AbstractSnapshotIntegTestCas
         Files.write(repo.resolve("index-" + repositoryData.getGenId()), randomByteArrayOfLength(randomIntBetween(1, 100)));
 
         logger.info("--> verify loading repository data throws RepositoryException");
-        expectThrows(RepositoryException.class, () -> getRepositoryData(repository));
+        asInstanceOf(
+            RepositoryException.class,
+            safeAwaitFailure(RepositoryData.class, l -> repository.getRepositoryData(EsExecutors.DIRECT_EXECUTOR_SERVICE, l))
+        );
 
         final String otherRepoName = "other-repo";
         assertAcked(
@@ -393,7 +397,10 @@ public class CorruptedBlobStoreRepositoryIT extends AbstractSnapshotIntegTestCas
         final Repository otherRepo = getRepositoryOnMaster(otherRepoName);
 
         logger.info("--> verify loading repository data from newly mounted repository throws RepositoryException");
-        expectThrows(RepositoryException.class, () -> getRepositoryData(otherRepo));
+        asInstanceOf(
+            RepositoryException.class,
+            safeAwaitFailure(RepositoryData.class, l -> repository.getRepositoryData(EsExecutors.DIRECT_EXECUTOR_SERVICE, l))
+        );
     }
 
     public void testHandleSnapshotErrorWithBwCFormat() throws Exception {

+ 3 - 3
server/src/internalClusterTest/java/org/elasticsearch/snapshots/RepositoriesIT.java

@@ -482,7 +482,7 @@ public class RepositoriesIT extends AbstractSnapshotIntegTestCase {
         // We must wait for all the cleanup work to be enqueued (with the throttled runner at least) so we can be sure of exactly how it
         // will execute. The cleanup work is enqueued by the master service thread on completion of the cluster state update which increases
         // the root blob generation in the repo metadata, so it is sufficient to wait for another no-op task to run on the master service:
-        PlainActionFuture.get(fut -> clusterService.createTaskQueue("test", Priority.NORMAL, new SimpleBatchedExecutor<>() {
+        safeAwait(listener -> clusterService.createTaskQueue("test", Priority.NORMAL, new SimpleBatchedExecutor<>() {
             @Override
             public Tuple<ClusterState, Object> executeTask(ClusterStateTaskListener clusterStateTaskListener, ClusterState clusterState) {
                 return Tuple.tuple(clusterState, null);
@@ -490,9 +490,9 @@ public class RepositoriesIT extends AbstractSnapshotIntegTestCase {
 
             @Override
             public void taskSucceeded(ClusterStateTaskListener clusterStateTaskListener, Object ignored) {
-                fut.onResponse(null);
+                listener.onResponse(null);
             }
-        }).submitTask("test", e -> fail(), null), 10, TimeUnit.SECONDS);
+        }).submitTask("test", e -> fail(), null));
 
         final IntSupplier queueLength = () -> threadPool.stats()
             .stats()

+ 8 - 15
server/src/internalClusterTest/java/org/elasticsearch/snapshots/SnapshotShutdownIT.java

@@ -19,7 +19,6 @@ import org.elasticsearch.action.admin.cluster.health.ClusterHealthResponse;
 import org.elasticsearch.action.admin.cluster.snapshots.get.GetSnapshotsRequest;
 import org.elasticsearch.action.admin.cluster.snapshots.get.GetSnapshotsResponse;
 import org.elasticsearch.action.support.ActionTestUtils;
-import org.elasticsearch.action.support.PlainActionFuture;
 import org.elasticsearch.action.support.SubscribableListener;
 import org.elasticsearch.cluster.ClusterState;
 import org.elasticsearch.cluster.ClusterStateUpdateTask;
@@ -88,18 +87,16 @@ public class SnapshotShutdownIT extends AbstractSnapshotIntegTestCase {
         });
         addUnassignedShardsWatcher(clusterService, indexName);
 
-        PlainActionFuture.<Void, RuntimeException>get(
-            fut -> putShutdownMetadata(
+        safeAwait(
+            (ActionListener<Void> listener) -> putShutdownMetadata(
                 clusterService,
                 SingleNodeShutdownMetadata.builder()
                     .setType(SingleNodeShutdownMetadata.Type.RESTART)
                     .setStartedAtMillis(clusterService.threadPool().absoluteTimeInMillis())
                     .setReason("test"),
                 originalNode,
-                fut
-            ),
-            10,
-            TimeUnit.SECONDS
+                listener
+            )
         );
         assertFalse(snapshotCompletesWithoutPausingListener.isDone());
         unblockAllDataNodes(repoName); // lets the shard snapshot continue so the snapshot can succeed
@@ -451,11 +448,7 @@ public class SnapshotShutdownIT extends AbstractSnapshotIntegTestCase {
     }
 
     private static void putShutdownForRemovalMetadata(String nodeName, ClusterService clusterService) {
-        PlainActionFuture.<Void, RuntimeException>get(
-            fut -> putShutdownForRemovalMetadata(clusterService, nodeName, fut),
-            10,
-            TimeUnit.SECONDS
-        );
+        safeAwait((ActionListener<Void> listener) -> putShutdownForRemovalMetadata(clusterService, nodeName, listener));
     }
 
     private static void flushMasterQueue(ClusterService clusterService, ActionListener<Void> listener) {
@@ -525,7 +518,7 @@ public class SnapshotShutdownIT extends AbstractSnapshotIntegTestCase {
     }
 
     private static void clearShutdownMetadata(ClusterService clusterService) {
-        PlainActionFuture.get(fut -> clusterService.submitUnbatchedStateUpdateTask("remove restart marker", new ClusterStateUpdateTask() {
+        safeAwait(listener -> clusterService.submitUnbatchedStateUpdateTask("remove restart marker", new ClusterStateUpdateTask() {
             @Override
             public ClusterState execute(ClusterState currentState) {
                 return currentState.copyAndUpdateMetadata(mdb -> mdb.putCustom(NodesShutdownMetadata.TYPE, NodesShutdownMetadata.EMPTY));
@@ -538,8 +531,8 @@ public class SnapshotShutdownIT extends AbstractSnapshotIntegTestCase {
 
             @Override
             public void clusterStateProcessed(ClusterState initialState, ClusterState newState) {
-                fut.onResponse(null);
+                listener.onResponse(null);
             }
-        }), 10, TimeUnit.SECONDS);
+        }));
     }
 }

+ 1 - 0
server/src/main/java/org/elasticsearch/action/support/PlainActionFuture.java

@@ -375,6 +375,7 @@ public class PlainActionFuture<T> implements ActionFuture<T>, ActionListener<T>
         return fut.actionGet();
     }
 
+    @Deprecated(forRemoval = true) // temporary compatibility shim
     public static <T, E extends Exception> T get(CheckedConsumer<PlainActionFuture<T>, E> e, long timeout, TimeUnit unit) throws E {
         PlainActionFuture<T> fut = new PlainActionFuture<>();
         e.accept(fut);

+ 2 - 6
server/src/test/java/org/elasticsearch/action/admin/cluster/allocation/TransportDeleteDesiredBalanceActionTests.java

@@ -9,6 +9,7 @@
 package org.elasticsearch.action.admin.cluster.allocation;
 
 import org.elasticsearch.ResourceNotFoundException;
+import org.elasticsearch.action.ActionListener;
 import org.elasticsearch.action.ActionResponse;
 import org.elasticsearch.action.support.ActionFilters;
 import org.elasticsearch.action.support.PlainActionFuture;
@@ -48,7 +49,6 @@ import org.elasticsearch.transport.TransportService;
 import java.util.List;
 import java.util.Map;
 import java.util.Queue;
-import java.util.concurrent.TimeUnit;
 import java.util.concurrent.atomic.AtomicReference;
 import java.util.function.Predicate;
 
@@ -131,11 +131,7 @@ public class TransportDeleteDesiredBalanceActionTests extends ESAllocationTestCa
             SNAPSHOT_INFO_SERVICE_WITH_NO_SHARD_SIZES
         );
 
-        PlainActionFuture.<Void, RuntimeException>get(
-            f -> allocationService.reroute(clusterState, "inital-allocate", f),
-            10,
-            TimeUnit.SECONDS
-        );
+        safeAwait((ActionListener<Void> listener) -> allocationService.reroute(clusterState, "inital-allocate", listener));
 
         var balanceBeforeReset = allocator.getDesiredBalance();
         assertThat(balanceBeforeReset.lastConvergedIndex(), greaterThan(DesiredBalance.INITIAL.lastConvergedIndex()));

+ 16 - 16
server/src/test/java/org/elasticsearch/action/admin/cluster/allocation/TransportGetDesiredBalanceActionTests.java

@@ -9,7 +9,7 @@ package org.elasticsearch.action.admin.cluster.allocation;
 
 import org.elasticsearch.ResourceNotFoundException;
 import org.elasticsearch.action.support.ActionFilters;
-import org.elasticsearch.action.support.PlainActionFuture;
+import org.elasticsearch.action.support.SubscribableListener;
 import org.elasticsearch.cluster.ClusterInfo;
 import org.elasticsearch.cluster.ClusterInfoService;
 import org.elasticsearch.cluster.ClusterInfoTests;
@@ -53,7 +53,6 @@ import java.util.List;
 import java.util.Map;
 import java.util.Optional;
 import java.util.Set;
-import java.util.concurrent.TimeUnit;
 import java.util.stream.Collectors;
 
 import static org.elasticsearch.cluster.ClusterModule.BALANCED_ALLOCATOR;
@@ -69,8 +68,8 @@ public class TransportGetDesiredBalanceActionTests extends ESAllocationTestCase
 
     private final DesiredBalanceShardsAllocator desiredBalanceShardsAllocator = mock(DesiredBalanceShardsAllocator.class);
     private final ClusterInfoService clusterInfoService = mock(ClusterInfoService.class);
-    private ThreadPool threadPool = mock(ThreadPool.class);
-    private TransportService transportService = MockUtils.setupTransportServiceWithThreadpoolExecutor(threadPool);
+    private final ThreadPool threadPool = mock(ThreadPool.class);
+    private final TransportService transportService = MockUtils.setupTransportServiceWithThreadpoolExecutor(threadPool);
     private TransportGetDesiredBalanceAction transportGetDesiredBalanceAction;
 
     @Before
@@ -87,24 +86,25 @@ public class TransportGetDesiredBalanceActionTests extends ESAllocationTestCase
         );
     }
 
-    private static DesiredBalanceResponse execute(TransportGetDesiredBalanceAction action, ClusterState clusterState) throws Exception {
-        return PlainActionFuture.get(
-            future -> action.masterOperation(
+    private static SubscribableListener<DesiredBalanceResponse> execute(
+        TransportGetDesiredBalanceAction action,
+        ClusterState clusterState
+    ) {
+        return SubscribableListener.newForked(
+            listener -> action.masterOperation(
                 new Task(1, "test", TransportGetDesiredBalanceAction.TYPE.name(), "", TaskId.EMPTY_TASK_ID, Map.of()),
                 new DesiredBalanceRequest(TEST_REQUEST_TIMEOUT),
                 clusterState,
-                future
-            ),
-            10,
-            TimeUnit.SECONDS
+                listener
+            )
         );
     }
 
-    private DesiredBalanceResponse executeAction(ClusterState clusterState) throws Exception {
+    private SubscribableListener<DesiredBalanceResponse> executeAction(ClusterState clusterState) {
         return execute(transportGetDesiredBalanceAction, clusterState);
     }
 
-    public void testReturnsErrorIfAllocatorIsNotDesiredBalanced() throws Exception {
+    public void testReturnsErrorIfAllocatorIsNotDesiredBalanced() {
         var clusterState = ClusterState.builder(ClusterName.DEFAULT).metadata(metadataWithConfiguredAllocator(BALANCED_ALLOCATOR)).build();
         final var action = new TransportGetDesiredBalanceAction(
             transportService,
@@ -117,7 +117,7 @@ public class TransportGetDesiredBalanceActionTests extends ESAllocationTestCase
             mock(WriteLoadForecaster.class)
         );
 
-        final var exception = expectThrows(ResourceNotFoundException.class, () -> execute(action, clusterState));
+        final var exception = asInstanceOf(ResourceNotFoundException.class, safeAwaitFailure(execute(action, clusterState)));
         assertEquals("Desired balance allocator is not in use, no desired balance found", exception.getMessage());
         assertThat(exception.status(), equalTo(RestStatus.NOT_FOUND));
     }
@@ -129,7 +129,7 @@ public class TransportGetDesiredBalanceActionTests extends ESAllocationTestCase
 
         assertEquals(
             "Desired balance is not computed yet",
-            expectThrows(ResourceNotFoundException.class, () -> executeAction(clusterState)).getMessage()
+            asInstanceOf(ResourceNotFoundException.class, safeAwaitFailure(executeAction(clusterState))).getMessage()
         );
     }
 
@@ -230,7 +230,7 @@ public class TransportGetDesiredBalanceActionTests extends ESAllocationTestCase
             .routingTable(routingTable)
             .build();
 
-        final var desiredBalanceResponse = executeAction(clusterState);
+        final var desiredBalanceResponse = safeAwait(executeAction(clusterState));
         assertThat(desiredBalanceResponse.getStats(), equalTo(desiredBalanceStats));
         assertThat(desiredBalanceResponse.getClusterBalanceStats(), notNullValue());
         assertThat(desiredBalanceResponse.getClusterInfo(), equalTo(clusterInfo));

+ 4 - 8
server/src/test/java/org/elasticsearch/action/admin/cluster/node/tasks/TransportTasksActionTests.java

@@ -741,10 +741,8 @@ public class TransportTasksActionTests extends TaskManagerTestCase {
         }
         reachabilityChecker.checkReachable();
 
-        PlainActionFuture.<Void, RuntimeException>get(
-            fut -> testNodes[0].transportService.getTaskManager().cancelTaskAndDescendants(task, "test", false, fut),
-            10,
-            TimeUnit.SECONDS
+        safeAwait(
+            (ActionListener<Void> l) -> testNodes[0].transportService.getTaskManager().cancelTaskAndDescendants(task, "test", false, l)
         );
 
         reachabilityChecker.ensureUnreachable();
@@ -817,10 +815,8 @@ public class TransportTasksActionTests extends TaskManagerTestCase {
             reachabilityChecker.checkReachable();
         }
 
-        PlainActionFuture.<Void, RuntimeException>get(
-            fut -> testNodes[0].transportService.getTaskManager().cancelTaskAndDescendants(task, "test", false, fut),
-            10,
-            TimeUnit.SECONDS
+        safeAwait(
+            (ActionListener<Void> l) -> testNodes[0].transportService.getTaskManager().cancelTaskAndDescendants(task, "test", false, l)
         );
 
         reachabilityChecker.ensureUnreachable();

+ 2 - 7
server/src/test/java/org/elasticsearch/action/admin/indices/resolve/TransportResolveClusterActionTests.java

@@ -12,7 +12,6 @@ import org.elasticsearch.TransportVersion;
 import org.elasticsearch.TransportVersions;
 import org.elasticsearch.action.support.ActionFilter;
 import org.elasticsearch.action.support.ActionFilters;
-import org.elasticsearch.action.support.PlainActionFuture;
 import org.elasticsearch.cluster.node.VersionInformation;
 import org.elasticsearch.cluster.service.ClusterService;
 import org.elasticsearch.common.io.stream.StreamOutput;
@@ -88,13 +87,9 @@ public class TransportResolveClusterActionTests extends ESTestCase {
                 null
             );
 
-            IllegalArgumentException ex = expectThrows(
+            final var ex = asInstanceOf(
                 IllegalArgumentException.class,
-                () -> PlainActionFuture.<ResolveClusterActionResponse, RuntimeException>get(
-                    future -> action.doExecute(null, request, future),
-                    10,
-                    TimeUnit.SECONDS
-                )
+                safeAwaitFailure(ResolveClusterActionResponse.class, listener -> action.doExecute(null, request, listener))
             );
 
             assertThat(ex.getMessage(), containsString("not compatible with version"));

+ 8 - 10
server/src/test/java/org/elasticsearch/action/admin/indices/validate/query/TransportValidateQueryActionTests.java

@@ -9,11 +9,10 @@
 package org.elasticsearch.action.admin.indices.validate.query;
 
 import org.elasticsearch.action.ActionListener;
-import org.elasticsearch.action.support.PlainActionFuture;
 import org.elasticsearch.index.IndexNotFoundException;
 import org.elasticsearch.test.ESSingleNodeTestCase;
 
-import java.util.concurrent.TimeUnit;
+import static org.hamcrest.Matchers.instanceOf;
 
 public class TransportValidateQueryActionTests extends ESSingleNodeTestCase {
 
@@ -24,15 +23,14 @@ public class TransportValidateQueryActionTests extends ESSingleNodeTestCase {
      * them garbled together, or trying to write one after the channel had closed, etc.
      */
     public void testListenerOnlyInvokedOnceWhenIndexDoesNotExist() {
-        expectThrows(
-            IndexNotFoundException.class,
-            () -> PlainActionFuture.<ValidateQueryResponse, RuntimeException>get(
-                future -> client().admin()
+        assertThat(
+            safeAwaitFailure(
+                ValidateQueryResponse.class,
+                listener -> client().admin()
                     .indices()
-                    .validateQuery(new ValidateQueryRequest("non-existent-index"), ActionListener.assertOnce(future)),
-                10,
-                TimeUnit.SECONDS
-            )
+                    .validateQuery(new ValidateQueryRequest("non-existent-index"), ActionListener.assertOnce(listener))
+            ),
+            instanceOf(IndexNotFoundException.class)
         );
     }
 

+ 3 - 8
server/src/test/java/org/elasticsearch/action/fieldcaps/TransportFieldCapabilitiesActionTests.java

@@ -12,7 +12,6 @@ import org.elasticsearch.TransportVersion;
 import org.elasticsearch.TransportVersions;
 import org.elasticsearch.action.support.ActionFilter;
 import org.elasticsearch.action.support.ActionFilters;
-import org.elasticsearch.action.support.PlainActionFuture;
 import org.elasticsearch.cluster.node.VersionInformation;
 import org.elasticsearch.cluster.service.ClusterService;
 import org.elasticsearch.common.io.stream.StreamOutput;
@@ -45,7 +44,7 @@ public class TransportFieldCapabilitiesActionTests extends ESTestCase {
         ThreadPool.terminate(threadPool, 10, TimeUnit.SECONDS);
     }
 
-    public void testCCSCompatibilityCheck() throws Exception {
+    public void testCCSCompatibilityCheck() {
         Settings settings = Settings.builder()
             .put("node.name", TransportFieldCapabilitiesActionTests.class.getSimpleName())
             .put(SearchService.CCS_VERSION_CHECK_SETTING.getKey(), "true")
@@ -87,13 +86,9 @@ public class TransportFieldCapabilitiesActionTests extends ESTestCase {
                 null
             );
 
-            IllegalArgumentException ex = expectThrows(
+            IllegalArgumentException ex = asInstanceOf(
                 IllegalArgumentException.class,
-                () -> PlainActionFuture.<FieldCapabilitiesResponse, RuntimeException>get(
-                    future -> action.doExecute(null, fieldCapsRequest, future),
-                    10,
-                    TimeUnit.SECONDS
-                )
+                safeAwaitFailure(FieldCapabilitiesResponse.class, l -> action.doExecute(null, fieldCapsRequest, l))
             );
 
             assertThat(

+ 5 - 17
server/src/test/java/org/elasticsearch/action/support/broadcast/node/TransportBroadcastByNodeActionTests.java

@@ -246,11 +246,7 @@ public class TransportBroadcastByNodeActionTests extends ESTestCase {
     private static final String TEST_THREAD_POOL_NAME = "test_thread_pool";
 
     private static void awaitForkedTasks() {
-        PlainActionFuture.get(
-            listener -> THREAD_POOL.executor(TEST_THREAD_POOL_NAME).execute(ActionRunnable.run(listener, () -> {})),
-            10,
-            TimeUnit.SECONDS
-        );
+        safeAwait(listener -> THREAD_POOL.executor(TEST_THREAD_POOL_NAME).execute(ActionRunnable.run(listener, () -> {})));
     }
 
     @BeforeClass
@@ -347,13 +343,9 @@ public class TransportBroadcastByNodeActionTests extends ESTestCase {
 
         assertEquals(
             "blocked by: [SERVICE_UNAVAILABLE/1/test-block];",
-            expectThrows(
+            asInstanceOf(
                 ClusterBlockException.class,
-                () -> PlainActionFuture.<Response, RuntimeException>get(
-                    listener -> action.doExecute(null, request, listener),
-                    10,
-                    TimeUnit.SECONDS
-                )
+                safeAwaitFailure(Response.class, listener -> action.doExecute(null, request, listener))
             ).getMessage()
         );
     }
@@ -369,13 +361,9 @@ public class TransportBroadcastByNodeActionTests extends ESTestCase {
         setState(clusterService, ClusterState.builder(clusterService.state()).blocks(block));
         assertEquals(
             "index [" + TEST_INDEX + "] blocked by: [SERVICE_UNAVAILABLE/1/test-block];",
-            expectThrows(
+            asInstanceOf(
                 ClusterBlockException.class,
-                () -> PlainActionFuture.<Response, RuntimeException>get(
-                    listener -> action.doExecute(null, request, listener),
-                    10,
-                    TimeUnit.SECONDS
-                )
+                safeAwaitFailure(Response.class, listener -> action.doExecute(null, request, listener))
             ).getMessage()
         );
     }

+ 19 - 22
server/src/test/java/org/elasticsearch/action/support/broadcast/unpromotable/TransportBroadcastUnpromotableActionTests.java

@@ -13,6 +13,7 @@ import org.elasticsearch.action.ActionResponse;
 import org.elasticsearch.action.support.ActionFilters;
 import org.elasticsearch.action.support.ActionTestUtils;
 import org.elasticsearch.action.support.PlainActionFuture;
+import org.elasticsearch.action.support.SubscribableListener;
 import org.elasticsearch.cluster.ClusterState;
 import org.elasticsearch.cluster.action.shard.ShardStateAction;
 import org.elasticsearch.cluster.routing.IndexShardRoutingTable;
@@ -323,13 +324,14 @@ public class TransportBroadcastUnpromotableActionTests extends ESTestCase {
 
         // The request fails if we don't mark shards as stale
         assertThat(
-            expectThrows(NodeNotConnectedException.class, () -> brodcastUnpromotableRequest(wrongRoutingTable, false)).toString(),
+            asInstanceOf(NodeNotConnectedException.class, safeAwaitFailure(broadcastUnpromotableRequest(wrongRoutingTable, false)))
+                .toString(),
             containsString("discovery node must not be null")
         );
         Mockito.verifyNoInteractions(shardStateAction);
 
         // We were able to mark shards as stale, so the request finishes successfully
-        assertThat(brodcastUnpromotableRequest(wrongRoutingTable, true), equalTo(ActionResponse.Empty.INSTANCE));
+        assertThat(safeAwait(broadcastUnpromotableRequest(wrongRoutingTable, true)), equalTo(ActionResponse.Empty.INSTANCE));
         for (var shardRouting : wrongRoutingTable.unpromotableShards()) {
             Mockito.verify(shardStateAction)
                 .remoteShardFailed(
@@ -354,40 +356,35 @@ public class TransportBroadcastUnpromotableActionTests extends ESTestCase {
             .when(shardStateAction)
             .remoteShardFailed(any(ShardId.class), anyString(), anyLong(), anyBoolean(), anyString(), any(Exception.class), any());
         assertThat(
-            expectThrows(NodeNotConnectedException.class, () -> brodcastUnpromotableRequest(wrongRoutingTable, true)).toString(),
+            asInstanceOf(NodeNotConnectedException.class, safeAwaitFailure(broadcastUnpromotableRequest(wrongRoutingTable, true)))
+                .toString(),
             containsString("discovery node must not be null")
         );
     }
 
-    private ActionResponse brodcastUnpromotableRequest(IndexShardRoutingTable wrongRoutingTable, boolean failShardOnError)
-        throws Exception {
-        return PlainActionFuture.<ActionResponse.Empty, Exception>get(
-            f -> ActionTestUtils.execute(
+    private SubscribableListener<ActionResponse.Empty> broadcastUnpromotableRequest(
+        IndexShardRoutingTable wrongRoutingTable,
+        boolean failShardOnError
+    ) {
+        return SubscribableListener.newForked(
+            listener -> ActionTestUtils.execute(
                 broadcastUnpromotableAction,
                 null,
                 new TestBroadcastUnpromotableRequest(wrongRoutingTable, failShardOnError),
-                f
-            ),
-            10,
-            TimeUnit.SECONDS
+                listener
+            )
         );
     }
 
     public void testNullIndexShardRoutingTable() {
-        IndexShardRoutingTable shardRoutingTable = null;
         assertThat(
-
             expectThrows(
                 NullPointerException.class,
-                () -> PlainActionFuture.<ActionResponse.Empty, Exception>get(
-                    f -> ActionTestUtils.execute(
-                        broadcastUnpromotableAction,
-                        null,
-                        new TestBroadcastUnpromotableRequest(shardRoutingTable),
-                        f
-                    ),
-                    10,
-                    TimeUnit.SECONDS
+                () -> ActionTestUtils.execute(
+                    broadcastUnpromotableAction,
+                    null,
+                    new TestBroadcastUnpromotableRequest((IndexShardRoutingTable) null),
+                    ActionListener.running(ESTestCase::fail)
                 )
             ).toString(),
             containsString("index shard routing table is null")

+ 9 - 13
server/src/test/java/org/elasticsearch/action/support/replication/TransportWriteActionTests.java

@@ -13,6 +13,7 @@ import org.elasticsearch.action.ActionListener;
 import org.elasticsearch.action.support.ActionFilters;
 import org.elasticsearch.action.support.ActionTestUtils;
 import org.elasticsearch.action.support.PlainActionFuture;
+import org.elasticsearch.action.support.SubscribableListener;
 import org.elasticsearch.action.support.WriteRequest.RefreshPolicy;
 import org.elasticsearch.action.support.WriteResponse;
 import org.elasticsearch.action.support.replication.ReplicationOperation.ReplicaResponse;
@@ -253,20 +254,15 @@ public class TransportWriteActionTests extends ESTestCase {
     }
 
     public void testDocumentFailureInShardOperationOnPrimary() {
-        assertEquals(
-            "simulated",
-            expectThrows(
-                RuntimeException.class,
-                () -> PlainActionFuture.get(
-                    (PlainActionFuture<TransportReplicationAction.PrimaryResult<TestRequest, TestResponse>> future) -> new TestAction(
-                        true,
-                        randomBoolean()
-                    ).dispatchedShardOperationOnPrimary(new TestRequest(), indexShard, future),
-                    0,
-                    TimeUnit.SECONDS
-                )
-            ).getMessage()
+        final var listener = SubscribableListener.<Exception>newForked(
+            l -> new TestAction(true, randomBoolean()).dispatchedShardOperationOnPrimary(
+                new TestRequest(),
+                indexShard,
+                ActionTestUtils.assertNoSuccessListener(l::onResponse)
+            )
         );
+        assertTrue(listener.isDone());
+        assertEquals("simulated", asInstanceOf(RuntimeException.class, safeAwait(listener)).getMessage());
     }
 
     public void testDocumentFailureInShardOperationOnReplica() throws Exception {

+ 9 - 9
server/src/test/java/org/elasticsearch/cluster/NodeConnectionsServiceTests.java

@@ -298,10 +298,10 @@ public class NodeConnectionsServiceTests extends ESTestCase {
 
             // a blocked reconnection attempt doesn't also block the node from being deregistered
             service.disconnectFromNodesExcept(nodes1);
-            assertThat(PlainActionFuture.get(disconnectFuture1 -> {
-                assertTrue(disconnectListenerRef.compareAndSet(null, disconnectFuture1));
+            assertThat(safeAwait(disconnectListener -> {
+                assertTrue(disconnectListenerRef.compareAndSet(null, disconnectListener));
                 connectionBarrier.await(10, TimeUnit.SECONDS);
-            }, 10, TimeUnit.SECONDS), equalTo(node0)); // node0 connects briefly, must wait here
+            }), equalTo(node0)); // node0 connects briefly, must wait here
             assertConnectedExactlyToNodes(nodes1);
 
             // a blocked connection attempt to a new node also doesn't prevent an immediate deregistration
@@ -312,10 +312,10 @@ public class NodeConnectionsServiceTests extends ESTestCase {
             service.disconnectFromNodesExcept(nodes1);
             assertConnectedExactlyToNodes(nodes1);
 
-            assertThat(PlainActionFuture.get(disconnectFuture2 -> {
-                assertTrue(disconnectListenerRef.compareAndSet(null, disconnectFuture2));
+            assertThat(safeAwait(disconnectListener -> {
+                assertTrue(disconnectListenerRef.compareAndSet(null, disconnectListener));
                 connectionBarrier.await(10, TimeUnit.SECONDS);
-            }, 10, TimeUnit.SECONDS), equalTo(node0)); // node0 connects briefly, must wait here
+            }), equalTo(node0)); // node0 connects briefly, must wait here
             assertConnectedExactlyToNodes(nodes1);
             assertTrue(future5.isDone());
         } finally {
@@ -726,18 +726,18 @@ public class NodeConnectionsServiceTests extends ESTestCase {
     }
 
     private static void connectToNodes(NodeConnectionsService service, DiscoveryNodes discoveryNodes) {
-        PlainActionFuture.get(future -> service.connectToNodes(discoveryNodes, () -> future.onResponse(null)), 10, TimeUnit.SECONDS);
+        safeAwait(connectListener -> service.connectToNodes(discoveryNodes, () -> connectListener.onResponse(null)));
     }
 
     private static void ensureConnections(NodeConnectionsService service) {
-        PlainActionFuture.get(future -> service.ensureConnections(() -> future.onResponse(null)), 10, TimeUnit.SECONDS);
+        safeAwait(ensureListener -> service.ensureConnections(() -> ensureListener.onResponse(null)));
     }
 
     private static void closeConnection(TransportService transportService, DiscoveryNode discoveryNode) {
         try {
             final var connection = transportService.getConnection(discoveryNode);
             connection.close();
-            PlainActionFuture.get(connection::addRemovedListener, 10, TimeUnit.SECONDS);
+            safeAwait(connection::addRemovedListener);
         } catch (NodeNotConnectedException e) {
             // ok
         }

+ 8 - 13
server/src/test/java/org/elasticsearch/cluster/coordination/NodeJoinExecutorTests.java

@@ -11,7 +11,6 @@ import org.apache.logging.log4j.Level;
 import org.elasticsearch.Version;
 import org.elasticsearch.action.ActionListener;
 import org.elasticsearch.action.support.ActionTestUtils;
-import org.elasticsearch.action.support.PlainActionFuture;
 import org.elasticsearch.cluster.ClusterName;
 import org.elasticsearch.cluster.ClusterState;
 import org.elasticsearch.cluster.NotMasterException;
@@ -814,16 +813,14 @@ public class NodeJoinExecutorTests extends ESTestCase {
                 )
             );
             assertNull(
-                PlainActionFuture.<Void, RuntimeException>get(
-                    future -> clusterService.getMasterService()
+                safeAwait(
+                    (ActionListener<Void> listener) -> clusterService.getMasterService()
                         .createTaskQueue("test", Priority.NORMAL, executor)
                         .submitTask(
                             "test",
-                            JoinTask.singleNode(node1, CompatibilityVersionsUtils.staticCurrent(), Set.of(), TEST_REASON, future, 0L),
+                            JoinTask.singleNode(node1, CompatibilityVersionsUtils.staticCurrent(), Set.of(), TEST_REASON, listener, 0L),
                             null
-                        ),
-                    10,
-                    TimeUnit.SECONDS
+                        )
                 )
             );
             mockLog.assertAllExpectationsMatched();
@@ -843,8 +840,8 @@ public class NodeJoinExecutorTests extends ESTestCase {
                 )
             );
             assertNull(
-                PlainActionFuture.<Void, RuntimeException>get(
-                    future -> clusterService.getMasterService()
+                safeAwait(
+                    (ActionListener<Void> listener) -> clusterService.getMasterService()
                         .createTaskQueue("test", Priority.NORMAL, executor)
                         .submitTask(
                             "test",
@@ -853,13 +850,11 @@ public class NodeJoinExecutorTests extends ESTestCase {
                                 CompatibilityVersionsUtils.staticCurrent(),
                                 Set.of(),
                                 testReasonWithLink,
-                                future,
+                                listener,
                                 0L
                             ),
                             null
-                        ),
-                    10,
-                    TimeUnit.SECONDS
+                        )
                 )
             );
             mockLog.assertAllExpectationsMatched();

+ 4 - 6
server/src/test/java/org/elasticsearch/cluster/routing/BatchedRerouteServiceTests.java

@@ -310,12 +310,10 @@ public class BatchedRerouteServiceTests extends ESTestCase {
 
             // Case 3: a NotMasterException
 
-            PlainActionFuture.<Void, RuntimeException>get(future -> {
-                clusterService.getClusterApplierService().onNewClusterState("simulated", () -> {
-                    final var state = clusterService.state();
-                    return ClusterState.builder(state).nodes(state.nodes().withMasterNodeId(null)).build();
-                }, future);
-            }, 10, TimeUnit.SECONDS);
+            safeAwait((ActionListener<Void> listener) -> clusterService.getClusterApplierService().onNewClusterState("simulated", () -> {
+                final var state = clusterService.state();
+                return ClusterState.builder(state).nodes(state.nodes().withMasterNodeId(null)).build();
+            }, listener));
 
             mockLog.addExpectation(
                 new MockLog.SeenEventExpectation(

+ 1 - 2
server/src/test/java/org/elasticsearch/cluster/routing/allocation/allocator/DesiredBalanceShardsAllocatorTests.java

@@ -11,7 +11,6 @@ package org.elasticsearch.cluster.routing.allocation.allocator;
 import org.apache.lucene.util.SetOnce;
 import org.elasticsearch.action.ActionListener;
 import org.elasticsearch.action.support.ActionTestUtils;
-import org.elasticsearch.action.support.PlainActionFuture;
 import org.elasticsearch.cluster.ClusterInfo;
 import org.elasticsearch.cluster.ClusterName;
 import org.elasticsearch.cluster.ClusterState;
@@ -818,6 +817,6 @@ public class DesiredBalanceShardsAllocatorTests extends ESAllocationTestCase {
     }
 
     private static void rerouteAndWait(AllocationService service, ClusterState clusterState, String reason) {
-        PlainActionFuture.<Void, RuntimeException>get(f -> service.reroute(clusterState, reason, f), 10, TimeUnit.SECONDS);
+        safeAwait((ActionListener<Void> listener) -> service.reroute(clusterState, reason, listener));
     }
 }

+ 11 - 5
server/src/test/java/org/elasticsearch/common/blobstore/fs/FsBlobContainerTests.java

@@ -12,6 +12,7 @@ import org.apache.lucene.tests.mockfile.FilterSeekableByteChannel;
 import org.apache.lucene.tests.util.LuceneTestCase;
 import org.elasticsearch.action.ActionListener;
 import org.elasticsearch.action.support.PlainActionFuture;
+import org.elasticsearch.action.support.SubscribableListener;
 import org.elasticsearch.common.blobstore.BlobPath;
 import org.elasticsearch.common.blobstore.OptionalBytesReference;
 import org.elasticsearch.common.bytes.BytesArray;
@@ -43,7 +44,6 @@ import java.util.List;
 import java.util.Locale;
 import java.util.Set;
 import java.util.concurrent.CyclicBarrier;
-import java.util.concurrent.TimeUnit;
 import java.util.concurrent.atomic.AtomicBoolean;
 import java.util.concurrent.atomic.AtomicLong;
 import java.util.concurrent.atomic.AtomicReference;
@@ -53,6 +53,7 @@ import static org.elasticsearch.repositories.blobstore.BlobStoreTestUtil.randomN
 import static org.elasticsearch.repositories.blobstore.BlobStoreTestUtil.randomPurpose;
 import static org.hamcrest.Matchers.containsString;
 import static org.hamcrest.Matchers.equalTo;
+import static org.hamcrest.Matchers.instanceOf;
 import static org.hamcrest.Matchers.is;
 import static org.hamcrest.Matchers.oneOf;
 import static org.hamcrest.Matchers.startsWith;
@@ -177,7 +178,9 @@ public class FsBlobContainerTests extends ESTestCase {
     }
 
     private static <T> T getAsync(Consumer<ActionListener<T>> consumer) {
-        return PlainActionFuture.get(consumer::accept, 0, TimeUnit.SECONDS);
+        final var listener = SubscribableListener.newForked(consumer::accept);
+        assertTrue(listener.isDone());
+        return safeAwait(listener);
     }
 
     public void testCompareAndExchange() throws Exception {
@@ -235,9 +238,12 @@ public class FsBlobContainerTests extends ESTestCase {
         }
 
         container.writeBlob(randomPurpose(), key, new BytesArray(new byte[17]), false);
-        expectThrows(
-            IllegalStateException.class,
-            () -> getBytesAsync(l -> container.compareAndExchangeRegister(randomPurpose(), key, expectedValue.get(), BytesArray.EMPTY, l))
+        assertThat(
+            safeAwaitFailure(
+                OptionalBytesReference.class,
+                l -> container.compareAndExchangeRegister(randomPurpose(), key, expectedValue.get(), BytesArray.EMPTY, l)
+            ),
+            instanceOf(IllegalStateException.class)
         );
     }
 

+ 14 - 16
server/src/test/java/org/elasticsearch/index/CompositeIndexEventListenerTests.java

@@ -12,16 +12,14 @@ import org.apache.logging.log4j.Level;
 import org.elasticsearch.ElasticsearchException;
 import org.elasticsearch.action.ActionListener;
 import org.elasticsearch.action.ActionRunnable;
-import org.elasticsearch.action.support.PlainActionFuture;
-import org.elasticsearch.core.CheckedRunnable;
 import org.elasticsearch.index.shard.IndexEventListener;
 import org.elasticsearch.index.shard.IndexShard;
 import org.elasticsearch.index.shard.IndexShardTestCase;
 import org.elasticsearch.test.MockLog;
 import org.hamcrest.Matchers;
 
-import java.util.concurrent.TimeUnit;
 import java.util.concurrent.atomic.AtomicInteger;
+import java.util.function.Consumer;
 import java.util.stream.Collectors;
 import java.util.stream.IntStream;
 
@@ -72,16 +70,14 @@ public class CompositeIndexEventListenerTests extends IndexShardTestCase {
                 }).collect(Collectors.toList())
             );
 
-            final CheckedRunnable<Exception> beforeIndexShardRecoveryRunner = () -> assertNull(
-                PlainActionFuture.<Void, Exception>get(
-                    fut -> indexEventListener.beforeIndexShardRecovery(shard, shard.indexSettings(), fut),
-                    10,
-                    TimeUnit.SECONDS
-                )
+            final Consumer<ActionListener<Void>> beforeIndexShardRecoveryRunner = l -> indexEventListener.beforeIndexShardRecovery(
+                shard,
+                shard.indexSettings(),
+                l
             );
 
             failAtStep.set(stepCount);
-            beforeIndexShardRecoveryRunner.run();
+            assertNull(safeAwait(beforeIndexShardRecoveryRunner::accept));
             assertEquals(stepCount, stepNumber.getAndSet(0));
 
             if (stepCount > 0) {
@@ -95,7 +91,9 @@ public class CompositeIndexEventListenerTests extends IndexShardTestCase {
                 );
 
                 failAtStep.set(between(0, stepCount - 1));
-                final var rootCause = getRootCause(expectThrows(ElasticsearchException.class, beforeIndexShardRecoveryRunner::run));
+                final var rootCause = getRootCause(
+                    asInstanceOf(ElasticsearchException.class, safeAwaitFailure(beforeIndexShardRecoveryRunner))
+                );
                 assertEquals("simulated failure at step " + failAtStep.get(), rootCause.getMessage());
                 assertEquals(failAtStep.get() + 1, stepNumber.getAndSet(0));
                 mockLog.assertAllExpectationsMatched();
@@ -138,12 +136,10 @@ public class CompositeIndexEventListenerTests extends IndexShardTestCase {
                 }).collect(Collectors.toList())
             );
 
-            final CheckedRunnable<Exception> afterIndexShardRecoveryRunner = () -> assertNull(
-                PlainActionFuture.<Void, Exception>get(fut -> indexEventListener.afterIndexShardRecovery(shard, fut), 10, TimeUnit.SECONDS)
-            );
+            final Consumer<ActionListener<Void>> afterIndexShardRecoveryRunner = l -> indexEventListener.afterIndexShardRecovery(shard, l);
 
             failAtStep.set(stepCount);
-            afterIndexShardRecoveryRunner.run();
+            assertNull(safeAwait(afterIndexShardRecoveryRunner::accept));
             assertEquals(stepCount, stepNumber.getAndSet(0));
 
             if (stepCount > 0) {
@@ -157,7 +153,9 @@ public class CompositeIndexEventListenerTests extends IndexShardTestCase {
                 );
 
                 failAtStep.set(between(0, stepCount - 1));
-                final var rootCause = getRootCause(expectThrows(ElasticsearchException.class, afterIndexShardRecoveryRunner::run));
+                final var rootCause = getRootCause(
+                    asInstanceOf(ElasticsearchException.class, safeAwaitFailure(afterIndexShardRecoveryRunner))
+                );
                 assertEquals("simulated failure at step " + failAtStep.get(), rootCause.getMessage());
                 assertEquals(failAtStep.get() + 1, stepNumber.getAndSet(0));
                 mockLog.assertAllExpectationsMatched();

+ 2 - 6
server/src/test/java/org/elasticsearch/index/engine/InternalEngineTests.java

@@ -3205,13 +3205,9 @@ public class InternalEngineTests extends EngineTestCase {
                     engine.syncTranslog(); // to advance persisted local checkpoint
                     assertEquals(engine.getProcessedLocalCheckpoint(), engine.getPersistedLocalCheckpoint());
                     globalCheckpoint.set(engine.getPersistedLocalCheckpoint());
-                    expectThrows(
+                    asInstanceOf(
                         IllegalStateException.class,
-                        () -> PlainActionFuture.<Void, RuntimeException>get(
-                            future -> engine.recoverFromTranslog(translogHandler, Long.MAX_VALUE, future),
-                            30,
-                            TimeUnit.SECONDS
-                        )
+                        safeAwaitFailure(Void.class, listener -> engine.recoverFromTranslog(translogHandler, Long.MAX_VALUE, listener))
                     );
                     Map<String, String> userData = engine.getLastCommittedSegmentInfos().getUserData();
                     assertEquals(engine.getTranslog().getTranslogUUID(), userData.get(Translog.TRANSLOG_UUID_KEY));

+ 11 - 8
server/src/test/java/org/elasticsearch/index/shard/IndexShardOperationPermitsTests.java

@@ -11,6 +11,7 @@ import org.elasticsearch.ElasticsearchTimeoutException;
 import org.elasticsearch.action.ActionListener;
 import org.elasticsearch.action.support.ActionTestUtils;
 import org.elasticsearch.action.support.PlainActionFuture;
+import org.elasticsearch.action.support.SubscribableListener;
 import org.elasticsearch.common.settings.Settings;
 import org.elasticsearch.common.util.concurrent.ConcurrentCollections;
 import org.elasticsearch.common.util.concurrent.EsExecutors;
@@ -524,9 +525,15 @@ public class IndexShardOperationPermitsTests extends ESTestCase {
         assertThat(permits.getActiveOperationsCount(), equalTo(0));
     }
 
+    private Releasable acquirePermitImmediately() {
+        final var listener = SubscribableListener.<Releasable>newForked(l -> permits.acquire(l, threadPool.generic(), false));
+        assertTrue(listener.isDone());
+        return safeAwait(listener);
+    }
+
     public void testAsyncBlockOperationsOnRejection() {
         final PlainActionFuture<Void> threadBlock = new PlainActionFuture<>();
-        try (Releasable firstPermit = PlainActionFuture.get(f -> permits.acquire(f, threadPool.generic(), false), 0, TimeUnit.SECONDS)) {
+        try (Releasable firstPermit = acquirePermitImmediately()) {
             assertNotNull(firstPermit);
 
             final var rejectingExecutor = threadPool.executor(REJECTING_EXECUTOR);
@@ -539,9 +546,7 @@ public class IndexShardOperationPermitsTests extends ESTestCase {
             );
 
             // ensure that the exception means no block was put in place
-            try (
-                Releasable secondPermit = PlainActionFuture.get(f -> permits.acquire(f, threadPool.generic(), false), 0, TimeUnit.SECONDS)
-            ) {
+            try (Releasable secondPermit = acquirePermitImmediately()) {
                 assertNotNull(secondPermit);
             }
         } finally {
@@ -556,7 +561,7 @@ public class IndexShardOperationPermitsTests extends ESTestCase {
 
     public void testAsyncBlockOperationsOnTimeout() {
         final PlainActionFuture<Void> threadBlock = new PlainActionFuture<>();
-        try (Releasable firstPermit = PlainActionFuture.get(f -> permits.acquire(f, threadPool.generic(), false), 0, TimeUnit.SECONDS)) {
+        try (Releasable firstPermit = acquirePermitImmediately()) {
             assertNotNull(firstPermit);
 
             assertEquals(
@@ -570,9 +575,7 @@ public class IndexShardOperationPermitsTests extends ESTestCase {
             );
 
             // ensure that the exception means no block was put in place
-            try (
-                Releasable secondPermit = PlainActionFuture.get(f -> permits.acquire(f, threadPool.generic(), false), 0, TimeUnit.SECONDS)
-            ) {
+            try (Releasable secondPermit = acquirePermitImmediately()) {
                 assertNotNull(secondPermit);
             }
 

+ 1 - 1
server/src/test/java/org/elasticsearch/index/shard/IndexShardTests.java

@@ -2981,7 +2981,7 @@ public class IndexShardTests extends IndexShardTestCase {
         // Shard is still inactive since we haven't started recovering yet
         assertFalse(shard.isActive());
         shard.recoveryState().getIndex().setFileDetailsComplete();
-        PlainActionFuture.get(shard::openEngineAndRecoverFromTranslog, 30, TimeUnit.SECONDS);
+        safeAwait(shard::openEngineAndRecoverFromTranslog);
         // Shard should now be active since we did recover:
         assertTrue(shard.isActive());
         closeShards(shard);

+ 15 - 15
server/src/test/java/org/elasticsearch/indices/recovery/RecoverySourceHandlerTests.java

@@ -103,7 +103,6 @@ import java.util.Set;
 import java.util.concurrent.CopyOnWriteArrayList;
 import java.util.concurrent.CountDownLatch;
 import java.util.concurrent.Executor;
-import java.util.concurrent.TimeUnit;
 import java.util.concurrent.atomic.AtomicBoolean;
 import java.util.concurrent.atomic.AtomicInteger;
 import java.util.concurrent.atomic.AtomicLong;
@@ -807,20 +806,21 @@ public class RecoverySourceHandlerTests extends MapperServiceTestCase {
 
         Thread cancelingThread = new Thread(() -> cancellableThreads.cancel("test"));
         cancelingThread.start();
-        try {
-            PlainActionFuture.<Void, RuntimeException>get(
-                future -> RecoverySourceHandler.runUnderPrimaryPermit(
-                    listener -> listener.onResponse(null),
-                    shard,
-                    cancellableThreads,
-                    future
-                ),
-                10,
-                TimeUnit.SECONDS
-            );
-        } catch (CancellableThreads.ExecutionCancelledException e) {
-            // expected.
-        }
+        safeAwait(
+            runListener -> RecoverySourceHandler.runUnderPrimaryPermit(
+                permitListener -> permitListener.onResponse(null),
+                shard,
+                cancellableThreads,
+                runListener.delegateResponse((l, e) -> {
+                    if (e instanceof CancellableThreads.ExecutionCancelledException) {
+                        // expected.
+                        l.onResponse(null);
+                    } else {
+                        l.onFailure(e);
+                    }
+                })
+            )
+        );
         cancelingThread.join();
         // we have to use assert busy as we may be interrupted while acquiring the permit, if so we want to check
         // that the permit is released.

+ 4 - 5
server/src/test/java/org/elasticsearch/snapshots/InternalSnapshotsInfoServiceTests.java

@@ -8,6 +8,7 @@
 
 package org.elasticsearch.snapshots;
 
+import org.elasticsearch.action.ActionListener;
 import org.elasticsearch.action.support.PlainActionFuture;
 import org.elasticsearch.cluster.ClusterState;
 import org.elasticsearch.cluster.ESAllocationTestCase;
@@ -381,11 +382,9 @@ public class InternalSnapshotsInfoServiceTests extends ESTestCase {
     }
 
     private void applyClusterState(final String reason, final Function<ClusterState, ClusterState> applier) {
-        PlainActionFuture.<Void, RuntimeException>get(
-            future -> clusterService.getClusterApplierService()
-                .onNewClusterState(reason, () -> applier.apply(clusterService.state()), future),
-            10,
-            TimeUnit.SECONDS
+        safeAwait(
+            (ActionListener<Void> listener) -> clusterService.getClusterApplierService()
+                .onNewClusterState(reason, () -> applier.apply(clusterService.state()), listener)
         );
     }
 

+ 4 - 6
server/src/test/java/org/elasticsearch/transport/ClusterConnectionManagerTests.java

@@ -431,15 +431,13 @@ public class ClusterConnectionManagerTests extends ESTestCase {
         assertTrue(pendingConnectionPermits.tryAcquire(10, TimeUnit.SECONDS));
         // ... and then send a connection attempt through the system to ensure that the lagging has started
         Releasables.closeExpectNoException(
-            PlainActionFuture.<Releasable, RuntimeException>get(
-                fut -> connectionManager.connectToNode(
+            safeAwait(
+                (ActionListener<Releasable> listener) -> connectionManager.connectToNode(
                     DiscoveryNodeUtils.create("", new TransportAddress(InetAddress.getLoopbackAddress(), 0)),
                     connectionProfile,
                     validator,
-                    fut
-                ),
-                30,
-                TimeUnit.SECONDS
+                    listener
+                )
             )
         );
 

+ 4 - 6
server/src/test/java/org/elasticsearch/transport/RemoteClusterAwareClientTests.java

@@ -194,17 +194,15 @@ public class RemoteClusterAwareClientTests extends ESTestCase {
                     randomBoolean(),
                     null
                 );
-                final SearchShardsResponse searchShardsResponse = PlainActionFuture.get(
-                    future -> client.execute(
+                final SearchShardsResponse searchShardsResponse = safeAwait(
+                    listener -> client.execute(
                         TransportSearchShardsAction.REMOTE_TYPE,
                         searchShardsRequest,
                         ActionListener.runBefore(
-                            future,
+                            listener,
                             () -> assertTrue(Thread.currentThread().getName().contains('[' + TEST_THREAD_POOL_NAME + ']'))
                         )
-                    ),
-                    10,
-                    TimeUnit.SECONDS
+                    )
                 );
                 assertThat(searchShardsResponse.getNodes(), equalTo(knownNodes));
             }

+ 4 - 6
server/src/test/java/org/elasticsearch/transport/RemoteClusterClientTests.java

@@ -99,17 +99,15 @@ public class RemoteClusterClientTests extends ESTestCase {
                     threadPool.executor(TEST_THREAD_POOL_NAME),
                     randomFrom(RemoteClusterService.DisconnectedStrategy.values())
                 );
-                ClusterStateResponse clusterStateResponse = PlainActionFuture.get(
-                    future -> client.execute(
+                ClusterStateResponse clusterStateResponse = safeAwait(
+                    listener -> client.execute(
                         ClusterStateAction.REMOTE_TYPE,
                         new ClusterStateRequest(),
                         ActionListener.runBefore(
-                            future,
+                            listener,
                             () -> assertTrue(Thread.currentThread().getName().contains('[' + TEST_THREAD_POOL_NAME + ']'))
                         )
-                    ),
-                    10,
-                    TimeUnit.SECONDS
+                    )
                 );
                 assertNotNull(clusterStateResponse);
                 assertEquals("foo_bar_cluster", clusterStateResponse.getState().getClusterName().value());

+ 3 - 10
test/framework/src/main/java/org/elasticsearch/action/support/ActionTestUtils.java

@@ -21,7 +21,6 @@ import org.elasticsearch.test.ESTestCase;
 import org.elasticsearch.transport.Transport;
 
 import java.util.Map;
-import java.util.concurrent.TimeUnit;
 import java.util.function.Consumer;
 
 import static org.elasticsearch.test.ESTestCase.fail;
@@ -34,10 +33,8 @@ public class ActionTestUtils {
         TransportAction<Request, Response> action,
         Request request
     ) {
-        return PlainActionFuture.get(
-            future -> action.execute(request.createTask(1L, "direct", action.actionName, TaskId.EMPTY_TASK_ID, Map.of()), request, future),
-            10,
-            TimeUnit.SECONDS
+        return ESTestCase.safeAwait(
+            future -> action.execute(request.createTask(1L, "direct", action.actionName, TaskId.EMPTY_TASK_ID, Map.of()), request, future)
         );
     }
 
@@ -47,11 +44,7 @@ public class ActionTestUtils {
         TransportAction<Request, Response> action,
         Request request
     ) {
-        return PlainActionFuture.get(
-            future -> taskManager.registerAndExecute("transport", action, request, localConnection, future),
-            10,
-            TimeUnit.SECONDS
-        );
+        return ESTestCase.safeAwait(future -> taskManager.registerAndExecute("transport", action, request, localConnection, future));
     }
 
     /**

+ 14 - 14
test/framework/src/main/java/org/elasticsearch/index/engine/EngineTestCase.java

@@ -64,7 +64,6 @@ import org.elasticsearch.common.lucene.Lucene;
 import org.elasticsearch.common.lucene.uid.Versions;
 import org.elasticsearch.common.settings.Settings;
 import org.elasticsearch.common.util.BigArrays;
-import org.elasticsearch.common.util.concurrent.UncategorizedExecutionException;
 import org.elasticsearch.core.CheckedFunction;
 import org.elasticsearch.core.IOUtils;
 import org.elasticsearch.core.Nullable;
@@ -127,6 +126,7 @@ import java.util.Map;
 import java.util.Set;
 import java.util.concurrent.ExecutionException;
 import java.util.concurrent.TimeUnit;
+import java.util.concurrent.TimeoutException;
 import java.util.concurrent.atomic.AtomicInteger;
 import java.util.concurrent.atomic.AtomicLong;
 import java.util.function.BiFunction;
@@ -1628,22 +1628,22 @@ public abstract class EngineTestCase extends ESTestCase {
         throws IOException {
         // This is an adapter between the older synchronous (blocking) code and the newer (async) API. Callers expect exceptions to be
         // thrown directly, so we must undo the layers of wrapping added by future#get and friends.
+        final var future = new PlainActionFuture<Void>();
+        engine.recoverFromTranslog(translogRecoveryRunner, recoverUpToSeqNo, future);
         try {
-            PlainActionFuture.<Void, RuntimeException>get(
-                future -> engine.recoverFromTranslog(translogRecoveryRunner, recoverUpToSeqNo, future),
-                30,
-                TimeUnit.SECONDS
-            );
-        } catch (UncategorizedExecutionException e) {
-            if (e.getCause() instanceof ExecutionException executionException
-                && executionException.getCause() instanceof IOException ioException) {
+            future.get(30, TimeUnit.SECONDS);
+        } catch (ExecutionException e) {
+            if (e.getCause() instanceof IOException ioException) {
                 throw ioException;
-            } else {
-                fail(e);
             }
-        } catch (RuntimeException e) {
-            throw e;
-        } catch (Exception e) {
+            if (e.getCause() instanceof RuntimeException runtimeException) {
+                throw runtimeException;
+            }
+            fail(e);
+        } catch (InterruptedException e) {
+            Thread.currentThread().interrupt();
+            fail(e);
+        } catch (TimeoutException e) {
             fail(e);
         }
     }

+ 16 - 14
test/framework/src/main/java/org/elasticsearch/index/shard/IndexShardTestCase.java

@@ -13,6 +13,7 @@ import org.elasticsearch.action.ActionListener;
 import org.elasticsearch.action.admin.indices.flush.FlushRequest;
 import org.elasticsearch.action.index.IndexRequest;
 import org.elasticsearch.action.support.PlainActionFuture;
+import org.elasticsearch.action.support.SubscribableListener;
 import org.elasticsearch.action.support.UnsafePlainActionFuture;
 import org.elasticsearch.action.support.replication.TransportReplicationAction;
 import org.elasticsearch.cluster.metadata.IndexMetadata;
@@ -945,19 +946,20 @@ public abstract class IndexShardTestCase extends ESTestCase {
     }
 
     public static Releasable getOperationPermit(final IndexShard shard) {
-        return PlainActionFuture.get(future -> {
-            if (shard.routingEntry().primary()) {
-                shard.acquirePrimaryOperationPermit(future, null);
-            } else {
-                shard.acquireReplicaOperationPermit(
-                    shard.getOperationPrimaryTerm(),
-                    SequenceNumbers.NO_OPS_PERFORMED,
-                    SequenceNumbers.NO_OPS_PERFORMED,
-                    future,
-                    null
-                );
-            }
-        }, 0, TimeUnit.NANOSECONDS);
+        final var listener = new SubscribableListener<Releasable>();
+        if (shard.routingEntry().primary()) {
+            shard.acquirePrimaryOperationPermit(listener, null);
+        } else {
+            shard.acquireReplicaOperationPermit(
+                shard.getOperationPrimaryTerm(),
+                SequenceNumbers.NO_OPS_PERFORMED,
+                SequenceNumbers.NO_OPS_PERFORMED,
+                listener,
+                null
+            );
+        }
+        assertTrue(listener.isDone());
+        return safeAwait(listener);
     }
 
     public static Set<String> getShardDocUIDs(final IndexShard shard) throws IOException {
@@ -1191,6 +1193,6 @@ public abstract class IndexShardTestCase extends ESTestCase {
     }
 
     public static long recoverLocallyUpToGlobalCheckpoint(IndexShard indexShard) {
-        return PlainActionFuture.get(indexShard::recoverLocallyUpToGlobalCheckpoint, 10, TimeUnit.SECONDS);
+        return safeAwait(indexShard::recoverLocallyUpToGlobalCheckpoint);
     }
 }

+ 1 - 5
test/framework/src/main/java/org/elasticsearch/snapshots/AbstractSnapshotIntegTestCase.java

@@ -177,11 +177,7 @@ public abstract class AbstractSnapshotIntegTestCase extends ESIntegTestCase {
     }
 
     public static RepositoryData getRepositoryData(Repository repository) {
-        return PlainActionFuture.get(
-            listener -> repository.getRepositoryData(EsExecutors.DIRECT_EXECUTOR_SERVICE, listener),
-            10,
-            TimeUnit.SECONDS
-        );
+        return safeAwait(listener -> repository.getRepositoryData(EsExecutors.DIRECT_EXECUTOR_SERVICE, listener));
     }
 
     public static long getFailureCount(String repository) {

+ 5 - 7
test/framework/src/main/java/org/elasticsearch/test/ClusterServiceUtils.java

@@ -226,8 +226,8 @@ public class ClusterServiceUtils {
     }
 
     public static void awaitNoPendingTasks(ClusterService clusterService) {
-        PlainActionFuture.<Void, RuntimeException>get(
-            fut -> clusterService.submitUnbatchedStateUpdateTask(
+        ESTestCase.safeAwait(
+            listener -> clusterService.submitUnbatchedStateUpdateTask(
                 "await-queue-empty",
                 new ClusterStateUpdateTask(Priority.LANGUID, TimeValue.timeValueSeconds(10)) {
                     @Override
@@ -237,17 +237,15 @@ public class ClusterServiceUtils {
 
                     @Override
                     public void onFailure(Exception e) {
-                        fut.onFailure(e);
+                        listener.onFailure(e);
                     }
 
                     @Override
                     public void clusterStateProcessed(ClusterState initialState, ClusterState newState) {
-                        fut.onResponse(null);
+                        listener.onResponse(null);
                     }
                 }
-            ),
-            10,
-            TimeUnit.SECONDS
+            )
         );
     }
 

+ 38 - 5
test/framework/src/main/java/org/elasticsearch/test/ESTestCase.java

@@ -41,6 +41,7 @@ import org.apache.lucene.util.SetOnce;
 import org.elasticsearch.ExceptionsHelper;
 import org.elasticsearch.TransportVersion;
 import org.elasticsearch.action.ActionFuture;
+import org.elasticsearch.action.ActionListener;
 import org.elasticsearch.action.RequestBuilder;
 import org.elasticsearch.action.support.ActionTestUtils;
 import org.elasticsearch.action.support.PlainActionFuture;
@@ -83,6 +84,7 @@ import org.elasticsearch.common.util.concurrent.ThreadContext;
 import org.elasticsearch.common.xcontent.LoggingDeprecationHandler;
 import org.elasticsearch.common.xcontent.XContentHelper;
 import org.elasticsearch.core.Booleans;
+import org.elasticsearch.core.CheckedConsumer;
 import org.elasticsearch.core.CheckedRunnable;
 import org.elasticsearch.core.PathUtils;
 import org.elasticsearch.core.PathUtilsForTesting;
@@ -2290,6 +2292,17 @@ public abstract class ESTestCase extends LuceneTestCase {
         return safeGet(future);
     }
 
+    /**
+     * Call an async action (a {@link Consumer} of an {@link ActionListener}), wait for it to complete the listener, and then return the
+     * result. Preserves the thread's interrupt status flag and converts all exceptions into an {@link AssertionError} to trigger a test
+     * failure.
+     *
+     * @return The value with which the consumed listener was completed.
+     */
+    public static <T> T safeAwait(CheckedConsumer<ActionListener<T>, ?> consumer) {
+        return safeAwait(SubscribableListener.newForked(consumer));
+    }
+
     /**
      * Wait for the successful completion of the given {@link Future}, with a timeout of {@link #SAFE_AWAIT_TIMEOUT}, preserving the
      * thread's interrupt status flag and converting all exceptions into an {@link AssertionError} to trigger a test failure.
@@ -2331,11 +2344,31 @@ public abstract class ESTestCase extends LuceneTestCase {
      * @return The exception with which the {@code listener} was completed exceptionally.
      */
     public static Exception safeAwaitFailure(SubscribableListener<?> listener) {
-        return safeAwait(
-            SubscribableListener.newForked(
-                exceptionListener -> listener.addListener(ActionTestUtils.assertNoSuccessListener(exceptionListener::onResponse))
-            )
-        );
+        return safeAwait(exceptionListener -> listener.addListener(ActionTestUtils.assertNoSuccessListener(exceptionListener::onResponse)));
+    }
+
+    /**
+     * Wait for the exceptional completion of the given async action, with a timeout of {@link #SAFE_AWAIT_TIMEOUT},
+     * preserving the thread's interrupt status flag and converting a successful completion, interrupt or timeout into an {@link
+     * AssertionError} to trigger a test failure.
+     *
+     * @return The exception with which the {@code listener} was completed exceptionally.
+     */
+    public static <T> Exception safeAwaitFailure(Consumer<ActionListener<T>> consumer) {
+        return safeAwait(exceptionListener -> consumer.accept(ActionTestUtils.assertNoSuccessListener(exceptionListener::onResponse)));
+    }
+
+    /**
+     * Wait for the exceptional completion of the given async action, with a timeout of {@link #SAFE_AWAIT_TIMEOUT},
+     * preserving the thread's interrupt status flag and converting a successful completion, interrupt or timeout into an {@link
+     * AssertionError} to trigger a test failure.
+     *
+     * @param responseType Class of listener response type, to aid type inference but otherwise ignored.
+     *
+     * @return The exception with which the {@code listener} was completed exceptionally.
+     */
+    public static <T> Exception safeAwaitFailure(@SuppressWarnings("unused") Class<T> responseType, Consumer<ActionListener<T>> consumer) {
+        return safeAwaitFailure(consumer);
     }
 
     /**

+ 12 - 18
test/framework/src/main/java/org/elasticsearch/transport/AbstractSimpleTransportTestCase.java

@@ -3211,36 +3211,32 @@ public abstract class AbstractSimpleTransportTestCase extends ESTestCase {
             channel.sendResponse(TransportResponse.Empty.INSTANCE);
         });
 
-        PlainActionFuture.get(
-            f -> submitRequest(
+        safeAwait(
+            listener -> submitRequest(
                 serviceA,
                 serviceA.getLocalNode(),
                 ACTION,
                 new EmptyRequest(),
                 new ActionListenerResponseHandler<>(
-                    f,
+                    listener,
                     ignored -> TransportResponse.Empty.INSTANCE,
                     TransportResponseHandler.TRANSPORT_WORKER
                 )
-            ),
-            10,
-            TimeUnit.SECONDS
+            )
         );
 
-        PlainActionFuture.get(
-            f -> submitRequest(
+        safeAwait(
+            listener -> submitRequest(
                 serviceA,
                 serviceB.getLocalNode(),
                 ACTION,
                 new EmptyRequest(),
                 new ActionListenerResponseHandler<>(
-                    f,
+                    listener,
                     ignored -> TransportResponse.Empty.INSTANCE,
                     TransportResponseHandler.TRANSPORT_WORKER
                 )
-            ),
-            10,
-            TimeUnit.SECONDS
+            )
         );
     }
 
@@ -3309,16 +3305,14 @@ public abstract class AbstractSimpleTransportTestCase extends ESTestCase {
         for (int iteration = 1; iteration <= 5; iteration++) {
             assertEquals(
                 responseSize,
-                PlainActionFuture.<Response, Exception>get(
-                    f -> submitRequest(
+                safeAwait(
+                    (ActionListener<Response> listener) -> submitRequest(
                         serviceA,
                         serviceB.getLocalNode(),
                         ACTION,
                         new Request(requestSize),
-                        new ActionListenerResponseHandler<>(f, Response::new, TransportResponseHandler.TRANSPORT_WORKER)
-                    ),
-                    10,
-                    TimeUnit.SECONDS
+                        new ActionListenerResponseHandler<>(listener, Response::new, TransportResponseHandler.TRANSPORT_WORKER)
+                    )
                 ).refSize
             );
 

+ 31 - 17
x-pack/plugin/ccr/src/main/java/org/elasticsearch/xpack/ccr/repository/CcrRepository.java

@@ -14,6 +14,9 @@ import org.elasticsearch.ElasticsearchException;
 import org.elasticsearch.ElasticsearchSecurityException;
 import org.elasticsearch.ExceptionsHelper;
 import org.elasticsearch.action.ActionListener;
+import org.elasticsearch.action.ActionRequest;
+import org.elasticsearch.action.ActionResponse;
+import org.elasticsearch.action.RemoteClusterActionType;
 import org.elasticsearch.action.SingleResultDeduplicator;
 import org.elasticsearch.action.admin.cluster.state.ClusterStateAction;
 import org.elasticsearch.action.admin.cluster.state.ClusterStateRequest;
@@ -252,15 +255,30 @@ public class CcrRepository extends AbstractLifecycleComponent implements Reposit
         }
     }
 
+    private <Request extends ActionRequest, Response extends ActionResponse> Response executeRecoveryAction(
+        RemoteClusterClient client,
+        RemoteClusterActionType<Response> action,
+        Request request
+    ) {
+        final var future = new PlainActionFuture<Response>();
+        client.execute(action, request, future);
+        // TODO stop doing this as a blocking activity
+        // TODO on timeout, cancel the remote request, don't just carry on
+        // TODO handle exceptions better, don't just unwrap/rewrap them with actionGet
+        return future.actionGet(ccrSettings.getRecoveryActionTimeout().millis(), TimeUnit.MILLISECONDS);
+    }
+
     @Override
     public Metadata getSnapshotGlobalMetadata(SnapshotId snapshotId) {
         assert SNAPSHOT_ID.equals(snapshotId) : "RemoteClusterRepository only supports " + SNAPSHOT_ID + " as the SnapshotId";
         var remoteClient = getRemoteClusterClient();
-        // We set a single dummy index name to avoid fetching all the index data
-        ClusterStateResponse clusterState = PlainActionFuture.get(
-            f -> remoteClient.execute(ClusterStateAction.REMOTE_TYPE, CcrRequests.metadataRequest("dummy_index_name"), f),
-            ccrSettings.getRecoveryActionTimeout().millis(),
-            TimeUnit.MILLISECONDS
+        ClusterStateResponse clusterState = executeRecoveryAction(
+            remoteClient,
+            ClusterStateAction.REMOTE_TYPE,
+            CcrRequests.metadataRequest(
+                // We set a single dummy index name to avoid fetching all the index data
+                "dummy_index_name"
+            )
         );
         return clusterState.getState().metadata();
     }
@@ -271,10 +289,10 @@ public class CcrRepository extends AbstractLifecycleComponent implements Reposit
         String leaderIndex = index.getName();
         var remoteClient = getRemoteClusterClient();
 
-        ClusterStateResponse clusterState = PlainActionFuture.get(
-            f -> remoteClient.execute(ClusterStateAction.REMOTE_TYPE, CcrRequests.metadataRequest(leaderIndex), f),
-            ccrSettings.getRecoveryActionTimeout().millis(),
-            TimeUnit.MILLISECONDS
+        ClusterStateResponse clusterState = executeRecoveryAction(
+            remoteClient,
+            ClusterStateAction.REMOTE_TYPE,
+            CcrRequests.metadataRequest(leaderIndex)
         );
 
         // Validates whether the leader cluster has been configured properly:
@@ -556,14 +574,10 @@ public class CcrRepository extends AbstractLifecycleComponent implements Reposit
     public IndexShardSnapshotStatus.Copy getShardSnapshotStatus(SnapshotId snapshotId, IndexId index, ShardId shardId) {
         assert SNAPSHOT_ID.equals(snapshotId) : "RemoteClusterRepository only supports " + SNAPSHOT_ID + " as the SnapshotId";
         final String leaderIndex = index.getName();
-        final IndicesStatsResponse response = PlainActionFuture.get(
-            f -> getRemoteClusterClient().execute(
-                IndicesStatsAction.REMOTE_TYPE,
-                new IndicesStatsRequest().indices(leaderIndex).clear().store(true),
-                f
-            ),
-            ccrSettings.getRecoveryActionTimeout().millis(),
-            TimeUnit.MILLISECONDS
+        final IndicesStatsResponse response = executeRecoveryAction(
+            getRemoteClusterClient(),
+            IndicesStatsAction.REMOTE_TYPE,
+            new IndicesStatsRequest().indices(leaderIndex).clear().store(true)
         );
         for (ShardStats shardStats : response.getIndex(leaderIndex).getShards()) {
             final ShardRouting shardRouting = shardStats.getShardRouting();

+ 13 - 8
x-pack/plugin/monitoring/src/test/java/org/elasticsearch/xpack/monitoring/action/TransportMonitoringBulkActionTests.java

@@ -124,9 +124,14 @@ public class TransportMonitoringBulkActionTests extends ESTestCase {
         );
 
         final MonitoringBulkRequest request = randomRequest();
-        final ClusterBlockException e = expectThrows(ClusterBlockException.class, () -> ActionTestUtils.executeBlocking(action, request));
 
-        assertThat(e, hasToString(containsString("ClusterBlockException: blocked by: [SERVICE_UNAVAILABLE/2/no master]")));
+        assertThat(
+            asInstanceOf(
+                ClusterBlockException.class,
+                safeAwaitFailure(MonitoringBulkResponse.class, l -> action.execute(null, request, l))
+            ),
+            hasToString(containsString("ClusterBlockException: blocked by: [SERVICE_UNAVAILABLE/2/no master]"))
+        );
     }
 
     public void testExecuteIgnoresRequestWhenCollectionIsDisabled() throws Exception {
@@ -169,13 +174,13 @@ public class TransportMonitoringBulkActionTests extends ESTestCase {
             monitoringService
         );
 
-        final MonitoringBulkRequest request = new MonitoringBulkRequest();
-        final ActionRequestValidationException e = expectThrows(
-            ActionRequestValidationException.class,
-            () -> ActionTestUtils.executeBlocking(action, request)
+        assertThat(
+            asInstanceOf(
+                ActionRequestValidationException.class,
+                safeAwaitFailure(MonitoringBulkResponse.class, l -> action.execute(null, new MonitoringBulkRequest(), l))
+            ),
+            hasToString(containsString("no monitoring documents added"))
         );
-
-        assertThat(e, hasToString(containsString("no monitoring documents added")));
     }
 
     @SuppressWarnings("unchecked")