Jelajahi Sumber

Migrate SnapshotResiliencyTests to SubscribableListener (#102668)

There's no need for the exception-mangling behaviour of
`ListenableFuture` in these tests, and if we use `SubscribableListener`
then we'll be able to use various other useful tools in future work.
David Turner 1 tahun lalu
induk
melakukan
9551fcef40

+ 62 - 57
server/src/test/java/org/elasticsearch/snapshots/SnapshotResiliencyTests.java

@@ -70,6 +70,7 @@ import org.elasticsearch.action.support.ActiveShardCount;
 import org.elasticsearch.action.support.DestructiveOperations;
 import org.elasticsearch.action.support.GroupedActionListener;
 import org.elasticsearch.action.support.PlainActionFuture;
+import org.elasticsearch.action.support.SubscribableListener;
 import org.elasticsearch.action.support.TransportAction;
 import org.elasticsearch.action.support.WriteRequest;
 import org.elasticsearch.action.support.master.AcknowledgedResponse;
@@ -128,7 +129,6 @@ import org.elasticsearch.common.transport.TransportAddress;
 import org.elasticsearch.common.util.BigArrays;
 import org.elasticsearch.common.util.PageCacheRecycler;
 import org.elasticsearch.common.util.concurrent.DeterministicTaskQueue;
-import org.elasticsearch.common.util.concurrent.ListenableFuture;
 import org.elasticsearch.common.util.concurrent.PrioritizedEsThreadPoolExecutor;
 import org.elasticsearch.core.CheckedConsumer;
 import org.elasticsearch.env.Environment;
@@ -246,8 +246,8 @@ public class SnapshotResiliencyTests extends ESTestCase {
         try {
             clearDisruptionsAndAwaitSync();
 
-            final ListenableFuture<CleanupRepositoryResponse> cleanupResponse = new ListenableFuture<>();
-            final ListenableFuture<CreateSnapshotResponse> createSnapshotResponse = new ListenableFuture<>();
+            final SubscribableListener<CleanupRepositoryResponse> cleanupResponse = new SubscribableListener<>();
+            final SubscribableListener<CreateSnapshotResponse> createSnapshotResponse = new SubscribableListener<>();
             // Create another snapshot and then clean up the repository to verify that the repository works correctly no matter the
             // failures seen during the previous test.
             client().admin()
@@ -292,7 +292,7 @@ public class SnapshotResiliencyTests extends ESTestCase {
             testClusterNodes.nodes.values().iterator().next().clusterService.state()
         );
 
-        final ListenableFuture<CreateSnapshotResponse> createSnapshotResponseListener = new ListenableFuture<>();
+        final SubscribableListener<CreateSnapshotResponse> createSnapshotResponseListener = new SubscribableListener<>();
 
         continueOrDie(createRepoAndIndex(repoName, index, shards), createIndexResponse -> {
             final Runnable afterIndexing = () -> client().admin()
@@ -307,7 +307,7 @@ public class SnapshotResiliencyTests extends ESTestCase {
                 for (int i = 0; i < documents; ++i) {
                     bulkRequest.add(new IndexRequest(index).source(Collections.singletonMap("foo", "bar" + i)));
                 }
-                final ListenableFuture<BulkResponse> bulkResponseStepListener = new ListenableFuture<>();
+                final SubscribableListener<BulkResponse> bulkResponseStepListener = new SubscribableListener<>();
                 client().bulk(bulkRequest, bulkResponseStepListener);
                 continueOrDie(bulkResponseStepListener, bulkResponse -> {
                     assertFalse("Failures in bulk response: " + bulkResponse.buildFailureMessage(), bulkResponse.hasFailures());
@@ -317,14 +317,14 @@ public class SnapshotResiliencyTests extends ESTestCase {
             }
         });
 
-        final ListenableFuture<AcknowledgedResponse> deleteIndexListener = new ListenableFuture<>();
+        final SubscribableListener<AcknowledgedResponse> deleteIndexListener = new SubscribableListener<>();
 
         continueOrDie(
             createSnapshotResponseListener,
             createSnapshotResponse -> client().admin().indices().delete(new DeleteIndexRequest(index), deleteIndexListener)
         );
 
-        final ListenableFuture<RestoreSnapshotResponse> restoreSnapshotResponseListener = new ListenableFuture<>();
+        final SubscribableListener<RestoreSnapshotResponse> restoreSnapshotResponseListener = new SubscribableListener<>();
         continueOrDie(
             deleteIndexListener,
             ignored -> client().admin()
@@ -335,7 +335,7 @@ public class SnapshotResiliencyTests extends ESTestCase {
                 )
         );
 
-        final ListenableFuture<SearchResponse> searchResponseListener = new ListenableFuture<>();
+        final SubscribableListener<SearchResponse> searchResponseListener = new SubscribableListener<>();
         continueOrDie(restoreSnapshotResponseListener, restoreSnapshotResponse -> {
             assertEquals(shards, restoreSnapshotResponse.getRestoreInfo().totalShards());
             client().search(
@@ -351,8 +351,8 @@ public class SnapshotResiliencyTests extends ESTestCase {
         });
 
         runUntil(documentCountVerified::get, TimeUnit.MINUTES.toMillis(5L));
-        assertNotNull(createSnapshotResponseListener.result());
-        assertNotNull(restoreSnapshotResponseListener.result());
+        assertNotNull(safeResult(createSnapshotResponseListener));
+        assertNotNull(safeResult(restoreSnapshotResponseListener));
         assertTrue(documentCountVerified.get());
         assertTrue(SnapshotsInProgress.get(masterNode.clusterService.state()).isEmpty());
         final Repository repository = masterNode.repositoriesService.repository(repoName);
@@ -367,10 +367,10 @@ public class SnapshotResiliencyTests extends ESTestCase {
     }
 
     private SnapshotInfo getSnapshotInfo(Repository repository, SnapshotId snapshotId) {
-        final ListenableFuture<SnapshotInfo> listener = new ListenableFuture<>();
+        final SubscribableListener<SnapshotInfo> listener = new SubscribableListener<>();
         repository.getSnapshotInfo(snapshotId, listener);
         deterministicTaskQueue.runAllRunnableTasks();
-        return listener.result();
+        return safeResult(listener);
     }
 
     public void testSnapshotWithNodeDisconnects() {
@@ -383,7 +383,7 @@ public class SnapshotResiliencyTests extends ESTestCase {
         final String index = "test";
         final int shards = randomIntBetween(1, 10);
 
-        final ListenableFuture<CreateSnapshotResponse> createSnapshotResponseStepListener = new ListenableFuture<>();
+        final SubscribableListener<CreateSnapshotResponse> createSnapshotResponseStepListener = new SubscribableListener<>();
 
         final boolean partial = randomBoolean();
         continueOrDie(createRepoAndIndex(repoName, index, shards), createIndexResponse -> {
@@ -461,7 +461,7 @@ public class SnapshotResiliencyTests extends ESTestCase {
         final int shards = randomIntBetween(1, 10);
 
         final boolean waitForSnapshot = randomBoolean();
-        final ListenableFuture<CreateSnapshotResponse> createSnapshotResponseStepListener = new ListenableFuture<>();
+        final SubscribableListener<CreateSnapshotResponse> createSnapshotResponseStepListener = new SubscribableListener<>();
         continueOrDie(
             createRepoAndIndex(repoName, index, shards),
             createIndexResponse -> testClusterNodes.randomMasterNodeSafe().client.admin()
@@ -512,7 +512,7 @@ public class SnapshotResiliencyTests extends ESTestCase {
             testClusterNodes.nodes.values().iterator().next().clusterService.state()
         );
 
-        final ListenableFuture<CreateSnapshotResponse> createSnapshotResponseStepListener = new ListenableFuture<>();
+        final SubscribableListener<CreateSnapshotResponse> createSnapshotResponseStepListener = new SubscribableListener<>();
 
         continueOrDie(
             createRepoAndIndex(repoName, index, shards),
@@ -522,7 +522,7 @@ public class SnapshotResiliencyTests extends ESTestCase {
                 .execute(createSnapshotResponseStepListener)
         );
 
-        final ListenableFuture<AcknowledgedResponse> deleteSnapshotStepListener = new ListenableFuture<>();
+        final SubscribableListener<AcknowledgedResponse> deleteSnapshotStepListener = new SubscribableListener<>();
 
         masterNode.clusterService.addListener(new ClusterStateListener() {
             @Override
@@ -534,7 +534,7 @@ public class SnapshotResiliencyTests extends ESTestCase {
             }
         });
 
-        final ListenableFuture<CreateSnapshotResponse> createAnotherSnapshotResponseStepListener = new ListenableFuture<>();
+        final SubscribableListener<CreateSnapshotResponse> createAnotherSnapshotResponseStepListener = new SubscribableListener<>();
 
         continueOrDie(
             deleteSnapshotStepListener,
@@ -551,8 +551,8 @@ public class SnapshotResiliencyTests extends ESTestCase {
 
         deterministicTaskQueue.runAllRunnableTasks();
 
-        assertNotNull(createSnapshotResponseStepListener.result());
-        assertNotNull(createAnotherSnapshotResponseStepListener.result());
+        assertNotNull(safeResult(createSnapshotResponseStepListener));
+        assertNotNull(safeResult(createAnotherSnapshotResponseStepListener));
         assertTrue(masterNode.clusterService.state().custom(SnapshotsInProgress.TYPE, SnapshotsInProgress.EMPTY).isEmpty());
         final Repository repository = masterNode.repositoriesService.repository(repoName);
         Collection<SnapshotId> snapshotIds = getRepositoryData(repository).getSnapshotIds();
@@ -577,7 +577,7 @@ public class SnapshotResiliencyTests extends ESTestCase {
             testClusterNodes.nodes.values().iterator().next().clusterService.state()
         );
 
-        final ListenableFuture<CreateSnapshotResponse> createSnapshotResponseStepListener = new ListenableFuture<>();
+        final SubscribableListener<CreateSnapshotResponse> createSnapshotResponseStepListener = new SubscribableListener<>();
 
         continueOrDie(
             createRepoAndIndex(repoName, index, shards),
@@ -588,7 +588,7 @@ public class SnapshotResiliencyTests extends ESTestCase {
                 .execute(createSnapshotResponseStepListener)
         );
 
-        final ListenableFuture<CreateSnapshotResponse> createOtherSnapshotResponseStepListener = new ListenableFuture<>();
+        final SubscribableListener<CreateSnapshotResponse> createOtherSnapshotResponseStepListener = new SubscribableListener<>();
 
         continueOrDie(
             createSnapshotResponseStepListener,
@@ -598,7 +598,7 @@ public class SnapshotResiliencyTests extends ESTestCase {
                 .execute(createOtherSnapshotResponseStepListener)
         );
 
-        final ListenableFuture<AcknowledgedResponse> deleteSnapshotStepListener = new ListenableFuture<>();
+        final SubscribableListener<AcknowledgedResponse> deleteSnapshotStepListener = new SubscribableListener<>();
 
         continueOrDie(
             createOtherSnapshotResponseStepListener,
@@ -608,7 +608,7 @@ public class SnapshotResiliencyTests extends ESTestCase {
                 .execute(deleteSnapshotStepListener)
         );
 
-        final ListenableFuture<CreateSnapshotResponse> createAnotherSnapshotResponseStepListener = new ListenableFuture<>();
+        final SubscribableListener<CreateSnapshotResponse> createAnotherSnapshotResponseStepListener = new SubscribableListener<>();
 
         continueOrDie(deleteSnapshotStepListener, deleted -> {
             client().admin()
@@ -651,7 +651,7 @@ public class SnapshotResiliencyTests extends ESTestCase {
             testClusterNodes.nodes.values().iterator().next().clusterService.state()
         );
 
-        final ListenableFuture<CreateSnapshotResponse> createSnapshotResponseStepListener = new ListenableFuture<>();
+        final SubscribableListener<CreateSnapshotResponse> createSnapshotResponseStepListener = new SubscribableListener<>();
 
         continueOrDie(
             createRepoAndIndex(repoName, index, shards),
@@ -663,7 +663,7 @@ public class SnapshotResiliencyTests extends ESTestCase {
         );
 
         final int inProgressSnapshots = randomIntBetween(1, 5);
-        final ListenableFuture<Collection<CreateSnapshotResponse>> createOtherSnapshotResponseStepListener = new ListenableFuture<>();
+        final var createOtherSnapshotResponseStepListener = new SubscribableListener<Collection<CreateSnapshotResponse>>();
         final ActionListener<CreateSnapshotResponse> createSnapshotListener = new GroupedActionListener<>(
             inProgressSnapshots,
             createOtherSnapshotResponseStepListener
@@ -675,7 +675,7 @@ public class SnapshotResiliencyTests extends ESTestCase {
             }
         });
 
-        final ListenableFuture<AcknowledgedResponse> deleteSnapshotStepListener = new ListenableFuture<>();
+        final SubscribableListener<AcknowledgedResponse> deleteSnapshotStepListener = new SubscribableListener<>();
 
         continueOrDie(
             createOtherSnapshotResponseStepListener,
@@ -705,7 +705,7 @@ public class SnapshotResiliencyTests extends ESTestCase {
             testClusterNodes.nodes.values().iterator().next().clusterService.state()
         );
 
-        final ListenableFuture<CreateSnapshotResponse> createSnapshotResponseStepListener = new ListenableFuture<>();
+        final SubscribableListener<CreateSnapshotResponse> createSnapshotResponseStepListener = new SubscribableListener<>();
 
         final int documentsFirstSnapshot = randomIntBetween(0, 100);
 
@@ -724,7 +724,7 @@ public class SnapshotResiliencyTests extends ESTestCase {
 
         final int documentsSecondSnapshot = randomIntBetween(0, 100);
 
-        final ListenableFuture<CreateSnapshotResponse> createOtherSnapshotResponseStepListener = new ListenableFuture<>();
+        final SubscribableListener<CreateSnapshotResponse> createOtherSnapshotResponseStepListener = new SubscribableListener<>();
 
         final String secondSnapshotName = "snapshot-2";
         continueOrDie(
@@ -740,8 +740,8 @@ public class SnapshotResiliencyTests extends ESTestCase {
             )
         );
 
-        final ListenableFuture<AcknowledgedResponse> deleteSnapshotStepListener = new ListenableFuture<>();
-        final ListenableFuture<RestoreSnapshotResponse> restoreSnapshotResponseListener = new ListenableFuture<>();
+        final SubscribableListener<AcknowledgedResponse> deleteSnapshotStepListener = new SubscribableListener<>();
+        final SubscribableListener<RestoreSnapshotResponse> restoreSnapshotResponseListener = new SubscribableListener<>();
 
         continueOrDie(createOtherSnapshotResponseStepListener, createSnapshotResponse -> {
             scheduleNow(() -> client().admin().cluster().prepareDeleteSnapshot(repoName, snapshotName).execute(deleteSnapshotStepListener));
@@ -757,7 +757,7 @@ public class SnapshotResiliencyTests extends ESTestCase {
             );
         });
 
-        final ListenableFuture<SearchResponse> searchResponseListener = new ListenableFuture<>();
+        final SubscribableListener<SearchResponse> searchResponseListener = new SubscribableListener<>();
         continueOrDie(restoreSnapshotResponseListener, restoreSnapshotResponse -> {
             assertEquals(shards, restoreSnapshotResponse.getRestoreInfo().totalShards());
             client().search(
@@ -770,14 +770,14 @@ public class SnapshotResiliencyTests extends ESTestCase {
 
         assertEquals(
             documentsFirstSnapshot + documentsSecondSnapshot,
-            Objects.requireNonNull(searchResponseListener.result().getHits().getTotalHits()).value
+            Objects.requireNonNull(safeResult(searchResponseListener).getHits().getTotalHits()).value
         );
-        assertThat(deleteSnapshotStepListener.result().isAcknowledged(), is(true));
-        assertThat(restoreSnapshotResponseListener.result().getRestoreInfo().failedShards(), is(0));
+        assertThat(safeResult(deleteSnapshotStepListener).isAcknowledged(), is(true));
+        assertThat(safeResult(restoreSnapshotResponseListener).getRestoreInfo().failedShards(), is(0));
 
         final Repository repository = masterNode.repositoriesService.repository(repoName);
         Collection<SnapshotId> snapshotIds = getRepositoryData(repository).getSnapshotIds();
-        assertThat(snapshotIds, contains(createOtherSnapshotResponseStepListener.result().getSnapshotInfo().snapshotId()));
+        assertThat(snapshotIds, contains(safeResult(createOtherSnapshotResponseStepListener).getSnapshotInfo().snapshotId()));
 
         for (SnapshotId snapshotId : snapshotIds) {
             final SnapshotInfo snapshotInfo = getSnapshotInfo(repository, snapshotId);
@@ -797,7 +797,7 @@ public class SnapshotResiliencyTests extends ESTestCase {
         for (int i = 0; i < documents; ++i) {
             bulkRequest.add(new IndexRequest(index).source(Collections.singletonMap("foo", "bar" + i)));
         }
-        final ListenableFuture<BulkResponse> bulkResponseStepListener = new ListenableFuture<>();
+        final SubscribableListener<BulkResponse> bulkResponseStepListener = new SubscribableListener<>();
         client().bulk(bulkRequest, bulkResponseStepListener);
         continueOrDie(bulkResponseStepListener, bulkResponse -> {
             assertFalse("Failures in bulk response: " + bulkResponse.buildFailureMessage(), bulkResponse.hasFailures());
@@ -817,7 +817,7 @@ public class SnapshotResiliencyTests extends ESTestCase {
             testClusterNodes.nodes.values().iterator().next().clusterService.state()
         );
 
-        final ListenableFuture<Collection<CreateIndexResponse>> createIndicesListener = new ListenableFuture<>();
+        final SubscribableListener<Collection<CreateIndexResponse>> createIndicesListener = new SubscribableListener<>();
         final int indices = randomIntBetween(5, 20);
 
         final SetOnce<Index> firstIndex = new SetOnce<>();
@@ -831,7 +831,7 @@ public class SnapshotResiliencyTests extends ESTestCase {
             }
         });
 
-        final ListenableFuture<CreateSnapshotResponse> createSnapshotResponseStepListener = new ListenableFuture<>();
+        final SubscribableListener<CreateSnapshotResponse> createSnapshotResponseStepListener = new SubscribableListener<>();
 
         final boolean partialSnapshot = randomBoolean();
 
@@ -908,7 +908,7 @@ public class SnapshotResiliencyTests extends ESTestCase {
             testClusterNodes.nodes.values().iterator().next().clusterService.state()
         );
 
-        final ListenableFuture<CreateSnapshotResponse> createSnapshotResponseStepListener = new ListenableFuture<>();
+        final SubscribableListener<CreateSnapshotResponse> createSnapshotResponseStepListener = new SubscribableListener<>();
 
         continueOrDie(
             createRepoAndIndex(repoName, index, shards),
@@ -919,15 +919,15 @@ public class SnapshotResiliencyTests extends ESTestCase {
                 .execute(createSnapshotResponseStepListener)
         );
 
-        final Collection<ListenableFuture<Boolean>> deleteSnapshotStepListeners = List.of(
-            new ListenableFuture<>(),
-            new ListenableFuture<>()
+        final Collection<SubscribableListener<Boolean>> deleteSnapshotStepListeners = List.of(
+            new SubscribableListener<>(),
+            new SubscribableListener<>()
         );
 
         final AtomicInteger successfulDeletes = new AtomicInteger(0);
 
         continueOrDie(createSnapshotResponseStepListener, createSnapshotResponse -> {
-            for (ListenableFuture<Boolean> deleteListener : deleteSnapshotStepListeners) {
+            for (SubscribableListener<Boolean> deleteListener : deleteSnapshotStepListeners) {
                 client().admin()
                     .cluster()
                     .prepareDeleteSnapshot(repoName, snapshotName)
@@ -943,7 +943,7 @@ public class SnapshotResiliencyTests extends ESTestCase {
             }
         });
 
-        for (ListenableFuture<Boolean> deleteListener : deleteSnapshotStepListeners) {
+        for (SubscribableListener<Boolean> deleteListener : deleteSnapshotStepListeners) {
             continueOrDie(deleteListener, deleted -> {
                 if (deleted) {
                     successfulDeletes.incrementAndGet();
@@ -984,7 +984,7 @@ public class SnapshotResiliencyTests extends ESTestCase {
         final AtomicBoolean createdSnapshot = new AtomicBoolean();
         final AdminClient masterAdminClient = masterNode.client.admin();
 
-        final ListenableFuture<ClusterStateResponse> clusterStateResponseStepListener = new ListenableFuture<>();
+        final SubscribableListener<ClusterStateResponse> clusterStateResponseStepListener = new SubscribableListener<>();
 
         continueOrDie(
             createRepoAndIndex(repoName, index, shards),
@@ -999,7 +999,7 @@ public class SnapshotResiliencyTests extends ESTestCase {
             scheduleNow(new Runnable() {
                 @Override
                 public void run() {
-                    final ListenableFuture<ClusterStateResponse> updatedClusterStateResponseStepListener = new ListenableFuture<>();
+                    final SubscribableListener<ClusterStateResponse> updatedClusterStateResponseStepListener = new SubscribableListener<>();
                     masterAdminClient.cluster().state(new ClusterStateRequest(), updatedClusterStateResponseStepListener);
                     continueOrDie(updatedClusterStateResponseStepListener, updatedClusterState -> {
                         final ShardRouting shardRouting = updatedClusterState.getState()
@@ -1071,7 +1071,7 @@ public class SnapshotResiliencyTests extends ESTestCase {
             testClusterNodes.nodes.values().iterator().next().clusterService.state()
         );
 
-        final ListenableFuture<CreateSnapshotResponse> createSnapshotResponseStepListener = new ListenableFuture<>();
+        final SubscribableListener<CreateSnapshotResponse> createSnapshotResponseStepListener = new SubscribableListener<>();
 
         continueOrDie(createRepoAndIndex(repoName, index, shards), createIndexResponse -> {
             final AtomicBoolean initiatedSnapshot = new AtomicBoolean(false);
@@ -1096,7 +1096,7 @@ public class SnapshotResiliencyTests extends ESTestCase {
 
         final String restoredIndex = "restored";
 
-        final ListenableFuture<RestoreSnapshotResponse> restoreSnapshotResponseStepListener = new ListenableFuture<>();
+        final SubscribableListener<RestoreSnapshotResponse> restoreSnapshotResponseStepListener = new SubscribableListener<>();
 
         continueOrDie(
             createSnapshotResponseStepListener,
@@ -1110,7 +1110,7 @@ public class SnapshotResiliencyTests extends ESTestCase {
                 )
         );
 
-        final ListenableFuture<SearchResponse> searchResponseStepListener = new ListenableFuture<>();
+        final SubscribableListener<SearchResponse> searchResponseStepListener = new SubscribableListener<>();
 
         continueOrDie(restoreSnapshotResponseStepListener, restoreSnapshotResponse -> {
             assertEquals(shards, restoreSnapshotResponse.getRestoreInfo().totalShards());
@@ -1141,8 +1141,8 @@ public class SnapshotResiliencyTests extends ESTestCase {
 
         runUntil(documentCountVerified::get, TimeUnit.MINUTES.toMillis(5L));
 
-        assertNotNull(createSnapshotResponseStepListener.result());
-        assertNotNull(restoreSnapshotResponseStepListener.result());
+        assertNotNull(safeResult(createSnapshotResponseStepListener));
+        assertNotNull(safeResult(restoreSnapshotResponseStepListener));
         assertTrue(masterNode.clusterService.state().custom(SnapshotsInProgress.TYPE, SnapshotsInProgress.EMPTY).isEmpty());
         final Repository repository = masterNode.repositoriesService.repository(repoName);
         Collection<SnapshotId> snapshotIds = getRepositoryData(repository).getSnapshotIds();
@@ -1168,7 +1168,7 @@ public class SnapshotResiliencyTests extends ESTestCase {
             testClusterNodes.nodes.values().iterator().next().clusterService.state()
         );
 
-        final ListenableFuture<Collection<CreateSnapshotResponse>> allSnapshotsListener = new ListenableFuture<>();
+        final SubscribableListener<Collection<CreateSnapshotResponse>> allSnapshotsListener = new SubscribableListener<>();
         final ActionListener<CreateSnapshotResponse> snapshotListener = new GroupedActionListener<>(
             snapshotNames.size(),
             allSnapshotsListener
@@ -1188,7 +1188,7 @@ public class SnapshotResiliencyTests extends ESTestCase {
             for (int i = 0; i < documents; ++i) {
                 bulkRequest.add(new IndexRequest(index).source(Collections.singletonMap("foo", "bar" + i)));
             }
-            final ListenableFuture<BulkResponse> bulkResponseStepListener = new ListenableFuture<>();
+            final SubscribableListener<BulkResponse> bulkResponseStepListener = new SubscribableListener<>();
             client().bulk(bulkRequest, bulkResponseStepListener);
             continueOrDie(bulkResponseStepListener, bulkResponse -> {
                 assertFalse("Failures in bulk response: " + bulkResponse.buildFailureMessage(), bulkResponse.hasFailures());
@@ -1229,8 +1229,8 @@ public class SnapshotResiliencyTests extends ESTestCase {
         return res.actionGet();
     }
 
-    private ListenableFuture<CreateIndexResponse> createRepoAndIndex(String repoName, String index, int shards) {
-        final ListenableFuture<AcknowledgedResponse> createRepositoryListener = new ListenableFuture<>();
+    private SubscribableListener<CreateIndexResponse> createRepoAndIndex(String repoName, String index, int shards) {
+        final SubscribableListener<AcknowledgedResponse> createRepositoryListener = new SubscribableListener<>();
 
         client().admin()
             .cluster()
@@ -1239,7 +1239,7 @@ public class SnapshotResiliencyTests extends ESTestCase {
             .setSettings(Settings.builder().put("location", randomAlphaOfLength(10)))
             .execute(createRepositoryListener);
 
-        final ListenableFuture<CreateIndexResponse> createIndexResponseStepListener = new ListenableFuture<>();
+        final SubscribableListener<CreateIndexResponse> createIndexResponseStepListener = new SubscribableListener<>();
 
         continueOrDie(
             createRepositoryListener,
@@ -1368,7 +1368,7 @@ public class SnapshotResiliencyTests extends ESTestCase {
             .build();
     }
 
-    private static <T> void continueOrDie(ListenableFuture<T> listener, CheckedConsumer<T, Exception> onResponse) {
+    private static <T> void continueOrDie(SubscribableListener<T> listener, CheckedConsumer<T, Exception> onResponse) {
         listener.addListener(ActionTestUtils.assertNoFailureListener(onResponse));
     }
 
@@ -2213,4 +2213,9 @@ public class SnapshotResiliencyTests extends ESTestCase {
             }
         }
     }
+
+    private static <T> T safeResult(SubscribableListener<T> listener) {
+        assertTrue("listener is not complete", listener.isDone());
+        return safeAwait(listener);
+    }
 }