Parcourir la source

Remove StepListener (#96627)

At this point all use of StepListener has become redundant.
It was only used to provide the `whenComplete` short-hand that I replaced
by wrapped listeners here.
The atomic boolean completion flag did not do anything funtionally since
`ListenableFuture` guards against double-invocation as well.

-> use `ListenableFuture` throughout to save on code and indirection.
Armin Braun il y a 2 ans
Parent
commit
6d72e5c5d0
27 fichiers modifiés avec 570 ajouts et 739 suppressions
  1. 4 4
      server/src/internalClusterTest/java/org/elasticsearch/snapshots/ConcurrentSnapshotsIT.java
  2. 7 7
      server/src/internalClusterTest/java/org/elasticsearch/snapshots/SnapshotStressTestsIT.java
  3. 0 96
      server/src/main/java/org/elasticsearch/action/StepListener.java
  4. 9 9
      server/src/main/java/org/elasticsearch/action/admin/cluster/repositories/cleanup/TransportCleanupRepositoryAction.java
  5. 22 21
      server/src/main/java/org/elasticsearch/action/admin/cluster/snapshots/get/TransportGetSnapshotsAction.java
  6. 6 6
      server/src/main/java/org/elasticsearch/action/admin/cluster/snapshots/status/TransportSnapshotsStatusAction.java
  7. 23 21
      server/src/main/java/org/elasticsearch/action/admin/cluster/stats/TransportClusterStatsAction.java
  8. 2 2
      server/src/main/java/org/elasticsearch/action/search/ClearScrollController.java
  9. 7 7
      server/src/main/java/org/elasticsearch/cluster/coordination/CoordinationDiagnosticsService.java
  10. 5 5
      server/src/main/java/org/elasticsearch/index/shard/StoreRecovery.java
  11. 89 83
      server/src/main/java/org/elasticsearch/indices/recovery/RecoverySourceHandler.java
  12. 15 15
      server/src/main/java/org/elasticsearch/repositories/IndexSnapshotsService.java
  13. 36 23
      server/src/main/java/org/elasticsearch/repositories/RepositoriesService.java
  14. 74 70
      server/src/main/java/org/elasticsearch/repositories/blobstore/BlobStoreRepository.java
  15. 14 12
      server/src/main/java/org/elasticsearch/rest/action/cat/RestTemplatesAction.java
  16. 36 26
      server/src/main/java/org/elasticsearch/snapshots/RestoreService.java
  17. 91 82
      server/src/main/java/org/elasticsearch/snapshots/SnapshotsService.java
  18. 3 3
      server/src/main/java/org/elasticsearch/tasks/TaskCancellationService.java
  19. 10 10
      server/src/main/java/org/elasticsearch/transport/SniffConnectionStrategy.java
  20. 0 136
      server/src/test/java/org/elasticsearch/action/StepListenerTests.java
  21. 14 14
      server/src/test/java/org/elasticsearch/common/util/CancellableSingleObjectCacheTests.java
  22. 2 2
      server/src/test/java/org/elasticsearch/indices/recovery/RecoverySourceHandlerTests.java
  23. 50 47
      server/src/test/java/org/elasticsearch/snapshots/SnapshotResiliencyTests.java
  24. 3 3
      test/framework/src/main/java/org/elasticsearch/transport/AbstractSimpleTransportTestCase.java
  25. 17 17
      x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/action/TransportGetTrainedModelsStatsAction.java
  26. 4 4
      x-pack/plugin/searchable-snapshots/src/main/java/org/elasticsearch/xpack/searchablesnapshots/allocation/SearchableSnapshotIndexEventListener.java
  27. 27 14
      x-pack/plugin/snapshot-repo-test-kit/src/main/java/org/elasticsearch/repositories/blobstore/testkit/BlobAnalyzeAction.java

+ 4 - 4
server/src/internalClusterTest/java/org/elasticsearch/snapshots/ConcurrentSnapshotsIT.java

@@ -12,7 +12,6 @@ import org.elasticsearch.ElasticsearchException;
 import org.elasticsearch.ExceptionsHelper;
 import org.elasticsearch.action.ActionFuture;
 import org.elasticsearch.action.ActionListener;
-import org.elasticsearch.action.StepListener;
 import org.elasticsearch.action.admin.cluster.snapshots.create.CreateSnapshotResponse;
 import org.elasticsearch.action.admin.cluster.snapshots.get.GetSnapshotsRequest;
 import org.elasticsearch.action.admin.cluster.snapshots.restore.RestoreSnapshotResponse;
@@ -25,6 +24,7 @@ import org.elasticsearch.cluster.SnapshotDeletionsInProgress;
 import org.elasticsearch.cluster.SnapshotsInProgress;
 import org.elasticsearch.common.Strings;
 import org.elasticsearch.common.settings.Settings;
+import org.elasticsearch.common.util.concurrent.ListenableFuture;
 import org.elasticsearch.common.util.concurrent.UncategorizedExecutionException;
 import org.elasticsearch.core.PathUtils;
 import org.elasticsearch.discovery.AbstractDisruptionTestCase;
@@ -190,14 +190,14 @@ public class ConcurrentSnapshotsIT extends AbstractSnapshotIntegTestCase {
             dataNode
         );
 
-        final Collection<StepListener<AcknowledgedResponse>> deleteFutures = new ArrayList<>();
+        final Collection<ListenableFuture<AcknowledgedResponse>> deleteFutures = new ArrayList<>();
         while (snapshotNames.isEmpty() == false) {
             final Collection<String> toDelete = randomSubsetOf(snapshotNames);
             if (toDelete.isEmpty()) {
                 continue;
             }
             snapshotNames.removeAll(toDelete);
-            final StepListener<AcknowledgedResponse> future = new StepListener<>();
+            final ListenableFuture<AcknowledgedResponse> future = new ListenableFuture<>();
             clusterAdmin().prepareDeleteSnapshot(repoName, toDelete.toArray(Strings.EMPTY_ARRAY)).execute(future);
             deleteFutures.add(future);
         }
@@ -213,7 +213,7 @@ public class ConcurrentSnapshotsIT extends AbstractSnapshotIntegTestCase {
         logger.info("--> waiting for batched deletes to finish");
         final PlainActionFuture<Collection<AcknowledgedResponse>> allDeletesDone = new PlainActionFuture<>();
         final ActionListener<AcknowledgedResponse> deletesListener = new GroupedActionListener<>(deleteFutures.size(), allDeletesDone);
-        for (StepListener<AcknowledgedResponse> deleteFuture : deleteFutures) {
+        for (ListenableFuture<AcknowledgedResponse> deleteFuture : deleteFutures) {
             deleteFuture.addListener(deletesListener);
         }
         allDeletesDone.get();

+ 7 - 7
server/src/internalClusterTest/java/org/elasticsearch/snapshots/SnapshotStressTestsIT.java

@@ -14,7 +14,6 @@ import org.apache.lucene.tests.util.LuceneTestCase;
 import org.elasticsearch.ExceptionsHelper;
 import org.elasticsearch.action.ActionListener;
 import org.elasticsearch.action.ShardOperationFailedException;
-import org.elasticsearch.action.StepListener;
 import org.elasticsearch.action.admin.cluster.health.ClusterHealthRequestBuilder;
 import org.elasticsearch.action.admin.cluster.health.ClusterHealthResponse;
 import org.elasticsearch.action.admin.cluster.node.hotthreads.NodeHotThreads;
@@ -38,6 +37,7 @@ import org.elasticsearch.common.Strings;
 import org.elasticsearch.common.settings.Settings;
 import org.elasticsearch.common.util.concurrent.AbstractRunnable;
 import org.elasticsearch.common.util.concurrent.ConcurrentCollections;
+import org.elasticsearch.common.util.concurrent.ListenableFuture;
 import org.elasticsearch.core.CheckedConsumer;
 import org.elasticsearch.core.CheckedRunnable;
 import org.elasticsearch.core.Nullable;
@@ -496,8 +496,8 @@ public class SnapshotStressTestsIT extends AbstractSnapshotIntegTestCase {
                 final String[] indicesToClose = indicesToCloseList.toArray(new String[0]);
                 final String[] indicesToDelete = indicesToDeleteList.toArray(new String[0]);
 
-                final StepListener<Void> closeIndicesStep = new StepListener<>();
-                final StepListener<Void> deleteIndicesStep = new StepListener<>();
+                final ListenableFuture<Void> closeIndicesStep = new ListenableFuture<>();
+                final ListenableFuture<Void> deleteIndicesStep = new ListenableFuture<>();
 
                 if (indicesToClose.length > 0) {
                     logger.info(
@@ -623,7 +623,7 @@ public class SnapshotStressTestsIT extends AbstractSnapshotIntegTestCase {
 
                     final Releasable releaseAll = localReleasables.transfer();
 
-                    final StepListener<List<String>> getIndicesStep = new StepListener<>();
+                    final ListenableFuture<List<String>> getIndicesStep = new ListenableFuture<>();
 
                     logger.info(
                         "--> listing indices in [{}:{}] in preparation for cloning",
@@ -869,7 +869,7 @@ public class SnapshotStressTestsIT extends AbstractSnapshotIntegTestCase {
 
                     final Releasable releaseAll = localReleasables.transfer();
 
-                    final StepListener<ClusterHealthResponse> ensureYellowStep = new StepListener<>();
+                    final ListenableFuture<ClusterHealthResponse> ensureYellowStep = new ListenableFuture<>();
 
                     final String snapshotName = "snapshot-" + snapshotCounter.incrementAndGet();
 
@@ -1336,13 +1336,13 @@ public class SnapshotStressTestsIT extends AbstractSnapshotIntegTestCase {
 
                             final Releasable releaseAll = localReleasables.transfer();
 
-                            final StepListener<ClusterHealthResponse> ensureYellowStep = new StepListener<>();
+                            final ListenableFuture<ClusterHealthResponse> ensureYellowStep = new ListenableFuture<>();
 
                             logger.info("--> waiting for yellow health of [{}] prior to indexing [{}] docs", indexName, docCount);
 
                             prepareClusterHealthRequest(indexName).setWaitForYellowStatus().execute(ensureYellowStep);
 
-                            final StepListener<BulkResponse> bulkStep = new StepListener<>();
+                            final ListenableFuture<BulkResponse> bulkStep = new ListenableFuture<>();
 
                             ensureYellowStep.addListener(mustSucceed(clusterHealthResponse -> {
 

+ 0 - 96
server/src/main/java/org/elasticsearch/action/StepListener.java

@@ -1,96 +0,0 @@
-/*
- * Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
- * or more contributor license agreements. Licensed under the Elastic License
- * 2.0 and the Server Side Public License, v 1; you may not use this file except
- * in compliance with, at your election, the Elastic License 2.0 or the Server
- * Side Public License, v 1.
- */
-
-package org.elasticsearch.action;
-
-import org.elasticsearch.common.util.concurrent.ListenableFuture;
-import org.elasticsearch.core.CheckedConsumer;
-
-import java.util.concurrent.atomic.AtomicBoolean;
-import java.util.function.Consumer;
-
-/**
- * A {@link StepListener} provides a simple way to write a flow consisting of
- * multiple asynchronous steps without having nested callbacks. For example:
- *
- * <pre>{@code
- *  void asyncFlowMethod(... ActionListener<R> flowListener) {
- *    StepListener<R1> step1 = new StepListener<>();
- *    asyncStep1(..., step1);
-
- *    StepListener<R2> step2 = new StepListener<>();
- *    step1.whenComplete(r1 -> {
- *      asyncStep2(r1, ..., step2);
- *    }, flowListener::onFailure);
- *
- *    step2.whenComplete(r2 -> {
- *      R1 r1 = step1.result();
- *      R r = combine(r1, r2);
- *     flowListener.onResponse(r);
- *    }, flowListener::onFailure);
- *  }
- * }</pre>
- */
-
-public final class StepListener<Response> implements ActionListener<Response> {
-
-    private final AtomicBoolean hasBeenCalled = new AtomicBoolean(false);
-    private final ListenableFuture<Response> delegate;
-
-    public StepListener() {
-        this.delegate = new ListenableFuture<>();
-    }
-
-    @Override
-    public void onResponse(Response response) {
-        if (hasBeenCalled.compareAndSet(false, true)) {
-            delegate.onResponse(response);
-        }
-    }
-
-    @Override
-    public void onFailure(Exception e) {
-        if (hasBeenCalled.compareAndSet(false, true)) {
-            delegate.onFailure(e);
-        }
-    }
-
-    /**
-     * Registers the given actions which are called when this step is completed. If this step is completed successfully,
-     * the {@code onResponse} is called with the result; otherwise the {@code onFailure} is called with the failure.
-     *
-     * @param onResponse is called when this step is completed successfully
-     * @param onFailure  is called when this step is completed with a failure
-     */
-    public void whenComplete(CheckedConsumer<Response, Exception> onResponse, Consumer<Exception> onFailure) {
-        addListener(ActionListener.wrap(onResponse, onFailure));
-    }
-
-    /**
-     * @return the result of this step, if it has been completed successfully, or throw the exception with which it was completed
-     * exceptionally. It is not valid to call this method if the step is incomplete.
-     */
-    public Response result() {
-        return delegate.result();
-    }
-
-    /**
-     * @return whether this step is complete yet.
-     */
-    public boolean isDone() {
-        return delegate.isDone();
-    }
-
-    /**
-     * Registers the given listener to be notified with the result of this step.
-     */
-    public void addListener(ActionListener<Response> listener) {
-        delegate.addListener(listener);
-    }
-
-}

+ 9 - 9
server/src/main/java/org/elasticsearch/action/admin/cluster/repositories/cleanup/TransportCleanupRepositoryAction.java

@@ -11,7 +11,6 @@ import org.apache.logging.log4j.LogManager;
 import org.apache.logging.log4j.Logger;
 import org.elasticsearch.action.ActionListener;
 import org.elasticsearch.action.ActionRunnable;
-import org.elasticsearch.action.StepListener;
 import org.elasticsearch.action.support.ActionFilters;
 import org.elasticsearch.action.support.master.TransportMasterNodeAction;
 import org.elasticsearch.cluster.ClusterState;
@@ -25,6 +24,7 @@ import org.elasticsearch.cluster.metadata.IndexNameExpressionResolver;
 import org.elasticsearch.cluster.node.DiscoveryNode;
 import org.elasticsearch.cluster.service.ClusterService;
 import org.elasticsearch.common.inject.Inject;
+import org.elasticsearch.common.util.concurrent.ListenableFuture;
 import org.elasticsearch.core.Nullable;
 import org.elasticsearch.core.SuppressForbidden;
 import org.elasticsearch.repositories.RepositoriesService;
@@ -153,9 +153,9 @@ public final class TransportCleanupRepositoryAction extends TransportMasterNodeA
             return;
         }
         final BlobStoreRepository blobStoreRepository = (BlobStoreRepository) repository;
-        final StepListener<RepositoryData> repositoryDataListener = new StepListener<>();
+        final ListenableFuture<RepositoryData> repositoryDataListener = new ListenableFuture<>();
         repository.getRepositoryData(repositoryDataListener);
-        repositoryDataListener.whenComplete(repositoryData -> {
+        repositoryDataListener.addListener(listener.delegateFailureAndWrap((delegate, repositoryData) -> {
             final long repositoryStateId = repositoryData.getGenId();
             logger.info("Running cleanup operations on repository [{}][{}]", repositoryName, repositoryStateId);
             submitUnbatchedTask(
@@ -222,7 +222,7 @@ public final class TransportCleanupRepositoryAction extends TransportMasterNodeA
                         threadPool.executor(ThreadPool.Names.SNAPSHOT)
                             .execute(
                                 ActionRunnable.wrap(
-                                    listener,
+                                    delegate,
                                     l -> blobStoreRepository.cleanup(
                                         repositoryStateId,
                                         SnapshotsService.minCompatibleVersion(newState.nodes().getMinNodeVersion(), repositoryData, null),
@@ -248,7 +248,7 @@ public final class TransportCleanupRepositoryAction extends TransportMasterNodeA
                         assert failure != null || result != null;
                         if (startedCleanup == false) {
                             logger.debug("No cleanup task to remove from cluster state because we failed to start one", failure);
-                            listener.onFailure(failure);
+                            delegate.onFailure(failure);
                             return;
                         }
                         submitUnbatchedTask(
@@ -266,7 +266,7 @@ public final class TransportCleanupRepositoryAction extends TransportMasterNodeA
                                         e.addSuppressed(failure);
                                     }
                                     logger.warn(() -> "[" + repositoryName + "] failed to remove repository cleanup task", e);
-                                    listener.onFailure(e);
+                                    delegate.onFailure(e);
                                 }
 
                                 @Override
@@ -278,7 +278,7 @@ public final class TransportCleanupRepositoryAction extends TransportMasterNodeA
                                             repositoryStateId,
                                             result
                                         );
-                                        listener.onResponse(result);
+                                        delegate.onResponse(result);
                                     } else {
                                         logger.warn(
                                             () -> "Failed to run repository cleanup operations on ["
@@ -288,7 +288,7 @@ public final class TransportCleanupRepositoryAction extends TransportMasterNodeA
                                                 + "]",
                                             failure
                                         );
-                                        listener.onFailure(failure);
+                                        delegate.onFailure(failure);
                                     }
                                 }
                             }
@@ -296,7 +296,7 @@ public final class TransportCleanupRepositoryAction extends TransportMasterNodeA
                     }
                 }
             );
-        }, listener::onFailure);
+        }));
     }
 
     @SuppressForbidden(reason = "legacy usage of unbatched task") // TODO add support for batching here

+ 22 - 21
server/src/main/java/org/elasticsearch/action/admin/cluster/snapshots/get/TransportGetSnapshotsAction.java

@@ -10,7 +10,6 @@ package org.elasticsearch.action.admin.cluster.snapshots.get;
 
 import org.elasticsearch.ElasticsearchException;
 import org.elasticsearch.action.ActionListener;
-import org.elasticsearch.action.StepListener;
 import org.elasticsearch.action.admin.cluster.repositories.get.TransportGetRepositoriesAction;
 import org.elasticsearch.action.support.ActionFilters;
 import org.elasticsearch.action.support.RefCountingListener;
@@ -26,6 +25,7 @@ import org.elasticsearch.common.Strings;
 import org.elasticsearch.common.inject.Inject;
 import org.elasticsearch.common.regex.Regex;
 import org.elasticsearch.common.util.concurrent.ConcurrentCollections;
+import org.elasticsearch.common.util.concurrent.ListenableFuture;
 import org.elasticsearch.core.Nullable;
 import org.elasticsearch.repositories.GetSnapshotInfoContext;
 import org.elasticsearch.repositories.IndexId;
@@ -254,32 +254,33 @@ public class TransportGetSnapshotsAction extends TransportMasterNodeAction<GetSn
             currentSnapshots.add(snapshotInfo.maybeWithoutIndices(indices));
         }
 
-        final StepListener<RepositoryData> repositoryDataListener = new StepListener<>();
+        final ListenableFuture<RepositoryData> repositoryDataListener = new ListenableFuture<>();
         if (isCurrentSnapshotsOnly(snapshots)) {
             repositoryDataListener.onResponse(null);
         } else {
             repositoriesService.getRepositoryData(repo, repositoryDataListener);
         }
 
-        repositoryDataListener.whenComplete(
-            repositoryData -> loadSnapshotInfos(
-                snapshotsInProgress,
-                repo,
-                snapshots,
-                ignoreUnavailable,
-                verbose,
-                allSnapshotIds,
-                currentSnapshots,
-                repositoryData,
-                task,
-                sortBy,
-                after,
-                order,
-                predicates,
-                indices,
-                listener
-            ),
-            listener::onFailure
+        repositoryDataListener.addListener(
+            listener.delegateFailureAndWrap(
+                (l, repositoryData) -> loadSnapshotInfos(
+                    snapshotsInProgress,
+                    repo,
+                    snapshots,
+                    ignoreUnavailable,
+                    verbose,
+                    allSnapshotIds,
+                    currentSnapshots,
+                    repositoryData,
+                    task,
+                    sortBy,
+                    after,
+                    order,
+                    predicates,
+                    indices,
+                    l
+                )
+            )
         );
     }
 

+ 6 - 6
server/src/main/java/org/elasticsearch/action/admin/cluster/snapshots/status/TransportSnapshotsStatusAction.java

@@ -11,7 +11,6 @@ package org.elasticsearch.action.admin.cluster.snapshots.status;
 import org.apache.logging.log4j.LogManager;
 import org.apache.logging.log4j.Logger;
 import org.elasticsearch.action.ActionListener;
-import org.elasticsearch.action.StepListener;
 import org.elasticsearch.action.support.ActionFilters;
 import org.elasticsearch.action.support.ThreadedActionListener;
 import org.elasticsearch.action.support.master.TransportMasterNodeAction;
@@ -26,6 +25,7 @@ import org.elasticsearch.cluster.service.ClusterService;
 import org.elasticsearch.common.Strings;
 import org.elasticsearch.common.inject.Inject;
 import org.elasticsearch.common.util.CollectionUtils;
+import org.elasticsearch.common.util.concurrent.ListenableFuture;
 import org.elasticsearch.common.util.set.Sets;
 import org.elasticsearch.index.shard.ShardId;
 import org.elasticsearch.index.snapshots.IndexShardSnapshotStatus;
@@ -280,10 +280,10 @@ public class TransportSnapshotsStatusAction extends TransportMasterNodeAction<Sn
         ActionListener<SnapshotsStatusResponse> listener
     ) {
         final Set<String> requestedSnapshotNames = Sets.newHashSet(request.snapshots());
-        final StepListener<RepositoryData> repositoryDataListener = new StepListener<>();
+        final ListenableFuture<RepositoryData> repositoryDataListener = new ListenableFuture<>();
         repositoriesService.getRepositoryData(repositoryName, repositoryDataListener);
         final Collection<SnapshotId> snapshotIdsToLoad = new ArrayList<>();
-        repositoryDataListener.whenComplete(repositoryData -> {
+        repositoryDataListener.addListener(listener.delegateFailureAndWrap((delegate, repositoryData) -> {
             task.ensureNotCancelled();
             final Map<String, SnapshotId> matchedSnapshotIds = repositoryData.getSnapshotIds()
                 .stream()
@@ -315,7 +315,7 @@ public class TransportSnapshotsStatusAction extends TransportMasterNodeAction<Sn
             }
 
             if (snapshotIdsToLoad.isEmpty()) {
-                listener.onResponse(new SnapshotsStatusResponse(Collections.unmodifiableList(builder)));
+                delegate.onResponse(new SnapshotsStatusResponse(Collections.unmodifiableList(builder)));
             } else {
                 final List<SnapshotStatus> threadSafeBuilder = Collections.synchronizedList(builder);
                 repositoriesService.repository(repositoryName)
@@ -356,9 +356,9 @@ public class TransportSnapshotsStatusAction extends TransportMasterNodeAction<Sn
                                 (endTime == 0 ? threadPool.absoluteTimeInMillis() : endTime) - startTime
                             )
                         );
-                    }, listener.map(v -> new SnapshotsStatusResponse(List.copyOf(threadSafeBuilder)))));
+                    }, delegate.map(v -> new SnapshotsStatusResponse(List.copyOf(threadSafeBuilder)))));
             }
-        }, listener::onFailure);
+        }));
     }
 
     /**

+ 23 - 21
server/src/main/java/org/elasticsearch/action/admin/cluster/stats/TransportClusterStatsAction.java

@@ -11,7 +11,6 @@ package org.elasticsearch.action.admin.cluster.stats;
 import org.apache.lucene.store.AlreadyClosedException;
 import org.elasticsearch.action.ActionListener;
 import org.elasticsearch.action.FailedNodeException;
-import org.elasticsearch.action.StepListener;
 import org.elasticsearch.action.admin.cluster.node.info.NodeInfo;
 import org.elasticsearch.action.admin.cluster.node.stats.NodeStats;
 import org.elasticsearch.action.admin.indices.stats.CommonStats;
@@ -30,6 +29,7 @@ import org.elasticsearch.common.inject.Inject;
 import org.elasticsearch.common.io.stream.StreamInput;
 import org.elasticsearch.common.io.stream.StreamOutput;
 import org.elasticsearch.common.util.CancellableSingleObjectCache;
+import org.elasticsearch.common.util.concurrent.ListenableFuture;
 import org.elasticsearch.common.util.concurrent.ThreadContext;
 import org.elasticsearch.index.IndexService;
 import org.elasticsearch.index.engine.CommitStats;
@@ -128,29 +128,31 @@ public class TransportClusterStatsAction extends TransportNodesAction<
             clusterService.threadPool().absoluteTimeInMillis()
         );
 
-        final StepListener<MappingStats> mappingStatsStep = new StepListener<>();
-        final StepListener<AnalysisStats> analysisStatsStep = new StepListener<>();
+        final ListenableFuture<MappingStats> mappingStatsStep = new ListenableFuture<>();
+        final ListenableFuture<AnalysisStats> analysisStatsStep = new ListenableFuture<>();
         mappingStatsCache.get(metadata, cancellableTask::isCancelled, mappingStatsStep);
         analysisStatsCache.get(metadata, cancellableTask::isCancelled, analysisStatsStep);
-        mappingStatsStep.whenComplete(
-            mappingStats -> analysisStatsStep.whenComplete(
-                analysisStats -> ActionListener.completeWith(
-                    listener,
-                    () -> new ClusterStatsResponse(
-                        System.currentTimeMillis(),
-                        metadata.clusterUUID(),
-                        clusterService.getClusterName(),
-                        responses,
-                        failures,
-                        mappingStats,
-                        analysisStats,
-                        VersionStats.of(metadata, responses),
-                        clusterSnapshotStats
+        mappingStatsStep.addListener(
+            listener.delegateFailureAndWrap(
+                (l, mappingStats) -> analysisStatsStep.addListener(
+                    l.delegateFailureAndWrap(
+                        (ll, analysisStats) -> ActionListener.completeWith(
+                            ll,
+                            () -> new ClusterStatsResponse(
+                                System.currentTimeMillis(),
+                                metadata.clusterUUID(),
+                                clusterService.getClusterName(),
+                                responses,
+                                failures,
+                                mappingStats,
+                                analysisStats,
+                                VersionStats.of(metadata, responses),
+                                clusterSnapshotStats
+                            )
+                        )
                     )
-                ),
-                listener::onFailure
-            ),
-            listener::onFailure
+                )
+            )
         );
     }
 

+ 2 - 2
server/src/main/java/org/elasticsearch/action/search/ClearScrollController.java

@@ -9,11 +9,11 @@ package org.elasticsearch.action.search;
 
 import org.apache.logging.log4j.Logger;
 import org.elasticsearch.action.ActionListener;
-import org.elasticsearch.action.StepListener;
 import org.elasticsearch.action.support.RefCountingRunnable;
 import org.elasticsearch.cluster.node.DiscoveryNode;
 import org.elasticsearch.cluster.node.DiscoveryNodes;
 import org.elasticsearch.common.Strings;
+import org.elasticsearch.common.util.concurrent.ListenableFuture;
 import org.elasticsearch.transport.Transport;
 import org.elasticsearch.transport.TransportResponse;
 
@@ -156,7 +156,7 @@ public final class ClearScrollController implements Runnable {
             .map(SearchContextIdForNode::getClusterAlias)
             .filter(clusterAlias -> Strings.isEmpty(clusterAlias) == false)
             .collect(Collectors.toSet());
-        final StepListener<BiFunction<String, String, DiscoveryNode>> lookupListener = new StepListener<>();
+        final ListenableFuture<BiFunction<String, String, DiscoveryNode>> lookupListener = new ListenableFuture<>();
         if (clusters.isEmpty()) {
             lookupListener.onResponse((cluster, nodeId) -> nodes.get(nodeId));
         } else {

+ 7 - 7
server/src/main/java/org/elasticsearch/cluster/coordination/CoordinationDiagnosticsService.java

@@ -16,7 +16,6 @@ import org.elasticsearch.action.ActionListenerResponseHandler;
 import org.elasticsearch.action.ActionRequest;
 import org.elasticsearch.action.ActionResponse;
 import org.elasticsearch.action.ActionType;
-import org.elasticsearch.action.StepListener;
 import org.elasticsearch.action.admin.cluster.coordination.ClusterFormationInfoAction;
 import org.elasticsearch.action.admin.cluster.coordination.CoordinationDiagnosticsAction;
 import org.elasticsearch.cluster.ClusterChangedEvent;
@@ -30,6 +29,7 @@ import org.elasticsearch.common.io.stream.StreamOutput;
 import org.elasticsearch.common.io.stream.Writeable;
 import org.elasticsearch.common.settings.Setting;
 import org.elasticsearch.common.util.concurrent.EsRejectedExecutionException;
+import org.elasticsearch.common.util.concurrent.ListenableFuture;
 import org.elasticsearch.common.util.concurrent.ThreadContext;
 import org.elasticsearch.core.Nullable;
 import org.elasticsearch.core.Releasable;
@@ -1093,10 +1093,10 @@ public class CoordinationDiagnosticsService implements ClusterStateListener {
         ActionRequest transportActionRequest,
         BiFunction<R, Exception, T> responseTransformationFunction
     ) {
-        StepListener<Releasable> connectionListener = new StepListener<>();
-        StepListener<R> fetchRemoteResultListener = new StepListener<>();
+        ListenableFuture<Releasable> connectionListener = new ListenableFuture<>();
+        ListenableFuture<R> fetchRemoteResultListener = new ListenableFuture<>();
         long startTimeMillis = transportService.getThreadPool().relativeTimeInMillis();
-        connectionListener.whenComplete(releasable -> {
+        connectionListener.addListener(ActionListener.wrap(releasable -> {
             if (masterEligibleNode == null) {
                 Releasables.close(releasable);
                 responseConsumer.accept(null);
@@ -1119,9 +1119,9 @@ public class CoordinationDiagnosticsService implements ClusterStateListener {
         }, e -> {
             logger.warn("Exception connecting to master " + masterEligibleNode, e);
             responseConsumer.accept(responseTransformationFunction.apply(null, e));
-        });
+        }));
 
-        fetchRemoteResultListener.whenComplete(response -> {
+        fetchRemoteResultListener.addListener(ActionListener.wrap(response -> {
             long endTimeMillis = transportService.getThreadPool().relativeTimeInMillis();
             logger.trace(
                 "Received remote response from {} in {}",
@@ -1132,7 +1132,7 @@ public class CoordinationDiagnosticsService implements ClusterStateListener {
         }, e -> {
             logger.warn("Exception in remote request to master" + masterEligibleNode, e);
             responseConsumer.accept(responseTransformationFunction.apply(null, e));
-        });
+        }));
 
         return transportService.getThreadPool().schedule(new Runnable() {
             @Override

+ 5 - 5
server/src/main/java/org/elasticsearch/index/shard/StoreRecovery.java

@@ -22,7 +22,6 @@ import org.elasticsearch.ExceptionsHelper;
 import org.elasticsearch.Version;
 import org.elasticsearch.action.ActionListener;
 import org.elasticsearch.action.ActionRunnable;
-import org.elasticsearch.action.StepListener;
 import org.elasticsearch.action.support.SubscribableListener;
 import org.elasticsearch.action.support.ThreadedActionListener;
 import org.elasticsearch.cluster.metadata.IndexMetadata;
@@ -33,6 +32,7 @@ import org.elasticsearch.common.UUIDs;
 import org.elasticsearch.common.lucene.Lucene;
 import org.elasticsearch.common.unit.ByteSizeValue;
 import org.elasticsearch.common.util.Maps;
+import org.elasticsearch.common.util.concurrent.ListenableFuture;
 import org.elasticsearch.core.TimeValue;
 import org.elasticsearch.index.Index;
 import org.elasticsearch.index.engine.Engine;
@@ -545,7 +545,7 @@ public final class StoreRecovery {
                 } else {
                     snapshotShardId = new ShardId(indexId.getName(), IndexMetadata.INDEX_UUID_NA_VALUE, shardId.id());
                 }
-                final StepListener<IndexId> indexIdListener = new StepListener<>();
+                final ListenableFuture<IndexId> indexIdListener = new ListenableFuture<>();
                 // If the index UUID was not found in the recovery source we will have to load RepositoryData and resolve it by index name
                 if (indexId.getId().equals(IndexMetadata.INDEX_UUID_NA_VALUE)) {
                     // BwC path, running against an old version master that did not add the IndexId to the recovery source
@@ -559,7 +559,7 @@ public final class StoreRecovery {
                     indexIdListener.onResponse(indexId);
                 }
                 assert indexShard.getEngineOrNull() == null;
-                indexIdListener.whenComplete(idx -> {
+                indexIdListener.addListener(restoreListener.delegateFailureAndWrap((l, idx) -> {
                     assert ThreadPool.assertCurrentThreadPool(ThreadPool.Names.GENERIC, ThreadPool.Names.SNAPSHOT);
                     repository.restoreShard(
                         indexShard.store(),
@@ -567,9 +567,9 @@ public final class StoreRecovery {
                         idx,
                         snapshotShardId,
                         indexShard.recoveryState(),
-                        restoreListener
+                        l
                     );
-                }, restoreListener::onFailure);
+                }));
             } catch (Exception e) {
                 restoreListener.onFailure(e);
             }

+ 89 - 83
server/src/main/java/org/elasticsearch/indices/recovery/RecoverySourceHandler.java

@@ -20,7 +20,6 @@ import org.apache.lucene.util.ArrayUtil;
 import org.elasticsearch.ExceptionsHelper;
 import org.elasticsearch.Version;
 import org.elasticsearch.action.ActionListener;
-import org.elasticsearch.action.StepListener;
 import org.elasticsearch.action.support.ListenableActionFuture;
 import org.elasticsearch.action.support.SubscribableListener;
 import org.elasticsearch.action.support.ThreadedActionListener;
@@ -232,10 +231,10 @@ public class RecoverySourceHandler {
             logger.trace("history is retained by retention lock");
         }
 
-        final StepListener<SendFileResult> sendFileStep = new StepListener<>();
-        final StepListener<TimeValue> prepareEngineStep = new StepListener<>();
-        final StepListener<SendSnapshotResult> sendSnapshotStep = new StepListener<>();
-        final StepListener<Void> finalizeStep = new StepListener<>();
+        final ListenableFuture<SendFileResult> sendFileStep = new ListenableFuture<>();
+        final ListenableFuture<TimeValue> prepareEngineStep = new ListenableFuture<>();
+        final ListenableFuture<SendSnapshotResult> sendSnapshotStep = new ListenableFuture<>();
+        final ListenableFuture<Void> finalizeStep = new ListenableFuture<>();
 
         if (isSequenceNumberBasedRecovery) {
             logger.trace("performing sequence numbers based recovery. starting at [{}]", request.startingSeqNo());
@@ -271,13 +270,13 @@ public class RecoverySourceHandler {
                 final int estimateNumOps = estimateNumberOfHistoryOperations(startingSeqNo);
                 final Releasable releaseStore = acquireStore(shard.store());
                 resources.add(releaseStore);
-                sendFileStep.whenComplete(r -> IOUtils.close(safeCommitRef, releaseStore), e -> {
+                sendFileStep.addListener(ActionListener.wrap(r -> IOUtils.close(safeCommitRef, releaseStore), e -> {
                     try {
                         IOUtils.close(safeCommitRef, releaseStore);
                     } catch (Exception ex) {
                         logger.warn("releasing snapshot caused exception", ex);
                     }
-                });
+                }));
 
                 // If the target previously had a copy of this shard then a file-based recovery might move its global checkpoint
                 // backwards. We must therefore remove any existing retention lease so that we can create a new one later on in the
@@ -293,13 +292,13 @@ public class RecoverySourceHandler {
         }
         assert startingSeqNo >= 0 : "startingSeqNo must be non negative. got: " + startingSeqNo;
 
-        sendFileStep.whenComplete(r -> {
+        sendFileStep.addListener(ActionListener.wrap(r -> {
             assert Transports.assertNotTransportThread(RecoverySourceHandler.this + "[prepareTargetForTranslog]");
             // For a sequence based recovery, the target can keep its local translog
             prepareTargetForTranslog(estimateNumberOfHistoryOperations(startingSeqNo), prepareEngineStep);
-        }, onFailure);
+        }, onFailure));
 
-        prepareEngineStep.whenComplete(prepareEngineTime -> {
+        prepareEngineStep.addListener(ActionListener.wrap(prepareEngineTime -> {
             assert Transports.assertNotTransportThread(RecoverySourceHandler.this + "[phase2]");
             /*
              * add shard to replication group (shard will receive replication requests from this point on) now that engine is open.
@@ -343,13 +342,15 @@ public class RecoverySourceHandler {
                     );
                 }, onFailure)
             );
-        }, onFailure);
+        }, onFailure));
 
         // Recovery target can trim all operations >= startingSeqNo as we have sent all these operations in the phase 2
         final long trimAboveSeqNo = startingSeqNo - 1;
-        sendSnapshotStep.whenComplete(r -> finalizeRecovery(r.targetLocalCheckpoint, trimAboveSeqNo, finalizeStep), onFailure);
+        sendSnapshotStep.addListener(
+            ActionListener.wrap(r -> finalizeRecovery(r.targetLocalCheckpoint, trimAboveSeqNo, finalizeStep), onFailure)
+        );
 
-        finalizeStep.whenComplete(r -> {
+        finalizeStep.addListener(ActionListener.wrap(r -> {
             final long phase1ThrottlingWaitTime = 0L; // TODO: return the actual throttle time
             final SendSnapshotResult sendSnapshotResult = sendSnapshotStep.result();
             final SendFileResult sendFileResult = sendFileStep.result();
@@ -371,7 +372,7 @@ public class RecoverySourceHandler {
             } finally {
                 IOUtils.close(resources);
             }
-        }, onFailure);
+        }, onFailure));
     }
 
     private boolean isTargetSameHistory() {
@@ -552,12 +553,12 @@ public class RecoverySourceHandler {
                 logger.trace("skipping [phase1] since source and target have identical sync id [{}]", recoverySourceMetadata.getSyncId());
 
                 // but we must still create a retention lease
-                final StepListener<RetentionLease> createRetentionLeaseStep = new StepListener<>();
+                final ListenableFuture<RetentionLease> createRetentionLeaseStep = new ListenableFuture<>();
                 createRetentionLease(startingSeqNo, createRetentionLeaseStep);
-                createRetentionLeaseStep.whenComplete(retentionLease -> {
+                createRetentionLeaseStep.addListener(listener.delegateFailureAndWrap((l, retentionLease) -> {
                     final TimeValue took = stopWatch.totalTime();
                     logger.trace("recovery [phase1]: took [{}]", took);
-                    listener.onResponse(
+                    l.onResponse(
                         new SendFileResult(
                             Collections.emptyList(),
                             Collections.emptyList(),
@@ -568,7 +569,7 @@ public class RecoverySourceHandler {
                             took
                         )
                     );
-                }, listener::onFailure);
+                }));
 
             }
         } catch (Exception e) {
@@ -660,11 +661,11 @@ public class RecoverySourceHandler {
         // since the plan can change after a failure recovering files from the snapshots that cannot be
         // recovered from the source node, in that case we have to start from scratch using the fallback
         // recovery plan that would be used in subsequent steps.
-        final StepListener<Void> sendFileInfoStep = new StepListener<>();
-        final StepListener<Tuple<ShardRecoveryPlan, List<StoreFileMetadata>>> recoverSnapshotFilesStep = new StepListener<>();
-        final StepListener<ShardRecoveryPlan> sendFilesStep = new StepListener<>();
-        final StepListener<Tuple<ShardRecoveryPlan, RetentionLease>> createRetentionLeaseStep = new StepListener<>();
-        final StepListener<ShardRecoveryPlan> cleanFilesStep = new StepListener<>();
+        final ListenableFuture<Void> sendFileInfoStep = new ListenableFuture<>();
+        final ListenableFuture<Tuple<ShardRecoveryPlan, List<StoreFileMetadata>>> recoverSnapshotFilesStep = new ListenableFuture<>();
+        final ListenableFuture<ShardRecoveryPlan> sendFilesStep = new ListenableFuture<>();
+        final ListenableFuture<Tuple<ShardRecoveryPlan, RetentionLease>> createRetentionLeaseStep = new ListenableFuture<>();
+        final ListenableFuture<ShardRecoveryPlan> cleanFilesStep = new ListenableFuture<>();
 
         final int translogOps = shardRecoveryPlan.getTranslogOps();
         recoveryTarget.receiveFileInfo(
@@ -676,32 +677,34 @@ public class RecoverySourceHandler {
             sendFileInfoStep
         );
 
-        sendFileInfoStep.whenComplete(unused -> recoverSnapshotFiles(shardRecoveryPlan, new ActionListener<>() {
-            @Override
-            public void onResponse(List<StoreFileMetadata> filesFailedToRecoverFromSnapshot) {
-                recoverSnapshotFilesStep.onResponse(Tuple.tuple(shardRecoveryPlan, filesFailedToRecoverFromSnapshot));
-            }
+        sendFileInfoStep.addListener(
+            listener.delegateFailureAndWrap((l, unused) -> recoverSnapshotFiles(shardRecoveryPlan, new ActionListener<>() {
+                @Override
+                public void onResponse(List<StoreFileMetadata> filesFailedToRecoverFromSnapshot) {
+                    recoverSnapshotFilesStep.onResponse(Tuple.tuple(shardRecoveryPlan, filesFailedToRecoverFromSnapshot));
+                }
 
-            @Override
-            public void onFailure(Exception e) {
-                if (shardRecoveryPlan.canRecoverSnapshotFilesFromSourceNode() == false
-                    && e instanceof CancellableThreads.ExecutionCancelledException == false) {
-                    ShardRecoveryPlan fallbackPlan = shardRecoveryPlan.getFallbackPlan();
-                    recoveryTarget.receiveFileInfo(
-                        fallbackPlan.getFilesToRecoverNames(),
-                        fallbackPlan.getFilesToRecoverSizes(),
-                        fallbackPlan.getFilesPresentInTargetNames(),
-                        fallbackPlan.getFilesPresentInTargetSizes(),
-                        fallbackPlan.getTranslogOps(),
-                        recoverSnapshotFilesStep.map(r -> Tuple.tuple(fallbackPlan, Collections.emptyList()))
-                    );
-                } else {
-                    recoverSnapshotFilesStep.onFailure(e);
+                @Override
+                public void onFailure(Exception e) {
+                    if (shardRecoveryPlan.canRecoverSnapshotFilesFromSourceNode() == false
+                        && e instanceof CancellableThreads.ExecutionCancelledException == false) {
+                        ShardRecoveryPlan fallbackPlan = shardRecoveryPlan.getFallbackPlan();
+                        recoveryTarget.receiveFileInfo(
+                            fallbackPlan.getFilesToRecoverNames(),
+                            fallbackPlan.getFilesToRecoverSizes(),
+                            fallbackPlan.getFilesPresentInTargetNames(),
+                            fallbackPlan.getFilesPresentInTargetSizes(),
+                            fallbackPlan.getTranslogOps(),
+                            recoverSnapshotFilesStep.map(r -> Tuple.tuple(fallbackPlan, Collections.emptyList()))
+                        );
+                    } else {
+                        recoverSnapshotFilesStep.onFailure(e);
+                    }
                 }
-            }
-        }), listener::onFailure);
+            }))
+        );
 
-        recoverSnapshotFilesStep.whenComplete(planAndFilesFailedToRecoverFromSnapshot -> {
+        recoverSnapshotFilesStep.addListener(listener.delegateFailureAndWrap((l, planAndFilesFailedToRecoverFromSnapshot) -> {
             ShardRecoveryPlan recoveryPlan = planAndFilesFailedToRecoverFromSnapshot.v1();
             List<StoreFileMetadata> filesFailedToRecoverFromSnapshot = planAndFilesFailedToRecoverFromSnapshot.v2();
             final List<StoreFileMetadata> filesToRecoverFromSource;
@@ -717,17 +720,18 @@ public class RecoverySourceHandler {
                 recoveryPlan::getTranslogOps,
                 sendFilesStep.map(unused -> recoveryPlan)
             );
-        }, listener::onFailure);
+        }));
 
-        sendFilesStep.whenComplete(
-            recoveryPlan -> createRetentionLease(
-                recoveryPlan.getStartingSeqNo(),
-                createRetentionLeaseStep.map(retentionLease -> Tuple.tuple(recoveryPlan, retentionLease))
-            ),
-            listener::onFailure
+        sendFilesStep.addListener(
+            listener.delegateFailureAndWrap(
+                (l, recoveryPlan) -> createRetentionLease(
+                    recoveryPlan.getStartingSeqNo(),
+                    createRetentionLeaseStep.map(retentionLease -> Tuple.tuple(recoveryPlan, retentionLease))
+                )
+            )
         );
 
-        createRetentionLeaseStep.whenComplete(recoveryPlanAndRetentionLease -> {
+        createRetentionLeaseStep.addListener(listener.delegateFailureAndWrap((l, recoveryPlanAndRetentionLease) -> {
             final ShardRecoveryPlan recoveryPlan = recoveryPlanAndRetentionLease.v1();
             final RetentionLease retentionLease = recoveryPlanAndRetentionLease.v2();
             final Store.MetadataSnapshot recoverySourceMetadata = recoveryPlan.getSourceMetadataSnapshot();
@@ -745,12 +749,12 @@ public class RecoverySourceHandler {
                 lastKnownGlobalCheckpoint,
                 cleanFilesStep.map(unused -> recoveryPlan)
             );
-        }, listener::onFailure);
+        }));
 
-        cleanFilesStep.whenComplete(recoveryPlan -> {
+        cleanFilesStep.addListener(listener.delegateFailureAndWrap((l, recoveryPlan) -> {
             final TimeValue took = stopWatch.totalTime();
             logger.trace("recovery [phase1]: took [{}]", took);
-            listener.onResponse(
+            l.onResponse(
                 new SendFileResult(
                     recoveryPlan.getFilesToRecoverNames(),
                     recoveryPlan.getFilesToRecoverSizes(),
@@ -761,7 +765,7 @@ public class RecoverySourceHandler {
                     took
                 )
             );
-        }, listener::onFailure);
+        }));
     }
 
     /**
@@ -1107,7 +1111,7 @@ public class RecoverySourceHandler {
         }
         logger.trace("recovery [phase2]: sending transaction log operations (from [" + startingSeqNo + "] to [" + endingSeqNo + "]");
         final StopWatch stopWatch = new StopWatch().start();
-        final StepListener<Void> sendListener = new StepListener<>();
+        final ListenableFuture<Void> sendListener = new ListenableFuture<>();
         final OperationBatchSender sender = new OperationBatchSender(
             startingSeqNo,
             endingSeqNo,
@@ -1118,7 +1122,7 @@ public class RecoverySourceHandler {
             mappingVersion,
             sendListener
         );
-        sendListener.whenComplete(ignored -> {
+        sendListener.addListener(listener.delegateFailureAndWrap((delegate, ignored) -> {
             final long skippedOps = sender.skippedOps.get();
             final int totalSentOps = sender.sentOps.get();
             final long targetLocalCheckpoint = sender.targetLocalCheckpoint.get();
@@ -1134,8 +1138,8 @@ public class RecoverySourceHandler {
             stopWatch.stop();
             final TimeValue tookTime = stopWatch.totalTime();
             logger.trace("recovery [phase2]: took [{}]", tookTime);
-            listener.onResponse(new SendSnapshotResult(targetLocalCheckpoint, totalSentOps, tookTime));
-        }, listener::onFailure);
+            delegate.onResponse(new SendSnapshotResult(targetLocalCheckpoint, totalSentOps, tookTime));
+        }));
         sender.start();
     }
 
@@ -1248,7 +1252,7 @@ public class RecoverySourceHandler {
          * marking the shard as in-sync. If the relocation handoff holds all the permits then after the handoff completes and we acquire
          * the permit then the state of the shard will be relocated and this recovery will fail.
          */
-        final StepListener<Void> markInSyncStep = new StepListener<>();
+        final ListenableFuture<Void> markInSyncStep = new ListenableFuture<>();
         runUnderPrimaryPermit(
             () -> shard.markAllocationIdAsInSync(request.targetAllocationId(), targetLocalCheckpoint),
             shard,
@@ -1256,27 +1260,29 @@ public class RecoverySourceHandler {
             markInSyncStep
         );
 
-        final StepListener<Long> finalizeListener = new StepListener<>();
-        markInSyncStep.whenComplete(ignored -> {
+        final ListenableFuture<Long> finalizeListener = new ListenableFuture<>();
+        markInSyncStep.addListener(listener.delegateFailureAndWrap((l, ignored) -> {
             final long globalCheckpoint = shard.getLastKnownGlobalCheckpoint(); // this global checkpoint is persisted in finalizeRecovery
             cancellableThreads.checkForCancel();
             recoveryTarget.finalizeRecovery(globalCheckpoint, trimAboveSeqNo, finalizeListener.map(ignored2 -> globalCheckpoint));
-        }, listener::onFailure);
-
-        final StepListener<Void> updateGlobalCheckpointStep = new StepListener<>();
-        finalizeListener.whenComplete(globalCheckpoint -> {
-            runUnderPrimaryPermit(
-                () -> shard.updateGlobalCheckpointForShard(request.targetAllocationId(), globalCheckpoint),
-                shard,
-                cancellableThreads,
-                updateGlobalCheckpointStep
-            );
-        }, listener::onFailure);
+        }));
+
+        final ListenableFuture<Void> updateGlobalCheckpointStep = new ListenableFuture<>();
+        finalizeListener.addListener(
+            listener.delegateFailureAndWrap(
+                (l, globalCheckpoint) -> runUnderPrimaryPermit(
+                    () -> shard.updateGlobalCheckpointForShard(request.targetAllocationId(), globalCheckpoint),
+                    shard,
+                    cancellableThreads,
+                    updateGlobalCheckpointStep
+                )
+            )
+        );
 
-        final StepListener<Void> finalStep;
+        final ListenableFuture<Void> finalStep;
         if (request.isPrimaryRelocation()) {
-            finalStep = new StepListener<>();
-            updateGlobalCheckpointStep.whenComplete(ignored -> {
+            finalStep = new ListenableFuture<>();
+            updateGlobalCheckpointStep.addListener(listener.delegateFailureAndWrap((l, ignored) -> {
                 logger.trace("performing relocation hand-off");
                 cancellableThreads.execute(
                     // this acquires all IndexShard operation permits and will thus delay new recoveries until it is done
@@ -1286,15 +1292,15 @@ public class RecoverySourceHandler {
                  * if the recovery process fails after disabling primary mode on the source shard, both relocation source and
                  * target are failed (see {@link IndexShard#updateRoutingEntry}).
                  */
-            }, listener::onFailure);
+            }));
         } else {
             finalStep = updateGlobalCheckpointStep;
         }
 
-        finalStep.whenComplete(ignored -> {
+        finalStep.addListener(listener.delegateFailureAndWrap((l, ignored) -> {
             cancellableThreads.checkForCancel();
-            completeFinalizationListener(listener, stopWatch);
-        }, listener::onFailure);
+            completeFinalizationListener(l, stopWatch);
+        }));
     }
 
     private void completeFinalizationListener(ActionListener<Void> listener, StopWatch stopWatch) {

+ 15 - 15
server/src/main/java/org/elasticsearch/repositories/IndexSnapshotsService.java

@@ -11,8 +11,8 @@ package org.elasticsearch.repositories;
 import org.apache.logging.log4j.LogManager;
 import org.apache.logging.log4j.Logger;
 import org.elasticsearch.action.ActionListener;
-import org.elasticsearch.action.StepListener;
 import org.elasticsearch.cluster.metadata.IndexMetadata;
+import org.elasticsearch.common.util.concurrent.ListenableFuture;
 import org.elasticsearch.core.Tuple;
 import org.elasticsearch.index.shard.ShardId;
 import org.elasticsearch.index.snapshots.blobstore.BlobStoreIndexShardSnapshots;
@@ -49,11 +49,11 @@ public class IndexSnapshotsService {
     ) {
         assert repositoryName != null;
 
-        final ActionListener<Optional<ShardSnapshotInfo>> listener = originalListener.delegateResponse((delegate, err) -> {
-            delegate.onFailure(
+        final ActionListener<Optional<ShardSnapshotInfo>> listener = originalListener.delegateResponse(
+            (delegate, err) -> delegate.onFailure(
                 new RepositoryException(repositoryName, "Unable to find the latest snapshot for shard [" + shardId + "]", err)
-            );
-        });
+            )
+        );
 
         final Repository repository = getRepository(repositoryName);
         if (repository == null) {
@@ -62,13 +62,13 @@ public class IndexSnapshotsService {
         }
 
         final String indexName = shardId.getIndexName();
-        StepListener<RepositoryData> repositoryDataStepListener = new StepListener<>();
-        StepListener<FetchShardSnapshotContext> snapshotInfoStepListener = new StepListener<>();
+        ListenableFuture<RepositoryData> repositoryDataStepListener = new ListenableFuture<>();
+        ListenableFuture<FetchShardSnapshotContext> snapshotInfoStepListener = new ListenableFuture<>();
 
-        repositoryDataStepListener.whenComplete(repositoryData -> {
+        repositoryDataStepListener.addListener(listener.delegateFailureAndWrap((delegate, repositoryData) -> {
             if (repositoryData.hasIndex(indexName) == false) {
                 logger.debug("{} repository [{}] has no snapshots of this index", shardId, repositoryName);
-                listener.onResponse(Optional.empty());
+                delegate.onResponse(Optional.empty());
                 return;
             }
 
@@ -88,7 +88,7 @@ public class IndexSnapshotsService {
                 // a valid candidate, but for simplicity we just consider that we couldn't find any valid snapshot. Existing
                 // snapshots start/end timestamps should appear in the RepositoryData eventually.
                 logger.debug("{} could not determine latest snapshot of this shard in repository [{}]", shardId, repositoryName);
-                listener.onResponse(Optional.empty());
+                delegate.onResponse(Optional.empty());
                 return;
             }
 
@@ -100,16 +100,16 @@ public class IndexSnapshotsService {
                     snapshotInfo -> new FetchShardSnapshotContext(repository, repositoryData, indexId, shardId, snapshotInfo)
                 )
             );
-        }, listener::onFailure);
+        }));
 
-        snapshotInfoStepListener.whenComplete(fetchSnapshotContext -> {
+        snapshotInfoStepListener.addListener(listener.delegateFailureAndWrap((delegate, fetchSnapshotContext) -> {
             assert ThreadPool.assertCurrentThreadPool(ThreadPool.Names.SNAPSHOT_META);
             final SnapshotInfo snapshotInfo = fetchSnapshotContext.getSnapshotInfo();
 
             if (snapshotInfo == null || snapshotInfo.state() != SnapshotState.SUCCESS) {
                 // We couldn't find a valid candidate
                 logger.debug("{} failed to retrieve snapshot details from [{}]", shardId, repositoryName);
-                listener.onResponse(Optional.empty());
+                delegate.onResponse(Optional.empty());
                 return;
             }
 
@@ -124,8 +124,8 @@ public class IndexSnapshotsService {
                 .findFirst()
                 .map(snapshotFiles -> fetchSnapshotContext.createIndexShardSnapshotInfo(indexMetadataId, snapshotFiles));
 
-            listener.onResponse(indexShardSnapshotInfo);
-        }, listener::onFailure);
+            delegate.onResponse(indexShardSnapshotInfo);
+        }));
 
         repository.getRepositoryData(repositoryDataStepListener);
     }

+ 36 - 23
server/src/main/java/org/elasticsearch/repositories/RepositoriesService.java

@@ -13,7 +13,6 @@ import org.apache.logging.log4j.Logger;
 import org.elasticsearch.Version;
 import org.elasticsearch.action.ActionListener;
 import org.elasticsearch.action.ActionRunnable;
-import org.elasticsearch.action.StepListener;
 import org.elasticsearch.action.admin.cluster.repositories.delete.DeleteRepositoryRequest;
 import org.elasticsearch.action.admin.cluster.repositories.put.PutRepositoryRequest;
 import org.elasticsearch.action.support.master.AcknowledgedResponse;
@@ -39,6 +38,7 @@ import org.elasticsearch.common.regex.Regex;
 import org.elasticsearch.common.settings.Setting;
 import org.elasticsearch.common.settings.Settings;
 import org.elasticsearch.common.util.concurrent.ConcurrentCollections;
+import org.elasticsearch.common.util.concurrent.ListenableFuture;
 import org.elasticsearch.core.IOUtils;
 import org.elasticsearch.core.SuppressForbidden;
 import org.elasticsearch.core.TimeValue;
@@ -151,41 +151,54 @@ public class RepositoriesService extends AbstractLifecycleComponent implements C
             return;
         }
 
-        final StepListener<AcknowledgedResponse> acknowledgementStep = new StepListener<>();
-        final StepListener<Boolean> publicationStep = new StepListener<>(); // Boolean==changed.
+        final ListenableFuture<AcknowledgedResponse> acknowledgementStep = new ListenableFuture<>();
+        final ListenableFuture<Boolean> publicationStep = new ListenableFuture<>(); // Boolean==changed.
 
         if (request.verify()) {
 
             // When publication has completed (and all acks received or timed out) then verify the repository.
             // (if acks timed out then acknowledgementStep completes before the master processes this cluster state, hence why we have
             // to wait for the publication to be complete too)
-            final StepListener<List<DiscoveryNode>> verifyStep = new StepListener<>();
-            publicationStep.whenComplete(changed -> acknowledgementStep.whenComplete(clusterStateUpdateResponse -> {
-                if (clusterStateUpdateResponse.isAcknowledged() && changed) {
-                    // The response was acknowledged - all nodes should know about the new repository, let's verify them
-                    verifyRepository(request.name(), verifyStep);
-                } else {
-                    verifyStep.onResponse(null);
-                }
-            }, listener::onFailure), listener::onFailure);
+            final ListenableFuture<List<DiscoveryNode>> verifyStep = new ListenableFuture<>();
+            publicationStep.addListener(
+                listener.delegateFailureAndWrap(
+                    (delegate, changed) -> acknowledgementStep.addListener(
+                        delegate.delegateFailureAndWrap((l, clusterStateUpdateResponse) -> {
+                            if (clusterStateUpdateResponse.isAcknowledged() && changed) {
+                                // The response was acknowledged - all nodes should know about the new repository, let's verify them
+                                verifyRepository(request.name(), verifyStep);
+                            } else {
+                                verifyStep.onResponse(null);
+                            }
+                        })
+                    )
+                )
+            );
 
             // When verification has completed, get the repository data for the first time
-            final StepListener<RepositoryData> getRepositoryDataStep = new StepListener<>();
-            verifyStep.whenComplete(
-                ignored -> threadPool.generic()
-                    .execute(ActionRunnable.wrap(getRepositoryDataStep, l -> repository(request.name()).getRepositoryData(l))),
-                listener::onFailure
+            final ListenableFuture<RepositoryData> getRepositoryDataStep = new ListenableFuture<>();
+            verifyStep.addListener(
+                listener.delegateFailureAndWrap(
+                    (l, ignored) -> threadPool.generic()
+                        .execute(ActionRunnable.wrap(getRepositoryDataStep, ll -> repository(request.name()).getRepositoryData(ll)))
+                )
             );
 
             // When the repository metadata is ready, update the repository UUID stored in the cluster state, if available
-            final StepListener<Void> updateRepoUuidStep = new StepListener<>();
-            getRepositoryDataStep.whenComplete(
-                repositoryData -> updateRepositoryUuidInMetadata(clusterService, request.name(), repositoryData, updateRepoUuidStep),
-                listener::onFailure
+            final ListenableFuture<Void> updateRepoUuidStep = new ListenableFuture<>();
+            getRepositoryDataStep.addListener(
+                listener.delegateFailureAndWrap(
+                    (l, repositoryData) -> updateRepositoryUuidInMetadata(
+                        clusterService,
+                        request.name(),
+                        repositoryData,
+                        updateRepoUuidStep
+                    )
+                )
             );
 
             // Finally respond to the outer listener with the response from the original cluster state update
-            updateRepoUuidStep.whenComplete(ignored -> acknowledgementStep.addListener(listener), listener::onFailure);
+            updateRepoUuidStep.addListener(listener.delegateFailureAndWrap((l, ignored) -> acknowledgementStep.addListener(l)));
 
         } else {
             acknowledgementStep.addListener(listener);
@@ -233,7 +246,7 @@ public class RepositoriesService extends AbstractLifecycleComponent implements C
         RegisterRepositoryTask(
             final RepositoriesService repositoriesService,
             final PutRepositoryRequest request,
-            final StepListener<AcknowledgedResponse> acknowledgementStep
+            final ListenableFuture<AcknowledgedResponse> acknowledgementStep
         ) {
             super(request, acknowledgementStep);
             this.repositoriesService = repositoriesService;

+ 74 - 70
server/src/main/java/org/elasticsearch/repositories/blobstore/BlobStoreRepository.java

@@ -26,7 +26,6 @@ import org.elasticsearch.Version;
 import org.elasticsearch.action.ActionListener;
 import org.elasticsearch.action.ActionRunnable;
 import org.elasticsearch.action.SingleResultDeduplicator;
-import org.elasticsearch.action.StepListener;
 import org.elasticsearch.action.support.GroupedActionListener;
 import org.elasticsearch.action.support.ListenableActionFuture;
 import org.elasticsearch.action.support.PlainActionFuture;
@@ -70,6 +69,7 @@ import org.elasticsearch.common.util.BigArrays;
 import org.elasticsearch.common.util.concurrent.AbstractRunnable;
 import org.elasticsearch.common.util.concurrent.ConcurrentCollections;
 import org.elasticsearch.common.util.concurrent.FutureUtils;
+import org.elasticsearch.common.util.concurrent.ListenableFuture;
 import org.elasticsearch.common.xcontent.ChunkedToXContent;
 import org.elasticsearch.common.xcontent.LoggingDeprecationHandler;
 import org.elasticsearch.core.CheckedConsumer;
@@ -884,7 +884,8 @@ public abstract class BlobStoreRepository extends AbstractLifecycleComponent imp
     ) {
         if (SnapshotsService.useShardGenerations(repoMetaVersion)) {
             // First write the new shard state metadata (with the removed snapshot) and compute deletion targets
-            final StepListener<Collection<ShardSnapshotMetaDeleteResult>> writeShardMetaDataAndComputeDeletesStep = new StepListener<>();
+            final ListenableFuture<Collection<ShardSnapshotMetaDeleteResult>> writeShardMetaDataAndComputeDeletesStep =
+                new ListenableFuture<>();
             writeUpdatedShardMetaDataAndComputeDeletes(snapshotIds, repositoryData, true, writeShardMetaDataAndComputeDeletesStep);
             // Once we have put the new shard-level metadata into place, we can update the repository metadata as follows:
             // 1. Remove the snapshots from the list of existing snapshots
@@ -893,8 +894,8 @@ public abstract class BlobStoreRepository extends AbstractLifecycleComponent imp
             // Note: If we fail updating any of the individual shard paths, none of them are changed since the newly created
             // index-${gen_uuid} will not be referenced by the existing RepositoryData and new RepositoryData is only
             // written if all shard paths have been successfully updated.
-            final StepListener<RepositoryData> writeUpdatedRepoDataStep = new StepListener<>();
-            writeShardMetaDataAndComputeDeletesStep.whenComplete(deleteResults -> {
+            final ListenableFuture<RepositoryData> writeUpdatedRepoDataStep = new ListenableFuture<>();
+            writeShardMetaDataAndComputeDeletesStep.addListener(ActionListener.wrap(deleteResults -> {
                 final ShardGenerations.Builder builder = ShardGenerations.builder();
                 for (ShardSnapshotMetaDeleteResult newGen : deleteResults) {
                     builder.put(newGen.indexId, newGen.shardId, newGen.newGeneration);
@@ -907,9 +908,9 @@ public abstract class BlobStoreRepository extends AbstractLifecycleComponent imp
                     Function.identity(),
                     ActionListener.wrap(writeUpdatedRepoDataStep::onResponse, listener::onFailure)
                 );
-            }, listener::onFailure);
+            }, listener::onFailure));
             // Once we have updated the repository, run the clean-ups
-            writeUpdatedRepoDataStep.whenComplete(updatedRepoData -> {
+            writeUpdatedRepoDataStep.addListener(ActionListener.wrap(updatedRepoData -> {
                 listener.onRepositoryDataWritten(updatedRepoData);
                 // Run unreferenced blobs cleanup in parallel to shard-level snapshot deletion
                 try (var refs = new RefCountingRunnable(listener::onDone)) {
@@ -921,7 +922,7 @@ public abstract class BlobStoreRepository extends AbstractLifecycleComponent imp
                         refs.acquireListener()
                     );
                 }
-            }, listener::onFailure);
+            }, listener::onFailure));
         } else {
             // Write the new repository data first (with the removed snapshot), using no shard generations
             final RepositoryData updatedRepoData = repositoryData.removeSnapshots(snapshotIds, ShardGenerations.EMPTY);
@@ -1005,7 +1006,7 @@ public abstract class BlobStoreRepository extends AbstractLifecycleComponent imp
             final Set<SnapshotId> survivingSnapshots = snapshotsWithIndex.stream()
                 .filter(id -> snapshotIds.contains(id) == false)
                 .collect(Collectors.toSet());
-            final StepListener<Collection<Integer>> shardCountListener = new StepListener<>();
+            final ListenableFuture<Collection<Integer>> shardCountListener = new ListenableFuture<>();
             final Collection<String> indexMetaGenerations = snapshotIds.stream()
                 .filter(snapshotsWithIndex::contains)
                 .map(id -> oldRepositoryData.indexMetaDataGenerations().indexMetaBlobId(id, indexId))
@@ -1033,17 +1034,14 @@ public abstract class BlobStoreRepository extends AbstractLifecycleComponent imp
                     }
                 }));
             }
-            shardCountListener.whenComplete(counts -> {
+            shardCountListener.addListener(deleteIndexMetadataListener.delegateFailureAndWrap((delegate, counts) -> {
                 final int shardCount = counts.stream().mapToInt(i -> i).max().orElse(0);
                 if (shardCount == 0) {
-                    deleteIndexMetadataListener.onResponse(null);
+                    delegate.onResponse(null);
                     return;
                 }
                 // Listener for collecting the results of removing the snapshot from each shard's metadata in the current index
-                final ActionListener<ShardSnapshotMetaDeleteResult> allShardsListener = new GroupedActionListener<>(
-                    shardCount,
-                    deleteIndexMetadataListener
-                );
+                final ActionListener<ShardSnapshotMetaDeleteResult> allShardsListener = new GroupedActionListener<>(shardCount, delegate);
                 for (int shardId = 0; shardId < shardCount; shardId++) {
                     final int finalShardId = shardId;
                     executor.execute(new AbstractRunnable() {
@@ -1096,7 +1094,7 @@ public abstract class BlobStoreRepository extends AbstractLifecycleComponent imp
                         }
                     });
                 }
-            }, deleteIndexMetadataListener::onFailure);
+            }));
         }
     }
 
@@ -1342,9 +1340,9 @@ public abstract class BlobStoreRepository extends AbstractLifecycleComponent imp
 
         final boolean writeIndexGens = SnapshotsService.useIndexGenerations(repositoryMetaVersion);
 
-        final StepListener<RepositoryData> repoDataListener = new StepListener<>();
+        final ListenableFuture<RepositoryData> repoDataListener = new ListenableFuture<>();
         getRepositoryData(repoDataListener);
-        repoDataListener.whenComplete(existingRepositoryData -> {
+        repoDataListener.addListener(ActionListener.wrap(existingRepositoryData -> {
             final int existingSnapshotCount = existingRepositoryData.getSnapshotIds().size();
             if (existingSnapshotCount >= maxSnapshotCount) {
                 finalizeSnapshotContext.onFailure(
@@ -1441,7 +1439,7 @@ public abstract class BlobStoreRepository extends AbstractLifecycleComponent imp
                     )
                 );
             }
-        }, onUpdateFailure);
+        }, onUpdateFailure));
     }
 
     // Delete all old shard gen blobs that aren't referenced any longer as a result from moving to updated repository data
@@ -2182,7 +2180,7 @@ public abstract class BlobStoreRepository extends AbstractLifecycleComponent imp
         }
 
         // Step 1: Set repository generation state to the next possible pending generation
-        final StepListener<Long> setPendingStep = new StepListener<>();
+        final ListenableFuture<Long> setPendingStep = new ListenableFuture<>();
         final String setPendingGenerationSource = "set pending repository generation [" + metadata.name() + "][" + expectedGen + "]";
         submitUnbatchedTask(setPendingGenerationSource, new ClusterStateUpdateTask() {
 
@@ -2254,53 +2252,59 @@ public abstract class BlobStoreRepository extends AbstractLifecycleComponent imp
             }
         });
 
-        final StepListener<RepositoryData> filterRepositoryDataStep = new StepListener<>();
+        final ListenableFuture<RepositoryData> filterRepositoryDataStep = new ListenableFuture<>();
 
         // Step 2: Write new index-N blob to repository and update index.latest
-        setPendingStep.whenComplete(newGen -> threadPool().executor(ThreadPool.Names.SNAPSHOT).execute(ActionRunnable.wrap(listener, l -> {
-            // BwC logic: Load snapshot version information if any snapshot is missing details in RepositoryData so that the new
-            // RepositoryData contains full details for every snapshot
-            final List<SnapshotId> snapshotIdsWithMissingDetails = repositoryData.getSnapshotIds()
-                .stream()
-                .filter(repositoryData::hasMissingDetails)
-                .toList();
-            if (snapshotIdsWithMissingDetails.isEmpty() == false) {
-                final Map<SnapshotId, SnapshotDetails> extraDetailsMap = new ConcurrentHashMap<>();
-                getSnapshotInfo(new GetSnapshotInfoContext(snapshotIdsWithMissingDetails, false, () -> false, (context, snapshotInfo) -> {
-                    final String slmPolicy = slmPolicy(snapshotInfo);
-                    extraDetailsMap.put(
-                        snapshotInfo.snapshotId(),
-                        new SnapshotDetails(
-                            snapshotInfo.state(),
-                            snapshotInfo.version(),
-                            snapshotInfo.startTime(),
-                            snapshotInfo.endTime(),
-                            slmPolicy
-                        )
-                    );
-                }, ActionListener.runAfter(new ActionListener<>() {
-                    @Override
-                    public void onResponse(Void aVoid) {
-                        logger.info(
-                            "Successfully loaded all snapshots' detailed information for {} from snapshot metadata",
-                            AllocationService.firstListElementsToCommaDelimitedString(
-                                snapshotIdsWithMissingDetails,
-                                SnapshotId::toString,
-                                logger.isDebugEnabled()
-                            )
-                        );
-                    }
+        setPendingStep.addListener(
+            listener.delegateFailureAndWrap(
+                (delegate, newGen) -> threadPool().executor(ThreadPool.Names.SNAPSHOT).execute(ActionRunnable.wrap(delegate, l -> {
+                    // BwC logic: Load snapshot version information if any snapshot is missing details in RepositoryData so that the new
+                    // RepositoryData contains full details for every snapshot
+                    final List<SnapshotId> snapshotIdsWithMissingDetails = repositoryData.getSnapshotIds()
+                        .stream()
+                        .filter(repositoryData::hasMissingDetails)
+                        .toList();
+                    if (snapshotIdsWithMissingDetails.isEmpty() == false) {
+                        final Map<SnapshotId, SnapshotDetails> extraDetailsMap = new ConcurrentHashMap<>();
+                        getSnapshotInfo(
+                            new GetSnapshotInfoContext(snapshotIdsWithMissingDetails, false, () -> false, (context, snapshotInfo) -> {
+                                final String slmPolicy = slmPolicy(snapshotInfo);
+                                extraDetailsMap.put(
+                                    snapshotInfo.snapshotId(),
+                                    new SnapshotDetails(
+                                        snapshotInfo.state(),
+                                        snapshotInfo.version(),
+                                        snapshotInfo.startTime(),
+                                        snapshotInfo.endTime(),
+                                        slmPolicy
+                                    )
+                                );
+                            }, ActionListener.runAfter(new ActionListener<>() {
+                                @Override
+                                public void onResponse(Void aVoid) {
+                                    logger.info(
+                                        "Successfully loaded all snapshots' detailed information for {} from snapshot metadata",
+                                        AllocationService.firstListElementsToCommaDelimitedString(
+                                            snapshotIdsWithMissingDetails,
+                                            SnapshotId::toString,
+                                            logger.isDebugEnabled()
+                                        )
+                                    );
+                                }
 
-                    @Override
-                    public void onFailure(Exception e) {
-                        logger.warn("Failure when trying to load missing details from snapshot metadata", e);
+                                @Override
+                                public void onFailure(Exception e) {
+                                    logger.warn("Failure when trying to load missing details from snapshot metadata", e);
+                                }
+                            }, () -> filterRepositoryDataStep.onResponse(repositoryData.withExtraDetails(extraDetailsMap))))
+                        );
+                    } else {
+                        filterRepositoryDataStep.onResponse(repositoryData);
                     }
-                }, () -> filterRepositoryDataStep.onResponse(repositoryData.withExtraDetails(extraDetailsMap)))));
-            } else {
-                filterRepositoryDataStep.onResponse(repositoryData);
-            }
-        })), listener::onFailure);
-        filterRepositoryDataStep.whenComplete(filteredRepositoryData -> {
+                }))
+            )
+        );
+        filterRepositoryDataStep.addListener(listener.delegateFailureAndWrap((delegate, filteredRepositoryData) -> {
             final long newGen = setPendingStep.result();
             final RepositoryData newRepositoryData = updateRepositoryData(filteredRepositoryData, version, newGen);
             if (latestKnownRepoGen.get() >= newGen) {
@@ -2313,7 +2317,7 @@ public abstract class BlobStoreRepository extends AbstractLifecycleComponent imp
                 );
             }
             // write the index file
-            if (ensureSafeGenerationExists(expectedGen, listener::onFailure) == false) {
+            if (ensureSafeGenerationExists(expectedGen, delegate::onFailure) == false) {
                 return;
             }
             final String indexBlob = INDEX_FILE_PREFIX + Long.toString(newGen);
@@ -2360,7 +2364,7 @@ public abstract class BlobStoreRepository extends AbstractLifecycleComponent imp
 
                 @Override
                 public void onFailure(Exception e) {
-                    listener.onFailure(
+                    delegate.onFailure(
                         new RepositoryException(
                             metadata.name(),
                             "Failed to execute cluster state update [" + setSafeGenerationSource + "]",
@@ -2373,7 +2377,7 @@ public abstract class BlobStoreRepository extends AbstractLifecycleComponent imp
                 public void clusterStateProcessed(ClusterState oldState, ClusterState newState) {
                     logger.trace("[{}] successfully set safe repository generation to [{}]", metadata.name(), newGen);
                     cacheRepositoryData(newRepositoryData, version);
-                    threadPool.executor(ThreadPool.Names.SNAPSHOT).execute(ActionRunnable.supply(listener, () -> {
+                    threadPool.executor(ThreadPool.Names.SNAPSHOT).execute(ActionRunnable.supply(delegate, () -> {
                         // Delete all now outdated index files up to 1000 blobs back from the new generation.
                         // If there are more than 1000 dangling index-N cleanup functionality on repo delete will take care of them.
                         // Deleting one older than the current expectedGen is done for BwC reasons as older versions used to keep
@@ -2392,7 +2396,7 @@ public abstract class BlobStoreRepository extends AbstractLifecycleComponent imp
                     }));
                 }
             });
-        }, listener::onFailure);
+        }));
     }
 
     /**
@@ -2863,8 +2867,8 @@ public abstract class BlobStoreRepository extends AbstractLifecycleComponent imp
                 };
             }
 
-            final StepListener<Collection<Void>> allFilesUploadedListener = new StepListener<>();
-            allFilesUploadedListener.whenComplete(v -> {
+            final ListenableFuture<Collection<Void>> allFilesUploadedListener = new ListenableFuture<>();
+            allFilesUploadedListener.addListener(context.delegateFailureAndWrap((delegate, v) -> {
                 final IndexShardSnapshotStatus.Copy lastSnapshotStatus = snapshotStatus.moveToFinalize();
 
                 // now create and write the commit point
@@ -2900,8 +2904,8 @@ public abstract class BlobStoreRepository extends AbstractLifecycleComponent imp
                     getSegmentInfoFileCount(blobStoreIndexShardSnapshot.indexFiles())
                 );
                 snapshotStatus.moveToDone(threadPool.absoluteTimeInMillis(), shardSnapshotResult);
-                context.onResponse(shardSnapshotResult);
-            }, context::onFailure);
+                delegate.onResponse(shardSnapshotResult);
+            }));
             if (indexIncrementalFileCount == 0 || filesToSnapshot.isEmpty()) {
                 allFilesUploadedListener.onResponse(Collections.emptyList());
                 return;

+ 14 - 12
server/src/main/java/org/elasticsearch/rest/action/cat/RestTemplatesAction.java

@@ -11,7 +11,6 @@ package org.elasticsearch.rest.action.cat;
 import org.elasticsearch.ExceptionsHelper;
 import org.elasticsearch.ResourceNotFoundException;
 import org.elasticsearch.action.ActionListener;
-import org.elasticsearch.action.StepListener;
 import org.elasticsearch.action.admin.indices.template.get.GetComposableIndexTemplateAction;
 import org.elasticsearch.action.admin.indices.template.get.GetIndexTemplatesRequest;
 import org.elasticsearch.action.admin.indices.template.get.GetIndexTemplatesResponse;
@@ -19,6 +18,7 @@ import org.elasticsearch.client.internal.node.NodeClient;
 import org.elasticsearch.cluster.metadata.ComposableIndexTemplate;
 import org.elasticsearch.cluster.metadata.IndexTemplateMetadata;
 import org.elasticsearch.common.Table;
+import org.elasticsearch.common.util.concurrent.ListenableFuture;
 import org.elasticsearch.rest.RestRequest;
 import org.elasticsearch.rest.RestResponse;
 import org.elasticsearch.rest.Scope;
@@ -69,10 +69,10 @@ public class RestTemplatesAction extends AbstractCatAction {
 
         return channel -> {
 
-            final StepListener<GetIndexTemplatesResponse> getIndexTemplatesStep = new StepListener<>();
+            final ListenableFuture<GetIndexTemplatesResponse> getIndexTemplatesStep = new ListenableFuture<>();
             client.admin().indices().getTemplates(getIndexTemplatesRequest, getIndexTemplatesStep);
 
-            final StepListener<GetComposableIndexTemplateAction.Response> getComposableTemplatesStep = new StepListener<>();
+            final ListenableFuture<GetComposableIndexTemplateAction.Response> getComposableTemplatesStep = new ListenableFuture<>();
             client.execute(
                 GetComposableIndexTemplateAction.INSTANCE,
                 getComposableTemplatesRequest,
@@ -92,15 +92,17 @@ public class RestTemplatesAction extends AbstractCatAction {
                 }
             };
 
-            getIndexTemplatesStep.whenComplete(
-                getIndexTemplatesResponse -> getComposableTemplatesStep.whenComplete(
-                    getComposableIndexTemplatesResponse -> ActionListener.completeWith(
-                        tableListener,
-                        () -> buildTable(request, getIndexTemplatesResponse, getComposableIndexTemplatesResponse)
-                    ),
-                    tableListener::onFailure
-                ),
-                tableListener::onFailure
+            getIndexTemplatesStep.addListener(
+                tableListener.delegateFailureAndWrap(
+                    (l, getIndexTemplatesResponse) -> getComposableTemplatesStep.addListener(
+                        l.delegateFailureAndWrap(
+                            (ll, getComposableIndexTemplatesResponse) -> ActionListener.completeWith(
+                                ll,
+                                () -> buildTable(request, getIndexTemplatesResponse, getComposableIndexTemplatesResponse)
+                            )
+                        )
+                    )
+                )
             );
         };
     }

+ 36 - 26
server/src/main/java/org/elasticsearch/snapshots/RestoreService.java

@@ -12,7 +12,6 @@ import org.apache.logging.log4j.LogManager;
 import org.apache.logging.log4j.Logger;
 import org.elasticsearch.Version;
 import org.elasticsearch.action.ActionListener;
-import org.elasticsearch.action.StepListener;
 import org.elasticsearch.action.admin.cluster.snapshots.restore.RestoreSnapshotRequest;
 import org.elasticsearch.action.support.IndicesOptions;
 import org.elasticsearch.action.support.RefCountingRunnable;
@@ -57,6 +56,7 @@ import org.elasticsearch.common.settings.ClusterSettings;
 import org.elasticsearch.common.settings.Setting;
 import org.elasticsearch.common.settings.Settings;
 import org.elasticsearch.common.util.Maps;
+import org.elasticsearch.common.util.concurrent.ListenableFuture;
 import org.elasticsearch.common.util.set.Sets;
 import org.elasticsearch.common.xcontent.XContentHelper;
 import org.elasticsearch.core.Nullable;
@@ -249,40 +249,50 @@ public class RestoreService implements ClusterStateApplier {
     ) {
         try {
             // Try and fill in any missing repository UUIDs in case they're needed during the restore
-            final var repositoryUuidRefreshStep = new StepListener<Void>();
+            final var repositoryUuidRefreshStep = new ListenableFuture<Void>();
             refreshRepositoryUuids(refreshRepositoryUuidOnRestore, repositoriesService, () -> repositoryUuidRefreshStep.onResponse(null));
 
             // Read snapshot info and metadata from the repository
             final String repositoryName = request.repository();
             Repository repository = repositoriesService.repository(repositoryName);
-            final StepListener<RepositoryData> repositoryDataListener = new StepListener<>();
+            final ListenableFuture<RepositoryData> repositoryDataListener = new ListenableFuture<>();
             repository.getRepositoryData(repositoryDataListener);
 
-            repositoryDataListener.whenComplete(repositoryData -> repositoryUuidRefreshStep.whenComplete(ignored -> {
-                final String snapshotName = request.snapshot();
-                final Optional<SnapshotId> matchingSnapshotId = repositoryData.getSnapshotIds()
-                    .stream()
-                    .filter(s -> snapshotName.equals(s.getName()))
-                    .findFirst();
-                if (matchingSnapshotId.isPresent() == false) {
-                    throw new SnapshotRestoreException(repositoryName, snapshotName, "snapshot does not exist");
-                }
+            repositoryDataListener.addListener(
+                listener.delegateFailureAndWrap(
+                    (delegate, repositoryData) -> repositoryUuidRefreshStep.addListener(
+                        delegate.delegateFailureAndWrap((subDelegate, ignored) -> {
+                            final String snapshotName = request.snapshot();
+                            final Optional<SnapshotId> matchingSnapshotId = repositoryData.getSnapshotIds()
+                                .stream()
+                                .filter(s -> snapshotName.equals(s.getName()))
+                                .findFirst();
+                            if (matchingSnapshotId.isPresent() == false) {
+                                throw new SnapshotRestoreException(repositoryName, snapshotName, "snapshot does not exist");
+                            }
 
-                final SnapshotId snapshotId = matchingSnapshotId.get();
-                if (request.snapshotUuid() != null && request.snapshotUuid().equals(snapshotId.getUUID()) == false) {
-                    throw new SnapshotRestoreException(
-                        repositoryName,
-                        snapshotName,
-                        "snapshot UUID mismatch: expected [" + request.snapshotUuid() + "] but got [" + snapshotId.getUUID() + "]"
-                    );
-                }
-                repository.getSnapshotInfo(
-                    snapshotId,
-                    listener.delegateFailureAndWrap(
-                        (l, snapshotInfo) -> startRestore(snapshotInfo, repository, request, repositoryData, updater, l)
+                            final SnapshotId snapshotId = matchingSnapshotId.get();
+                            if (request.snapshotUuid() != null && request.snapshotUuid().equals(snapshotId.getUUID()) == false) {
+                                throw new SnapshotRestoreException(
+                                    repositoryName,
+                                    snapshotName,
+                                    "snapshot UUID mismatch: expected ["
+                                        + request.snapshotUuid()
+                                        + "] but got ["
+                                        + snapshotId.getUUID()
+                                        + "]"
+                                );
+                            }
+                            repository.getSnapshotInfo(
+                                snapshotId,
+                                subDelegate.delegateFailureAndWrap(
+                                    (l, snapshotInfo) -> startRestore(snapshotInfo, repository, request, repositoryData, updater, l)
+                                )
+                            );
+                        })
                     )
-                );
-            }, listener::onFailure), listener::onFailure);
+                )
+            );
         } catch (Exception e) {
             logger.warn(() -> "[" + request.repository() + ":" + request.snapshot() + "] failed to restore snapshot", e);
             listener.onFailure(e);

+ 91 - 82
server/src/main/java/org/elasticsearch/snapshots/SnapshotsService.java

@@ -17,7 +17,6 @@ import org.elasticsearch.Version;
 import org.elasticsearch.action.ActionListener;
 import org.elasticsearch.action.ActionResponse;
 import org.elasticsearch.action.ActionRunnable;
-import org.elasticsearch.action.StepListener;
 import org.elasticsearch.action.admin.cluster.snapshots.clone.CloneSnapshotRequest;
 import org.elasticsearch.action.admin.cluster.snapshots.create.CreateSnapshotRequest;
 import org.elasticsearch.action.admin.cluster.snapshots.delete.DeleteSnapshotRequest;
@@ -443,15 +442,15 @@ public class SnapshotsService extends AbstractLifecycleComponent implements Clus
 
         // 1. step, load SnapshotInfo to make sure that source snapshot was successful for the indices we want to clone
         // TODO: we could skip this step for snapshots with state SUCCESS
-        final StepListener<SnapshotInfo> snapshotInfoListener = new StepListener<>();
+        final ListenableFuture<SnapshotInfo> snapshotInfoListener = new ListenableFuture<>();
         repository.getSnapshotInfo(sourceSnapshot, snapshotInfoListener);
 
-        final StepListener<Collection<Tuple<IndexId, Integer>>> allShardCountsListener = new StepListener<>();
+        final ListenableFuture<Collection<Tuple<IndexId, Integer>>> allShardCountsListener = new ListenableFuture<>();
         final GroupedActionListener<Tuple<IndexId, Integer>> shardCountListener = new GroupedActionListener<>(
             indices.size(),
             allShardCountsListener
         );
-        snapshotInfoListener.whenComplete(snapshotInfo -> {
+        snapshotInfoListener.addListener(ActionListener.wrap(snapshotInfo -> {
             for (IndexId indexId : indices) {
                 if (RestoreService.failed(snapshotInfo, indexId.getName())) {
                     throw new SnapshotException(
@@ -469,95 +468,105 @@ public class SnapshotsService extends AbstractLifecycleComponent implements Clus
                     }));
                 }
             }, onFailure));
-        }, onFailure);
+        }, onFailure));
 
         // 3. step, we have all the shard counts, now update the cluster state to have clone jobs in the snap entry
-        allShardCountsListener.whenComplete(counts -> executeConsistentStateUpdate(repository, repoData -> new ClusterStateUpdateTask() {
+        allShardCountsListener.addListener(
+            ActionListener.wrap(counts -> executeConsistentStateUpdate(repository, repoData -> new ClusterStateUpdateTask() {
 
-            private SnapshotsInProgress.Entry updatedEntry;
+                private SnapshotsInProgress.Entry updatedEntry;
 
-            @Override
-            public ClusterState execute(ClusterState currentState) {
-                final SnapshotsInProgress snapshotsInProgress = currentState.custom(SnapshotsInProgress.TYPE, SnapshotsInProgress.EMPTY);
-                final String repoName = cloneEntry.repository();
-                final List<SnapshotsInProgress.Entry> existingEntries = snapshotsInProgress.forRepo(repoName);
-                final List<SnapshotsInProgress.Entry> updatedEntries = new ArrayList<>(existingEntries.size());
-                final String localNodeId = currentState.nodes().getLocalNodeId();
-                final ShardGenerations shardGenerations = repoData.shardGenerations();
-                for (SnapshotsInProgress.Entry existing : existingEntries) {
-                    if (cloneEntry.snapshot().getSnapshotId().equals(existing.snapshot().getSnapshotId())) {
-                        final ImmutableOpenMap.Builder<RepositoryShardId, ShardSnapshotStatus> clonesBuilder = ImmutableOpenMap.builder();
-                        final boolean readyToExecute = currentState.custom(
-                            SnapshotDeletionsInProgress.TYPE,
-                            SnapshotDeletionsInProgress.EMPTY
-                        ).hasExecutingDeletion(repoName) == false;
-                        final InFlightShardSnapshotStates inFlightShardStates;
-                        if (readyToExecute) {
-                            inFlightShardStates = InFlightShardSnapshotStates.forEntries(snapshotsInProgress.forRepo(repoName));
-                        } else {
-                            // no need to compute these, we'll mark all shards as queued anyway because we wait for the delete
-                            inFlightShardStates = null;
-                        }
-                        for (Tuple<IndexId, Integer> count : counts) {
-                            for (int shardId = 0; shardId < count.v2(); shardId++) {
-                                final RepositoryShardId repoShardId = new RepositoryShardId(count.v1(), shardId);
-                                final String indexName = repoShardId.indexName();
-                                if (readyToExecute == false || inFlightShardStates.isActive(indexName, shardId)) {
-                                    clonesBuilder.put(repoShardId, ShardSnapshotStatus.UNASSIGNED_QUEUED);
-                                } else {
-                                    clonesBuilder.put(
-                                        repoShardId,
-                                        new ShardSnapshotStatus(
-                                            localNodeId,
-                                            inFlightShardStates.generationForShard(repoShardId.index(), shardId, shardGenerations)
-                                        )
-                                    );
+                @Override
+                public ClusterState execute(ClusterState currentState) {
+                    final SnapshotsInProgress snapshotsInProgress = currentState.custom(
+                        SnapshotsInProgress.TYPE,
+                        SnapshotsInProgress.EMPTY
+                    );
+                    final String repoName = cloneEntry.repository();
+                    final List<SnapshotsInProgress.Entry> existingEntries = snapshotsInProgress.forRepo(repoName);
+                    final List<SnapshotsInProgress.Entry> updatedEntries = new ArrayList<>(existingEntries.size());
+                    final String localNodeId = currentState.nodes().getLocalNodeId();
+                    final ShardGenerations shardGenerations = repoData.shardGenerations();
+                    for (SnapshotsInProgress.Entry existing : existingEntries) {
+                        if (cloneEntry.snapshot().getSnapshotId().equals(existing.snapshot().getSnapshotId())) {
+                            final ImmutableOpenMap.Builder<RepositoryShardId, ShardSnapshotStatus> clonesBuilder = ImmutableOpenMap
+                                .builder();
+                            final boolean readyToExecute = currentState.custom(
+                                SnapshotDeletionsInProgress.TYPE,
+                                SnapshotDeletionsInProgress.EMPTY
+                            ).hasExecutingDeletion(repoName) == false;
+                            final InFlightShardSnapshotStates inFlightShardStates;
+                            if (readyToExecute) {
+                                inFlightShardStates = InFlightShardSnapshotStates.forEntries(snapshotsInProgress.forRepo(repoName));
+                            } else {
+                                // no need to compute these, we'll mark all shards as queued anyway because we wait for the delete
+                                inFlightShardStates = null;
+                            }
+                            for (Tuple<IndexId, Integer> count : counts) {
+                                for (int shardId = 0; shardId < count.v2(); shardId++) {
+                                    final RepositoryShardId repoShardId = new RepositoryShardId(count.v1(), shardId);
+                                    final String indexName = repoShardId.indexName();
+                                    if (readyToExecute == false || inFlightShardStates.isActive(indexName, shardId)) {
+                                        clonesBuilder.put(repoShardId, ShardSnapshotStatus.UNASSIGNED_QUEUED);
+                                    } else {
+                                        clonesBuilder.put(
+                                            repoShardId,
+                                            new ShardSnapshotStatus(
+                                                localNodeId,
+                                                inFlightShardStates.generationForShard(repoShardId.index(), shardId, shardGenerations)
+                                            )
+                                        );
+                                    }
                                 }
                             }
+                            updatedEntry = cloneEntry.withClones(clonesBuilder.build());
+                        } else {
+                            updatedEntries.add(existing);
                         }
-                        updatedEntry = cloneEntry.withClones(clonesBuilder.build());
-                    } else {
-                        updatedEntries.add(existing);
                     }
+                    if (updatedEntry != null) {
+                        // Move the now ready to execute clone operation to the back of the snapshot operations order because its
+                        // shard snapshot state was based on all previous existing operations in progress
+                        // TODO: If we could eventually drop the snapshot clone init phase we don't need this any longer
+                        updatedEntries.add(updatedEntry);
+                        return updateWithSnapshots(
+                            currentState,
+                            snapshotsInProgress.withUpdatedEntriesForRepo(repoName, updatedEntries),
+                            null
+                        );
+                    }
+                    return currentState;
                 }
-                if (updatedEntry != null) {
-                    // Move the now ready to execute clone operation to the back of the snapshot operations order because its
-                    // shard snapshot state was based on all previous existing operations in progress
-                    // TODO: If we could eventually drop the snapshot clone init phase we don't need this any longer
-                    updatedEntries.add(updatedEntry);
-                    return updateWithSnapshots(currentState, snapshotsInProgress.withUpdatedEntriesForRepo(repoName, updatedEntries), null);
-                }
-                return currentState;
-            }
 
-            @Override
-            public void onFailure(Exception e) {
-                initializingClones.remove(targetSnapshot);
-                logger.info(() -> "Failed to start snapshot clone [" + cloneEntry + "]", e);
-                failAllListenersOnMasterFailOver(e);
-            }
+                @Override
+                public void onFailure(Exception e) {
+                    initializingClones.remove(targetSnapshot);
+                    logger.info(() -> "Failed to start snapshot clone [" + cloneEntry + "]", e);
+                    failAllListenersOnMasterFailOver(e);
+                }
 
-            @Override
-            public void clusterStateProcessed(ClusterState oldState, ClusterState newState) {
-                initializingClones.remove(targetSnapshot);
-                if (updatedEntry != null) {
-                    final Snapshot target = updatedEntry.snapshot();
-                    final SnapshotId sourceSnapshot = updatedEntry.source();
-                    for (Map.Entry<RepositoryShardId, ShardSnapshotStatus> indexClone : updatedEntry.shardsByRepoShardId().entrySet()) {
-                        final ShardSnapshotStatus shardStatusBefore = indexClone.getValue();
-                        if (shardStatusBefore.state() != ShardState.INIT) {
-                            continue;
+                @Override
+                public void clusterStateProcessed(ClusterState oldState, ClusterState newState) {
+                    initializingClones.remove(targetSnapshot);
+                    if (updatedEntry != null) {
+                        final Snapshot target = updatedEntry.snapshot();
+                        final SnapshotId sourceSnapshot = updatedEntry.source();
+                        for (Map.Entry<RepositoryShardId, ShardSnapshotStatus> indexClone : updatedEntry.shardsByRepoShardId().entrySet()) {
+                            final ShardSnapshotStatus shardStatusBefore = indexClone.getValue();
+                            if (shardStatusBefore.state() != ShardState.INIT) {
+                                continue;
+                            }
+                            final RepositoryShardId repoShardId = indexClone.getKey();
+                            runReadyClone(target, sourceSnapshot, shardStatusBefore, repoShardId, repository);
                         }
-                        final RepositoryShardId repoShardId = indexClone.getKey();
-                        runReadyClone(target, sourceSnapshot, shardStatusBefore, repoShardId, repository);
+                    } else {
+                        // Extremely unlikely corner case of master failing over between between starting the clone and
+                        // starting shard clones.
+                        logger.warn("Did not find expected entry [{}] in the cluster state", cloneEntry);
                     }
-                } else {
-                    // Extremely unlikely corner case of master failing over between between starting the clone and
-                    // starting shard clones.
-                    logger.warn("Did not find expected entry [{}] in the cluster state", cloneEntry);
                 }
-            }
-        }, "start snapshot clone", onFailure), onFailure);
+            }, "start snapshot clone", onFailure), onFailure)
+        );
     }
 
     private final Set<RepositoryShardId> currentlyCloning = Collections.synchronizedSet(new HashSet<>());
@@ -1344,7 +1353,7 @@ public class SnapshotsService extends AbstractLifecycleComponent implements Clus
                 }
             }
             final String repository = snapshot.getRepository();
-            final StepListener<Metadata> metadataListener = new StepListener<>();
+            final ListenableFuture<Metadata> metadataListener = new ListenableFuture<>();
             final Repository repo = repositoriesService.repository(snapshot.getRepository());
             if (entry.isClone()) {
                 threadPool.executor(ThreadPool.Names.SNAPSHOT).execute(ActionRunnable.supply(metadataListener, () -> {
@@ -1373,7 +1382,7 @@ public class SnapshotsService extends AbstractLifecycleComponent implements Clus
             } else {
                 metadataListener.onResponse(metadata);
             }
-            metadataListener.whenComplete(meta -> {
+            metadataListener.addListener(ActionListener.wrap(meta -> {
                 final Metadata metaForSnapshot = metadataForSnapshot(entry, meta);
 
                 final Map<String, SnapshotInfo.IndexSnapshotDetails> indexSnapshotDetails = Maps.newMapWithExpectedSize(
@@ -1455,7 +1464,7 @@ public class SnapshotsService extends AbstractLifecycleComponent implements Clus
                         })
                     )
                 );
-            }, e -> handleFinalizationFailure(e, snapshot, repositoryData));
+            }, e -> handleFinalizationFailure(e, snapshot, repositoryData)));
         } catch (Exception e) {
             assert false : new AssertionError(e);
             handleFinalizationFailure(e, snapshot, repositoryData);

+ 3 - 3
server/src/main/java/org/elasticsearch/tasks/TaskCancellationService.java

@@ -15,7 +15,6 @@ import org.elasticsearch.ExceptionsHelper;
 import org.elasticsearch.TransportVersion;
 import org.elasticsearch.action.ActionListener;
 import org.elasticsearch.action.ResultDeduplicator;
-import org.elasticsearch.action.StepListener;
 import org.elasticsearch.action.support.ChannelActionListener;
 import org.elasticsearch.action.support.CountDownActionListener;
 import org.elasticsearch.action.support.GroupedActionListener;
@@ -23,6 +22,7 @@ import org.elasticsearch.action.support.RefCountingRunnable;
 import org.elasticsearch.cluster.node.DiscoveryNode;
 import org.elasticsearch.common.io.stream.StreamInput;
 import org.elasticsearch.common.io.stream.StreamOutput;
+import org.elasticsearch.common.util.concurrent.ListenableFuture;
 import org.elasticsearch.threadpool.ThreadPool;
 import org.elasticsearch.transport.EmptyTransportResponseHandler;
 import org.elasticsearch.transport.NodeDisconnectedException;
@@ -109,8 +109,8 @@ public class TaskCancellationService {
         final TaskId taskId = task.taskInfo(localNodeId(), false).taskId();
         if (task.shouldCancelChildrenOnCancellation()) {
             logger.trace("cancelling task [{}] and its descendants", taskId);
-            StepListener<Void> completedListener = new StepListener<>();
-            StepListener<Void> setBanListener = new StepListener<>();
+            ListenableFuture<Void> completedListener = new ListenableFuture<>();
+            ListenableFuture<Void> setBanListener = new ListenableFuture<>();
 
             Collection<Transport.Connection> childConnections;
             try (var refs = new RefCountingRunnable(() -> setBanListener.addListener(completedListener))) {

+ 10 - 10
server/src/main/java/org/elasticsearch/transport/SniffConnectionStrategy.java

@@ -11,7 +11,6 @@ package org.elasticsearch.transport;
 import org.apache.lucene.util.SetOnce;
 import org.elasticsearch.Version;
 import org.elasticsearch.action.ActionListener;
-import org.elasticsearch.action.StepListener;
 import org.elasticsearch.action.admin.cluster.remote.RemoteClusterNodesAction;
 import org.elasticsearch.action.admin.cluster.state.ClusterStateAction;
 import org.elasticsearch.action.admin.cluster.state.ClusterStateRequest;
@@ -27,6 +26,7 @@ import org.elasticsearch.common.io.stream.Writeable;
 import org.elasticsearch.common.settings.Setting;
 import org.elasticsearch.common.settings.Settings;
 import org.elasticsearch.common.transport.TransportAddress;
+import org.elasticsearch.common.util.concurrent.ListenableFuture;
 import org.elasticsearch.common.util.concurrent.ThreadContext;
 import org.elasticsearch.core.Booleans;
 import org.elasticsearch.core.IOUtils;
@@ -246,15 +246,15 @@ public class SniffConnectionStrategy extends RemoteConnectionStrategy {
 
             final DiscoveryNode seedNode = seedNodesSuppliers.next().get();
             logger.trace("[{}] opening transient connection to seed node: [{}]", clusterAlias, seedNode);
-            final StepListener<Transport.Connection> openConnectionStep = new StepListener<>();
+            final ListenableFuture<Transport.Connection> openConnectionStep = new ListenableFuture<>();
             try {
                 connectionManager.openConnection(seedNode, null, openConnectionStep);
             } catch (Exception e) {
                 onFailure.accept(e);
             }
 
-            final StepListener<TransportService.HandshakeResponse> handshakeStep = new StepListener<>();
-            openConnectionStep.whenComplete(connection -> {
+            final ListenableFuture<TransportService.HandshakeResponse> handshakeStep = new ListenableFuture<>();
+            openConnectionStep.addListener(ActionListener.wrap(connection -> {
                 ConnectionProfile connectionProfile = connectionManager.getConnectionProfile();
                 transportService.handshake(
                     connection,
@@ -262,10 +262,10 @@ public class SniffConnectionStrategy extends RemoteConnectionStrategy {
                     getRemoteClusterNamePredicate(),
                     handshakeStep
                 );
-            }, onFailure);
+            }, onFailure));
 
-            final StepListener<Void> fullConnectionStep = new StepListener<>();
-            handshakeStep.whenComplete(handshakeResponse -> {
+            final ListenableFuture<Void> fullConnectionStep = new ListenableFuture<>();
+            handshakeStep.addListener(ActionListener.wrap(handshakeResponse -> {
                 final DiscoveryNode handshakeNode = handshakeResponse.getDiscoveryNode();
 
                 if (nodePredicate.test(handshakeNode) && shouldOpenMoreConnections()) {
@@ -290,9 +290,9 @@ public class SniffConnectionStrategy extends RemoteConnectionStrategy {
                 logger.debug(() -> format("[%s] failed to handshake with seed node: [%s]", clusterAlias, node), e);
                 IOUtils.closeWhileHandlingException(connection);
                 onFailure.accept(e);
-            });
+            }));
 
-            fullConnectionStep.whenComplete(aVoid -> {
+            fullConnectionStep.addListener(ActionListener.wrap(aVoid -> {
                 if (remoteClusterName.get() == null) {
                     TransportService.HandshakeResponse handshakeResponse = handshakeStep.result();
                     assert handshakeResponse.getClusterName().value() != null;
@@ -344,7 +344,7 @@ public class SniffConnectionStrategy extends RemoteConnectionStrategy {
                 logger.debug(() -> format("[%s] failed to open managed connection to seed node: [%s]", clusterAlias, node), e);
                 IOUtils.closeWhileHandlingException(connection);
                 onFailure.accept(e);
-            });
+            }));
         } else {
             listener.onFailure(new NoSeedNodeLeftException("no seed node left for cluster: [" + clusterAlias + "]"));
         }

+ 0 - 136
server/src/test/java/org/elasticsearch/action/StepListenerTests.java

@@ -1,136 +0,0 @@
-/*
- * Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
- * or more contributor license agreements. Licensed under the Elastic License
- * 2.0 and the Server Side Public License, v 1; you may not use this file except
- * in compliance with, at your election, the Elastic License 2.0 or the Server
- * Side Public License, v 1.
- */
-
-package org.elasticsearch.action;
-
-import org.elasticsearch.ElasticsearchException;
-import org.elasticsearch.test.ESTestCase;
-import org.elasticsearch.test.ReachabilityChecker;
-import org.elasticsearch.threadpool.TestThreadPool;
-import org.elasticsearch.threadpool.ThreadPool;
-import org.elasticsearch.transport.RemoteTransportException;
-import org.junit.After;
-import org.junit.Before;
-
-import java.util.concurrent.CountDownLatch;
-import java.util.concurrent.atomic.AtomicInteger;
-import java.util.concurrent.atomic.AtomicReference;
-import java.util.function.Consumer;
-
-import static org.hamcrest.Matchers.equalTo;
-
-public class StepListenerTests extends ESTestCase {
-    private ThreadPool threadPool;
-
-    @Before
-    public void setUpThreadPool() {
-        threadPool = new TestThreadPool(getTestName());
-    }
-
-    @After
-    public void tearDownThreadPool() {
-        terminate(threadPool);
-    }
-
-    public void testSimpleSteps() throws Exception {
-        CountDownLatch latch = new CountDownLatch(1);
-        Consumer<Exception> onFailure = e -> {
-            latch.countDown();
-            fail("test a happy path");
-        };
-
-        StepListener<String> step1 = new StepListener<>(); // [a]sync provide a string
-        executeAction(() -> step1.onResponse("hello"));
-        StepListener<Integer> step2 = new StepListener<>(); // [a]sync calculate the length of the string
-        step1.whenComplete(str -> executeAction(() -> step2.onResponse(str.length())), onFailure);
-        step2.whenComplete(length -> executeAction(latch::countDown), onFailure);
-        latch.await();
-        assertThat(step1.result(), equalTo("hello"));
-        assertThat(step2.result(), equalTo(5));
-    }
-
-    public void testAbortOnFailure() throws Exception {
-        CountDownLatch latch = new CountDownLatch(1);
-        int failedStep = randomBoolean() ? 1 : 2;
-        AtomicInteger failureNotified = new AtomicInteger();
-        Consumer<Exception> onFailure = e -> {
-            failureNotified.getAndIncrement();
-            latch.countDown();
-            assertThat(e.getMessage(), equalTo("failed at step " + failedStep));
-        };
-
-        StepListener<String> step1 = new StepListener<>(); // [a]sync provide a string
-        if (failedStep == 1) {
-            executeAction(() -> step1.onFailure(new RuntimeException("failed at step 1")));
-        } else {
-            executeAction(() -> step1.onResponse("hello"));
-        }
-
-        StepListener<Integer> step2 = new StepListener<>(); // [a]sync calculate the length of the string
-        step1.whenComplete(str -> {
-            if (failedStep == 2) {
-                executeAction(() -> step2.onFailure(new RuntimeException("failed at step 2")));
-            } else {
-                executeAction(() -> step2.onResponse(str.length()));
-            }
-        }, onFailure);
-
-        step2.whenComplete(length -> latch.countDown(), onFailure);
-        latch.await();
-        assertThat(failureNotified.get(), equalTo(1));
-
-        if (failedStep == 1) {
-            assertThat(expectThrows(RuntimeException.class, step1::result).getMessage(), equalTo("failed at step 1"));
-            assertFalse(step2.isDone());
-        } else {
-            assertThat(step1.result(), equalTo("hello"));
-            assertThat(expectThrows(RuntimeException.class, step2::result).getMessage(), equalTo("failed at step 2"));
-        }
-    }
-
-    private void executeAction(Runnable runnable) {
-        if (randomBoolean()) {
-            threadPool.generic().execute(runnable);
-        } else {
-            runnable.run();
-        }
-    }
-
-    /**
-     * This test checks that we no longer unwrap exceptions when using StepListener.
-     */
-    public void testNoUnwrap() {
-        StepListener<String> step = new StepListener<>();
-        step.onFailure(new RemoteTransportException("test", new RuntimeException("expected")));
-        AtomicReference<RuntimeException> exception = new AtomicReference<>();
-        step.whenComplete(null, e -> { exception.set((RuntimeException) e); });
-
-        assertEquals(RemoteTransportException.class, exception.get().getClass());
-        RuntimeException e = expectThrows(RuntimeException.class, () -> step.result());
-        assertEquals(RemoteTransportException.class, e.getClass());
-    }
-
-    public void testAddedListenersReleasedOnCompletion() {
-        final StepListener<Void> step = new StepListener<>();
-        final ReachabilityChecker reachabilityChecker = new ReachabilityChecker();
-
-        for (int i = between(1, 3); i > 0; i--) {
-            step.addListener(reachabilityChecker.register(ActionListener.running(() -> {})));
-        }
-        reachabilityChecker.checkReachable();
-        if (randomBoolean()) {
-            step.onResponse(null);
-        } else {
-            step.onFailure(new ElasticsearchException("simulated"));
-        }
-        reachabilityChecker.ensureUnreachable();
-
-        step.addListener(reachabilityChecker.register(ActionListener.running(() -> {})));
-        reachabilityChecker.ensureUnreachable();
-    }
-}

+ 14 - 14
server/src/test/java/org/elasticsearch/common/util/CancellableSingleObjectCacheTests.java

@@ -10,10 +10,10 @@ package org.elasticsearch.common.util;
 
 import org.elasticsearch.ElasticsearchException;
 import org.elasticsearch.action.ActionListener;
-import org.elasticsearch.action.StepListener;
 import org.elasticsearch.action.support.PlainActionFuture;
 import org.elasticsearch.common.settings.Settings;
 import org.elasticsearch.common.util.concurrent.ConcurrentCollections;
+import org.elasticsearch.common.util.concurrent.ListenableFuture;
 import org.elasticsearch.common.util.concurrent.ThreadContext;
 import org.elasticsearch.tasks.TaskCancelledException;
 import org.elasticsearch.test.ESTestCase;
@@ -234,7 +234,7 @@ public class CancellableSingleObjectCacheTests extends ESTestCase {
                         throw new AssertionError(e);
                     }
 
-                    final StepListener<Integer> stepListener = new StepListener<>();
+                    final ListenableFuture<Integer> stepListener = new ListenableFuture<>();
                     final AtomicBoolean isComplete = new AtomicBoolean();
                     final AtomicBoolean isCancelled = new AtomicBoolean();
                     try (ThreadContext.StoredContext ignored = threadContext.stashContext()) {
@@ -255,7 +255,7 @@ public class CancellableSingleObjectCacheTests extends ESTestCase {
                         isCancelled.set(true);
                     }
 
-                    stepListener.whenComplete(len -> {
+                    stepListener.addListener(ActionListener.wrap(len -> {
                         finishLatch.countDown();
                         assertThat(len, equalTo(input.length()));
                         assertNotEquals("FAIL", input);
@@ -266,7 +266,7 @@ public class CancellableSingleObjectCacheTests extends ESTestCase {
                         } else {
                             assertEquals("FAIL", input);
                         }
-                    });
+                    }));
                 });
             }
 
@@ -331,7 +331,7 @@ public class CancellableSingleObjectCacheTests extends ESTestCase {
                         throw new AssertionError(e);
                     }
 
-                    final StepListener<Integer> stepListener = new StepListener<>();
+                    final ListenableFuture<Integer> stepListener = new ListenableFuture<>();
                     final AtomicBoolean isComplete = new AtomicBoolean();
                     final AtomicBoolean isCancelled = new AtomicBoolean();
                     testCache.get(
@@ -349,7 +349,7 @@ public class CancellableSingleObjectCacheTests extends ESTestCase {
                         isCancelled.set(true);
                     }
 
-                    stepListener.whenComplete(len -> {
+                    stepListener.addListener(ActionListener.wrap(len -> {
                         finishLatch.countDown();
                         assertThat(len, greaterThanOrEqualTo(input.length()));
                     }, e -> {
@@ -359,7 +359,7 @@ public class CancellableSingleObjectCacheTests extends ESTestCase {
                         } else {
                             throw new AssertionError("unexpected", e);
                         }
-                    });
+                    }));
                 });
             }
 
@@ -437,7 +437,7 @@ public class CancellableSingleObjectCacheTests extends ESTestCase {
 
     private static class TestCache extends CancellableSingleObjectCache<String, String, Integer> {
 
-        private final LinkedList<StepListener<Function<String, Integer>>> pendingRefreshes = new LinkedList<>();
+        private final LinkedList<ListenableFuture<Function<String, Integer>>> pendingRefreshes = new LinkedList<>();
 
         private TestCache() {
             super(testThreadContext);
@@ -450,17 +450,17 @@ public class CancellableSingleObjectCacheTests extends ESTestCase {
             BooleanSupplier supersedeIfStale,
             ActionListener<Integer> listener
         ) {
-            final StepListener<Function<String, Integer>> stepListener = new StepListener<>();
+            final ListenableFuture<Function<String, Integer>> stepListener = new ListenableFuture<>();
             pendingRefreshes.offer(stepListener);
-            stepListener.whenComplete(f -> {
+            stepListener.addListener(listener.delegateFailureAndWrap((l, f) -> {
                 if (supersedeIfStale.getAsBoolean()) {
                     return;
                 }
-                ActionListener.completeWith(listener, () -> {
+                ActionListener.completeWith(l, () -> {
                     ensureNotCancelled.run();
                     return f.apply(input);
                 });
-            }, listener::onFailure);
+            }));
         }
 
         @Override
@@ -491,9 +491,9 @@ public class CancellableSingleObjectCacheTests extends ESTestCase {
             nextRefresh().onResponse(k -> { throw new AssertionError("should not be called"); });
         }
 
-        private StepListener<Function<String, Integer>> nextRefresh() {
+        private ListenableFuture<Function<String, Integer>> nextRefresh() {
             assertThat(pendingRefreshes, not(empty()));
-            final StepListener<Function<String, Integer>> nextRefresh = pendingRefreshes.poll();
+            final ListenableFuture<Function<String, Integer>> nextRefresh = pendingRefreshes.poll();
             assertNotNull(nextRefresh);
             return nextRefresh;
         }

+ 2 - 2
server/src/test/java/org/elasticsearch/indices/recovery/RecoverySourceHandlerTests.java

@@ -24,7 +24,6 @@ import org.elasticsearch.ExceptionsHelper;
 import org.elasticsearch.Version;
 import org.elasticsearch.action.ActionListener;
 import org.elasticsearch.action.LatchedActionListener;
-import org.elasticsearch.action.StepListener;
 import org.elasticsearch.action.support.PlainActionFuture;
 import org.elasticsearch.cluster.metadata.IndexMetadata;
 import org.elasticsearch.cluster.node.DiscoveryNodeUtils;
@@ -43,6 +42,7 @@ import org.elasticsearch.common.unit.ByteSizeUnit;
 import org.elasticsearch.common.unit.ByteSizeValue;
 import org.elasticsearch.common.util.CancellableThreads;
 import org.elasticsearch.common.util.concurrent.ConcurrentCollections;
+import org.elasticsearch.common.util.concurrent.ListenableFuture;
 import org.elasticsearch.core.IOUtils;
 import org.elasticsearch.core.Releasable;
 import org.elasticsearch.core.Strings;
@@ -1056,7 +1056,7 @@ public class RecoverySourceHandlerTests extends MapperServiceTestCase {
             }
         };
         cancelRecovery.set(() -> handler.cancel("test"));
-        final StepListener<RecoverySourceHandler.SendFileResult> phase1Listener = new StepListener<>();
+        final ListenableFuture<RecoverySourceHandler.SendFileResult> phase1Listener = new ListenableFuture<>();
         try {
             final CountDownLatch latch = new CountDownLatch(1);
             handler.phase1(DirectoryReader.listCommits(dir).get(0), 0, () -> 0, new LatchedActionListener<>(phase1Listener, latch));

+ 50 - 47
server/src/test/java/org/elasticsearch/snapshots/SnapshotResiliencyTests.java

@@ -17,7 +17,6 @@ import org.elasticsearch.action.ActionRequest;
 import org.elasticsearch.action.ActionResponse;
 import org.elasticsearch.action.ActionType;
 import org.elasticsearch.action.RequestValidators;
-import org.elasticsearch.action.StepListener;
 import org.elasticsearch.action.admin.cluster.repositories.cleanup.CleanupRepositoryAction;
 import org.elasticsearch.action.admin.cluster.repositories.cleanup.CleanupRepositoryRequest;
 import org.elasticsearch.action.admin.cluster.repositories.cleanup.CleanupRepositoryResponse;
@@ -131,6 +130,7 @@ import org.elasticsearch.common.transport.TransportAddress;
 import org.elasticsearch.common.util.BigArrays;
 import org.elasticsearch.common.util.PageCacheRecycler;
 import org.elasticsearch.common.util.concurrent.DeterministicTaskQueue;
+import org.elasticsearch.common.util.concurrent.ListenableFuture;
 import org.elasticsearch.common.util.concurrent.PrioritizedEsThreadPoolExecutor;
 import org.elasticsearch.core.CheckedConsumer;
 import org.elasticsearch.env.Environment;
@@ -244,8 +244,8 @@ public class SnapshotResiliencyTests extends ESTestCase {
         try {
             clearDisruptionsAndAwaitSync();
 
-            final StepListener<CleanupRepositoryResponse> cleanupResponse = new StepListener<>();
-            final StepListener<CreateSnapshotResponse> createSnapshotResponse = new StepListener<>();
+            final ListenableFuture<CleanupRepositoryResponse> cleanupResponse = new ListenableFuture<>();
+            final ListenableFuture<CreateSnapshotResponse> createSnapshotResponse = new ListenableFuture<>();
             // Create another snapshot and then clean up the repository to verify that the repository works correctly no matter the
             // failures seen during the previous test.
             client().admin()
@@ -290,7 +290,7 @@ public class SnapshotResiliencyTests extends ESTestCase {
             testClusterNodes.nodes.values().iterator().next().clusterService.state()
         );
 
-        final StepListener<CreateSnapshotResponse> createSnapshotResponseListener = new StepListener<>();
+        final ListenableFuture<CreateSnapshotResponse> createSnapshotResponseListener = new ListenableFuture<>();
 
         continueOrDie(createRepoAndIndex(repoName, index, shards), createIndexResponse -> {
             final Runnable afterIndexing = () -> client().admin()
@@ -305,7 +305,7 @@ public class SnapshotResiliencyTests extends ESTestCase {
                 for (int i = 0; i < documents; ++i) {
                     bulkRequest.add(new IndexRequest(index).source(Collections.singletonMap("foo", "bar" + i)));
                 }
-                final StepListener<BulkResponse> bulkResponseStepListener = new StepListener<>();
+                final ListenableFuture<BulkResponse> bulkResponseStepListener = new ListenableFuture<>();
                 client().bulk(bulkRequest, bulkResponseStepListener);
                 continueOrDie(bulkResponseStepListener, bulkResponse -> {
                     assertFalse("Failures in bulk response: " + bulkResponse.buildFailureMessage(), bulkResponse.hasFailures());
@@ -315,14 +315,14 @@ public class SnapshotResiliencyTests extends ESTestCase {
             }
         });
 
-        final StepListener<AcknowledgedResponse> deleteIndexListener = new StepListener<>();
+        final ListenableFuture<AcknowledgedResponse> deleteIndexListener = new ListenableFuture<>();
 
         continueOrDie(
             createSnapshotResponseListener,
             createSnapshotResponse -> client().admin().indices().delete(new DeleteIndexRequest(index), deleteIndexListener)
         );
 
-        final StepListener<RestoreSnapshotResponse> restoreSnapshotResponseListener = new StepListener<>();
+        final ListenableFuture<RestoreSnapshotResponse> restoreSnapshotResponseListener = new ListenableFuture<>();
         continueOrDie(
             deleteIndexListener,
             ignored -> client().admin()
@@ -333,7 +333,7 @@ public class SnapshotResiliencyTests extends ESTestCase {
                 )
         );
 
-        final StepListener<SearchResponse> searchResponseListener = new StepListener<>();
+        final ListenableFuture<SearchResponse> searchResponseListener = new ListenableFuture<>();
         continueOrDie(restoreSnapshotResponseListener, restoreSnapshotResponse -> {
             assertEquals(shards, restoreSnapshotResponse.getRestoreInfo().totalShards());
             client().search(
@@ -365,7 +365,7 @@ public class SnapshotResiliencyTests extends ESTestCase {
     }
 
     private SnapshotInfo getSnapshotInfo(Repository repository, SnapshotId snapshotId) {
-        final StepListener<SnapshotInfo> listener = new StepListener<>();
+        final ListenableFuture<SnapshotInfo> listener = new ListenableFuture<>();
         repository.getSnapshotInfo(snapshotId, listener);
         deterministicTaskQueue.runAllRunnableTasks();
         return listener.result();
@@ -381,7 +381,7 @@ public class SnapshotResiliencyTests extends ESTestCase {
         final String index = "test";
         final int shards = randomIntBetween(1, 10);
 
-        final StepListener<CreateSnapshotResponse> createSnapshotResponseStepListener = new StepListener<>();
+        final ListenableFuture<CreateSnapshotResponse> createSnapshotResponseStepListener = new ListenableFuture<>();
 
         final boolean partial = randomBoolean();
         continueOrDie(createRepoAndIndex(repoName, index, shards), createIndexResponse -> {
@@ -400,7 +400,7 @@ public class SnapshotResiliencyTests extends ESTestCase {
 
         final AtomicBoolean snapshotNeverStarted = new AtomicBoolean(false);
 
-        createSnapshotResponseStepListener.whenComplete(createSnapshotResponse -> {
+        createSnapshotResponseStepListener.addListener(ActionListener.wrap(createSnapshotResponse -> {
             for (int i = 0; i < randomIntBetween(0, dataNodes); ++i) {
                 scheduleNow(this::disconnectOrRestartDataNode);
             }
@@ -423,7 +423,7 @@ public class SnapshotResiliencyTests extends ESTestCase {
             } else {
                 throw new AssertionError(e);
             }
-        });
+        }));
 
         runUntil(() -> testClusterNodes.randomMasterNode().map(master -> {
             if (snapshotNeverStarted.get()) {
@@ -460,7 +460,7 @@ public class SnapshotResiliencyTests extends ESTestCase {
         final int shards = randomIntBetween(1, 10);
 
         final boolean waitForSnapshot = randomBoolean();
-        final StepListener<CreateSnapshotResponse> createSnapshotResponseStepListener = new StepListener<>();
+        final ListenableFuture<CreateSnapshotResponse> createSnapshotResponseStepListener = new ListenableFuture<>();
         continueOrDie(
             createRepoAndIndex(repoName, index, shards),
             createIndexResponse -> testClusterNodes.randomMasterNodeSafe().client.admin()
@@ -515,7 +515,7 @@ public class SnapshotResiliencyTests extends ESTestCase {
             testClusterNodes.nodes.values().iterator().next().clusterService.state()
         );
 
-        final StepListener<CreateSnapshotResponse> createSnapshotResponseStepListener = new StepListener<>();
+        final ListenableFuture<CreateSnapshotResponse> createSnapshotResponseStepListener = new ListenableFuture<>();
 
         continueOrDie(
             createRepoAndIndex(repoName, index, shards),
@@ -525,7 +525,7 @@ public class SnapshotResiliencyTests extends ESTestCase {
                 .execute(createSnapshotResponseStepListener)
         );
 
-        final StepListener<AcknowledgedResponse> deleteSnapshotStepListener = new StepListener<>();
+        final ListenableFuture<AcknowledgedResponse> deleteSnapshotStepListener = new ListenableFuture<>();
 
         masterNode.clusterService.addListener(new ClusterStateListener() {
             @Override
@@ -537,7 +537,7 @@ public class SnapshotResiliencyTests extends ESTestCase {
             }
         });
 
-        final StepListener<CreateSnapshotResponse> createAnotherSnapshotResponseStepListener = new StepListener<>();
+        final ListenableFuture<CreateSnapshotResponse> createAnotherSnapshotResponseStepListener = new ListenableFuture<>();
 
         continueOrDie(
             deleteSnapshotStepListener,
@@ -580,7 +580,7 @@ public class SnapshotResiliencyTests extends ESTestCase {
             testClusterNodes.nodes.values().iterator().next().clusterService.state()
         );
 
-        final StepListener<CreateSnapshotResponse> createSnapshotResponseStepListener = new StepListener<>();
+        final ListenableFuture<CreateSnapshotResponse> createSnapshotResponseStepListener = new ListenableFuture<>();
 
         continueOrDie(
             createRepoAndIndex(repoName, index, shards),
@@ -591,7 +591,7 @@ public class SnapshotResiliencyTests extends ESTestCase {
                 .execute(createSnapshotResponseStepListener)
         );
 
-        final StepListener<CreateSnapshotResponse> createOtherSnapshotResponseStepListener = new StepListener<>();
+        final ListenableFuture<CreateSnapshotResponse> createOtherSnapshotResponseStepListener = new ListenableFuture<>();
 
         continueOrDie(
             createSnapshotResponseStepListener,
@@ -601,7 +601,7 @@ public class SnapshotResiliencyTests extends ESTestCase {
                 .execute(createOtherSnapshotResponseStepListener)
         );
 
-        final StepListener<AcknowledgedResponse> deleteSnapshotStepListener = new StepListener<>();
+        final ListenableFuture<AcknowledgedResponse> deleteSnapshotStepListener = new ListenableFuture<>();
 
         continueOrDie(
             createOtherSnapshotResponseStepListener,
@@ -611,7 +611,7 @@ public class SnapshotResiliencyTests extends ESTestCase {
                 .execute(deleteSnapshotStepListener)
         );
 
-        final StepListener<CreateSnapshotResponse> createAnotherSnapshotResponseStepListener = new StepListener<>();
+        final ListenableFuture<CreateSnapshotResponse> createAnotherSnapshotResponseStepListener = new ListenableFuture<>();
 
         continueOrDie(deleteSnapshotStepListener, deleted -> {
             client().admin()
@@ -654,7 +654,7 @@ public class SnapshotResiliencyTests extends ESTestCase {
             testClusterNodes.nodes.values().iterator().next().clusterService.state()
         );
 
-        final StepListener<CreateSnapshotResponse> createSnapshotResponseStepListener = new StepListener<>();
+        final ListenableFuture<CreateSnapshotResponse> createSnapshotResponseStepListener = new ListenableFuture<>();
 
         continueOrDie(
             createRepoAndIndex(repoName, index, shards),
@@ -666,7 +666,7 @@ public class SnapshotResiliencyTests extends ESTestCase {
         );
 
         final int inProgressSnapshots = randomIntBetween(1, 5);
-        final StepListener<Collection<CreateSnapshotResponse>> createOtherSnapshotResponseStepListener = new StepListener<>();
+        final ListenableFuture<Collection<CreateSnapshotResponse>> createOtherSnapshotResponseStepListener = new ListenableFuture<>();
         final ActionListener<CreateSnapshotResponse> createSnapshotListener = new GroupedActionListener<>(
             inProgressSnapshots,
             createOtherSnapshotResponseStepListener
@@ -678,7 +678,7 @@ public class SnapshotResiliencyTests extends ESTestCase {
             }
         });
 
-        final StepListener<AcknowledgedResponse> deleteSnapshotStepListener = new StepListener<>();
+        final ListenableFuture<AcknowledgedResponse> deleteSnapshotStepListener = new ListenableFuture<>();
 
         continueOrDie(
             createOtherSnapshotResponseStepListener,
@@ -708,7 +708,7 @@ public class SnapshotResiliencyTests extends ESTestCase {
             testClusterNodes.nodes.values().iterator().next().clusterService.state()
         );
 
-        final StepListener<CreateSnapshotResponse> createSnapshotResponseStepListener = new StepListener<>();
+        final ListenableFuture<CreateSnapshotResponse> createSnapshotResponseStepListener = new ListenableFuture<>();
 
         final int documentsFirstSnapshot = randomIntBetween(0, 100);
 
@@ -727,7 +727,7 @@ public class SnapshotResiliencyTests extends ESTestCase {
 
         final int documentsSecondSnapshot = randomIntBetween(0, 100);
 
-        final StepListener<CreateSnapshotResponse> createOtherSnapshotResponseStepListener = new StepListener<>();
+        final ListenableFuture<CreateSnapshotResponse> createOtherSnapshotResponseStepListener = new ListenableFuture<>();
 
         final String secondSnapshotName = "snapshot-2";
         continueOrDie(
@@ -743,8 +743,8 @@ public class SnapshotResiliencyTests extends ESTestCase {
             )
         );
 
-        final StepListener<AcknowledgedResponse> deleteSnapshotStepListener = new StepListener<>();
-        final StepListener<RestoreSnapshotResponse> restoreSnapshotResponseListener = new StepListener<>();
+        final ListenableFuture<AcknowledgedResponse> deleteSnapshotStepListener = new ListenableFuture<>();
+        final ListenableFuture<RestoreSnapshotResponse> restoreSnapshotResponseListener = new ListenableFuture<>();
 
         continueOrDie(createOtherSnapshotResponseStepListener, createSnapshotResponse -> {
             scheduleNow(() -> client().admin().cluster().prepareDeleteSnapshot(repoName, snapshotName).execute(deleteSnapshotStepListener));
@@ -760,7 +760,7 @@ public class SnapshotResiliencyTests extends ESTestCase {
             );
         });
 
-        final StepListener<SearchResponse> searchResponseListener = new StepListener<>();
+        final ListenableFuture<SearchResponse> searchResponseListener = new ListenableFuture<>();
         continueOrDie(restoreSnapshotResponseListener, restoreSnapshotResponse -> {
             assertEquals(shards, restoreSnapshotResponse.getRestoreInfo().totalShards());
             client().search(
@@ -800,7 +800,7 @@ public class SnapshotResiliencyTests extends ESTestCase {
         for (int i = 0; i < documents; ++i) {
             bulkRequest.add(new IndexRequest(index).source(Collections.singletonMap("foo", "bar" + i)));
         }
-        final StepListener<BulkResponse> bulkResponseStepListener = new StepListener<>();
+        final ListenableFuture<BulkResponse> bulkResponseStepListener = new ListenableFuture<>();
         client().bulk(bulkRequest, bulkResponseStepListener);
         continueOrDie(bulkResponseStepListener, bulkResponse -> {
             assertFalse("Failures in bulk response: " + bulkResponse.buildFailureMessage(), bulkResponse.hasFailures());
@@ -820,7 +820,7 @@ public class SnapshotResiliencyTests extends ESTestCase {
             testClusterNodes.nodes.values().iterator().next().clusterService.state()
         );
 
-        final StepListener<Collection<CreateIndexResponse>> createIndicesListener = new StepListener<>();
+        final ListenableFuture<Collection<CreateIndexResponse>> createIndicesListener = new ListenableFuture<>();
         final int indices = randomIntBetween(5, 20);
 
         final SetOnce<Index> firstIndex = new SetOnce<>();
@@ -834,7 +834,7 @@ public class SnapshotResiliencyTests extends ESTestCase {
             }
         });
 
-        final StepListener<CreateSnapshotResponse> createSnapshotResponseStepListener = new StepListener<>();
+        final ListenableFuture<CreateSnapshotResponse> createSnapshotResponseStepListener = new ListenableFuture<>();
 
         final boolean partialSnapshot = randomBoolean();
 
@@ -911,7 +911,7 @@ public class SnapshotResiliencyTests extends ESTestCase {
             testClusterNodes.nodes.values().iterator().next().clusterService.state()
         );
 
-        final StepListener<CreateSnapshotResponse> createSnapshotResponseStepListener = new StepListener<>();
+        final ListenableFuture<CreateSnapshotResponse> createSnapshotResponseStepListener = new ListenableFuture<>();
 
         continueOrDie(
             createRepoAndIndex(repoName, index, shards),
@@ -922,12 +922,15 @@ public class SnapshotResiliencyTests extends ESTestCase {
                 .execute(createSnapshotResponseStepListener)
         );
 
-        final Collection<StepListener<Boolean>> deleteSnapshotStepListeners = List.of(new StepListener<>(), new StepListener<>());
+        final Collection<ListenableFuture<Boolean>> deleteSnapshotStepListeners = List.of(
+            new ListenableFuture<>(),
+            new ListenableFuture<>()
+        );
 
         final AtomicInteger successfulDeletes = new AtomicInteger(0);
 
         continueOrDie(createSnapshotResponseStepListener, createSnapshotResponse -> {
-            for (StepListener<Boolean> deleteListener : deleteSnapshotStepListeners) {
+            for (ListenableFuture<Boolean> deleteListener : deleteSnapshotStepListeners) {
                 client().admin()
                     .cluster()
                     .prepareDeleteSnapshot(repoName, snapshotName)
@@ -943,7 +946,7 @@ public class SnapshotResiliencyTests extends ESTestCase {
             }
         });
 
-        for (StepListener<Boolean> deleteListener : deleteSnapshotStepListeners) {
+        for (ListenableFuture<Boolean> deleteListener : deleteSnapshotStepListeners) {
             continueOrDie(deleteListener, deleted -> {
                 if (deleted) {
                     successfulDeletes.incrementAndGet();
@@ -985,7 +988,7 @@ public class SnapshotResiliencyTests extends ESTestCase {
         final AtomicBoolean createdSnapshot = new AtomicBoolean();
         final AdminClient masterAdminClient = masterNode.client.admin();
 
-        final StepListener<ClusterStateResponse> clusterStateResponseStepListener = new StepListener<>();
+        final ListenableFuture<ClusterStateResponse> clusterStateResponseStepListener = new ListenableFuture<>();
 
         continueOrDie(
             createRepoAndIndex(repoName, index, shards),
@@ -1000,7 +1003,7 @@ public class SnapshotResiliencyTests extends ESTestCase {
             scheduleNow(new Runnable() {
                 @Override
                 public void run() {
-                    final StepListener<ClusterStateResponse> updatedClusterStateResponseStepListener = new StepListener<>();
+                    final ListenableFuture<ClusterStateResponse> updatedClusterStateResponseStepListener = new ListenableFuture<>();
                     masterAdminClient.cluster().state(new ClusterStateRequest(), updatedClusterStateResponseStepListener);
                     continueOrDie(updatedClusterStateResponseStepListener, updatedClusterState -> {
                         final ShardRouting shardRouting = updatedClusterState.getState()
@@ -1076,7 +1079,7 @@ public class SnapshotResiliencyTests extends ESTestCase {
             testClusterNodes.nodes.values().iterator().next().clusterService.state()
         );
 
-        final StepListener<CreateSnapshotResponse> createSnapshotResponseStepListener = new StepListener<>();
+        final ListenableFuture<CreateSnapshotResponse> createSnapshotResponseStepListener = new ListenableFuture<>();
 
         continueOrDie(createRepoAndIndex(repoName, index, shards), createIndexResponse -> {
             final AtomicBoolean initiatedSnapshot = new AtomicBoolean(false);
@@ -1101,7 +1104,7 @@ public class SnapshotResiliencyTests extends ESTestCase {
 
         final String restoredIndex = "restored";
 
-        final StepListener<RestoreSnapshotResponse> restoreSnapshotResponseStepListener = new StepListener<>();
+        final ListenableFuture<RestoreSnapshotResponse> restoreSnapshotResponseStepListener = new ListenableFuture<>();
 
         continueOrDie(
             createSnapshotResponseStepListener,
@@ -1115,7 +1118,7 @@ public class SnapshotResiliencyTests extends ESTestCase {
                 )
         );
 
-        final StepListener<SearchResponse> searchResponseStepListener = new StepListener<>();
+        final ListenableFuture<SearchResponse> searchResponseStepListener = new ListenableFuture<>();
 
         continueOrDie(restoreSnapshotResponseStepListener, restoreSnapshotResponse -> {
             assertEquals(shards, restoreSnapshotResponse.getRestoreInfo().totalShards());
@@ -1173,7 +1176,7 @@ public class SnapshotResiliencyTests extends ESTestCase {
             testClusterNodes.nodes.values().iterator().next().clusterService.state()
         );
 
-        final StepListener<Collection<CreateSnapshotResponse>> allSnapshotsListener = new StepListener<>();
+        final ListenableFuture<Collection<CreateSnapshotResponse>> allSnapshotsListener = new ListenableFuture<>();
         final ActionListener<CreateSnapshotResponse> snapshotListener = new GroupedActionListener<>(
             snapshotNames.size(),
             allSnapshotsListener
@@ -1193,7 +1196,7 @@ public class SnapshotResiliencyTests extends ESTestCase {
             for (int i = 0; i < documents; ++i) {
                 bulkRequest.add(new IndexRequest(index).source(Collections.singletonMap("foo", "bar" + i)));
             }
-            final StepListener<BulkResponse> bulkResponseStepListener = new StepListener<>();
+            final ListenableFuture<BulkResponse> bulkResponseStepListener = new ListenableFuture<>();
             client().bulk(bulkRequest, bulkResponseStepListener);
             continueOrDie(bulkResponseStepListener, bulkResponse -> {
                 assertFalse("Failures in bulk response: " + bulkResponse.buildFailureMessage(), bulkResponse.hasFailures());
@@ -1234,8 +1237,8 @@ public class SnapshotResiliencyTests extends ESTestCase {
         return res.actionGet();
     }
 
-    private StepListener<CreateIndexResponse> createRepoAndIndex(String repoName, String index, int shards) {
-        final StepListener<AcknowledgedResponse> createRepositoryListener = new StepListener<>();
+    private ListenableFuture<CreateIndexResponse> createRepoAndIndex(String repoName, String index, int shards) {
+        final ListenableFuture<AcknowledgedResponse> createRepositoryListener = new ListenableFuture<>();
 
         client().admin()
             .cluster()
@@ -1244,7 +1247,7 @@ public class SnapshotResiliencyTests extends ESTestCase {
             .setSettings(Settings.builder().put("location", randomAlphaOfLength(10)))
             .execute(createRepositoryListener);
 
-        final StepListener<CreateIndexResponse> createIndexResponseStepListener = new StepListener<>();
+        final ListenableFuture<CreateIndexResponse> createIndexResponseStepListener = new ListenableFuture<>();
 
         continueOrDie(
             createRepositoryListener,
@@ -1373,8 +1376,8 @@ public class SnapshotResiliencyTests extends ESTestCase {
             .build();
     }
 
-    private static <T> void continueOrDie(StepListener<T> listener, CheckedConsumer<T, Exception> onResponse) {
-        listener.whenComplete(onResponse, e -> { throw new AssertionError(e); });
+    private static <T> void continueOrDie(ListenableFuture<T> listener, CheckedConsumer<T, Exception> onResponse) {
+        listener.addListener(ActionListener.wrap(onResponse, e -> { throw new AssertionError(e); }));
     }
 
     public NodeClient client() {

+ 3 - 3
test/framework/src/main/java/org/elasticsearch/transport/AbstractSimpleTransportTestCase.java

@@ -18,7 +18,6 @@ import org.elasticsearch.TransportVersion;
 import org.elasticsearch.Version;
 import org.elasticsearch.action.ActionListener;
 import org.elasticsearch.action.ActionListenerResponseHandler;
-import org.elasticsearch.action.StepListener;
 import org.elasticsearch.action.support.ChannelActionListener;
 import org.elasticsearch.action.support.PlainActionFuture;
 import org.elasticsearch.cluster.node.DiscoveryNode;
@@ -39,6 +38,7 @@ import org.elasticsearch.common.transport.TransportAddress;
 import org.elasticsearch.common.unit.ByteSizeUnit;
 import org.elasticsearch.common.util.concurrent.AbstractRunnable;
 import org.elasticsearch.common.util.concurrent.ConcurrentCollections;
+import org.elasticsearch.common.util.concurrent.ListenableFuture;
 import org.elasticsearch.core.IOUtils;
 import org.elasticsearch.core.Nullable;
 import org.elasticsearch.core.Releasable;
@@ -3330,13 +3330,13 @@ public abstract class AbstractSimpleTransportTestCase extends ESTestCase {
         TransportRequestOptions options,
         TransportResponseHandler<T> handler
     ) throws TransportException {
-        final StepListener<T> responseListener = new StepListener<>();
+        final ListenableFuture<T> responseListener = new ListenableFuture<>();
         final TransportResponseHandler<T> futureHandler = new ActionListenerResponseHandler<>(
             responseListener,
             handler,
             handler.executor()
         );
-        responseListener.whenComplete(handler::handleResponse, e -> handler.handleException((TransportException) e));
+        responseListener.addListener(ActionListener.wrap(handler::handleResponse, e -> handler.handleException((TransportException) e)));
         final PlainActionFuture<T> future = PlainActionFuture.newFuture();
         responseListener.addListener(future);
         transportService.sendRequest(node, action, request, options, futureHandler);

+ 17 - 17
x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/action/TransportGetTrainedModelsStatsAction.java

@@ -9,7 +9,6 @@ package org.elasticsearch.xpack.ml.action;
 import org.apache.logging.log4j.LogManager;
 import org.apache.logging.log4j.Logger;
 import org.elasticsearch.action.ActionListener;
-import org.elasticsearch.action.StepListener;
 import org.elasticsearch.action.admin.cluster.node.stats.NodeStats;
 import org.elasticsearch.action.admin.cluster.node.stats.NodesStatsAction;
 import org.elasticsearch.action.admin.cluster.node.stats.NodesStatsRequest;
@@ -26,6 +25,7 @@ import org.elasticsearch.common.document.DocumentField;
 import org.elasticsearch.common.inject.Inject;
 import org.elasticsearch.common.metrics.CounterMetric;
 import org.elasticsearch.common.util.Maps;
+import org.elasticsearch.common.util.concurrent.ListenableFuture;
 import org.elasticsearch.common.util.set.Sets;
 import org.elasticsearch.core.Tuple;
 import org.elasticsearch.index.query.QueryBuilder;
@@ -106,16 +106,16 @@ public class TransportGetTrainedModelsStatsAction extends HandledTransportAction
 
         GetTrainedModelsStatsAction.Response.Builder responseBuilder = new GetTrainedModelsStatsAction.Response.Builder();
 
-        StepListener<Map<String, TrainedModelSizeStats>> modelSizeStatsListener = new StepListener<>();
-        modelSizeStatsListener.whenComplete(modelSizeStatsByModelId -> {
+        ListenableFuture<Map<String, TrainedModelSizeStats>> modelSizeStatsListener = new ListenableFuture<>();
+        modelSizeStatsListener.addListener(listener.delegateFailureAndWrap((l, modelSizeStatsByModelId) -> {
             responseBuilder.setModelSizeStatsByModelId(modelSizeStatsByModelId);
-            listener.onResponse(
+            l.onResponse(
                 responseBuilder.build(modelToDeployments(responseBuilder.getExpandedModelIdsWithAliases().keySet(), assignmentMetadata))
             );
-        }, listener::onFailure);
+        }));
 
-        StepListener<GetDeploymentStatsAction.Response> deploymentStatsListener = new StepListener<>();
-        deploymentStatsListener.whenComplete(deploymentStats -> {
+        ListenableFuture<GetDeploymentStatsAction.Response> deploymentStatsListener = new ListenableFuture<>();
+        deploymentStatsListener.addListener(listener.delegateFailureAndWrap((delegate, deploymentStats) -> {
             // deployment stats for each matching deployment
             // not necessarily for all models
             responseBuilder.setDeploymentStatsByDeploymentId(
@@ -130,20 +130,20 @@ public class TransportGetTrainedModelsStatsAction extends HandledTransportAction
                 parentTaskId,
                 modelSizeStatsListener
             );
-        }, listener::onFailure);
+        }));
 
-        StepListener<List<InferenceStats>> inferenceStatsListener = new StepListener<>();
+        ListenableFuture<List<InferenceStats>> inferenceStatsListener = new ListenableFuture<>();
         // inference stats are per model and are only
         // persisted for boosted tree models
-        inferenceStatsListener.whenComplete(inferenceStats -> {
+        inferenceStatsListener.addListener(listener.delegateFailureAndWrap((l, inferenceStats) -> {
             responseBuilder.setInferenceStatsByModelId(
                 inferenceStats.stream().collect(Collectors.toMap(InferenceStats::getModelId, Function.identity()))
             );
             getDeploymentStats(client, request.getResourceId(), parentTaskId, assignmentMetadata, deploymentStatsListener);
-        }, listener::onFailure);
+        }));
 
-        StepListener<NodesStatsResponse> nodesStatsListener = new StepListener<>();
-        nodesStatsListener.whenComplete(nodesStatsResponse -> {
+        ListenableFuture<NodesStatsResponse> nodesStatsListener = new ListenableFuture<>();
+        nodesStatsListener.addListener(listener.delegateFailureAndWrap((delegate, nodesStatsResponse) -> {
             // find all pipelines whether using the model id,
             // alias or deployment id.
             Set<String> allPossiblePipelineReferences = responseBuilder.getExpandedModelIdsWithAliases()
@@ -165,10 +165,10 @@ public class TransportGetTrainedModelsStatsAction extends HandledTransportAction
                 parentTaskId,
                 inferenceStatsListener
             );
-        }, listener::onFailure);
+        }));
 
-        StepListener<Tuple<Long, Map<String, Set<String>>>> idsListener = new StepListener<>();
-        idsListener.whenComplete(tuple -> {
+        ListenableFuture<Tuple<Long, Map<String, Set<String>>>> idsListener = new ListenableFuture<>();
+        idsListener.addListener(listener.delegateFailureAndWrap((delegate, tuple) -> {
             responseBuilder.setExpandedModelIdsWithAliases(tuple.v2()).setTotalModelCount(tuple.v1());
             executeAsyncWithOrigin(
                 client,
@@ -177,7 +177,7 @@ public class TransportGetTrainedModelsStatsAction extends HandledTransportAction
                 nodeStatsRequest(clusterService.state(), parentTaskId),
                 nodesStatsListener
             );
-        }, listener::onFailure);
+        }));
 
         // When the request resource is a deployment find the
         // model used in that deployment for the model stats

+ 4 - 4
x-pack/plugin/searchable-snapshots/src/main/java/org/elasticsearch/xpack/searchablesnapshots/allocation/SearchableSnapshotIndexEventListener.java

@@ -9,12 +9,12 @@ package org.elasticsearch.xpack.searchablesnapshots.allocation;
 import org.apache.logging.log4j.LogManager;
 import org.apache.logging.log4j.Logger;
 import org.elasticsearch.action.ActionListener;
-import org.elasticsearch.action.StepListener;
 import org.elasticsearch.blobcache.shared.SharedBlobCacheService;
 import org.elasticsearch.cluster.node.DiscoveryNode;
 import org.elasticsearch.cluster.routing.RecoverySource;
 import org.elasticsearch.cluster.routing.ShardRouting;
 import org.elasticsearch.common.settings.Settings;
+import org.elasticsearch.common.util.concurrent.ListenableFuture;
 import org.elasticsearch.core.Nullable;
 import org.elasticsearch.index.IndexService;
 import org.elasticsearch.index.IndexSettings;
@@ -68,12 +68,12 @@ public class SearchableSnapshotIndexEventListener implements IndexEventListener
         final var store = indexShard.store();
         final SearchableSnapshotDirectory directory = unwrapDirectory(store.directory());
         assert directory != null;
-        final StepListener<Void> preWarmListener = new StepListener<>();
+        final ListenableFuture<Void> preWarmListener = new ListenableFuture<>();
         final boolean success = directory.loadSnapshot(indexShard.recoveryState(), store::isClosing, preWarmListener);
         final ShardRouting shardRouting = indexShard.routingEntry();
         if (success && shardRouting.isRelocationTarget()) {
             final Runnable preWarmCondition = indexShard.addCleanFilesDependency();
-            preWarmListener.whenComplete(v -> preWarmCondition.run(), e -> {
+            preWarmListener.addListener(ActionListener.wrap(v -> preWarmCondition.run(), e -> {
                 logger.warn(
                     () -> format(
                         "pre-warm operation failed for [%s] while it was the target of primary relocation [%s]",
@@ -83,7 +83,7 @@ public class SearchableSnapshotIndexEventListener implements IndexEventListener
                     e
                 );
                 preWarmCondition.run();
-            });
+            }));
         }
         assert directory.listAll().length > 0 : "expecting directory listing to be non-empty";
         assert success || indexShard.routingEntry().recoverySource().getType() == RecoverySource.Type.PEER

+ 27 - 14
x-pack/plugin/snapshot-repo-test-kit/src/main/java/org/elasticsearch/repositories/blobstore/testkit/BlobAnalyzeAction.java

@@ -16,7 +16,6 @@ import org.elasticsearch.action.ActionRequest;
 import org.elasticsearch.action.ActionRequestValidationException;
 import org.elasticsearch.action.ActionResponse;
 import org.elasticsearch.action.ActionType;
-import org.elasticsearch.action.StepListener;
 import org.elasticsearch.action.support.ActionFilters;
 import org.elasticsearch.action.support.GroupedActionListener;
 import org.elasticsearch.action.support.HandledTransportAction;
@@ -33,6 +32,7 @@ import org.elasticsearch.common.io.stream.StreamOutput;
 import org.elasticsearch.common.io.stream.Writeable;
 import org.elasticsearch.common.unit.ByteSizeValue;
 import org.elasticsearch.common.util.CancellableThreads;
+import org.elasticsearch.common.util.concurrent.ListenableFuture;
 import org.elasticsearch.core.Nullable;
 import org.elasticsearch.repositories.RepositoriesService;
 import org.elasticsearch.repositories.Repository;
@@ -214,8 +214,8 @@ public class BlobAnalyzeAction extends ActionType<BlobAnalyzeAction.Response> {
         private final List<DiscoveryNode> earlyReadNodes;
         private final List<DiscoveryNode> readNodes;
         private final GroupedActionListener<NodeResponse> readNodesListener;
-        private final StepListener<WriteDetails> write1Step = new StepListener<>();
-        private final StepListener<WriteDetails> write2Step = new StepListener<>();
+        private final ListenableFuture<WriteDetails> write1Step = new ListenableFuture<>();
+        private final ListenableFuture<WriteDetails> write2Step = new ListenableFuture<>();
         private final CancellableThreads cancellableThreads = new CancellableThreads();
 
         BlobAnalysis(
@@ -253,7 +253,7 @@ public class BlobAnalyzeAction extends ActionType<BlobAnalyzeAction.Response> {
             Collections.shuffle(nodes, random);
             readNodes = nodes.stream().limit(request.readNodeCount).collect(Collectors.toList());
 
-            final StepListener<Collection<NodeResponse>> readsCompleteStep = new StepListener<>();
+            final ListenableFuture<Collection<NodeResponse>> readsCompleteStep = new ListenableFuture<>();
             readNodesListener = new GroupedActionListener<>(
                 earlyReadNodes.size() + readNodes.size(),
                 new ThreadedActionListener<>(transportService.getThreadPool().executor(ThreadPool.Names.SNAPSHOT), readsCompleteStep)
@@ -262,18 +262,24 @@ public class BlobAnalyzeAction extends ActionType<BlobAnalyzeAction.Response> {
             // The order is important in this chain: if writing fails then we may never even start all the reads, and we want to cancel
             // any read tasks that were started, but the reads step only fails after all the reads have completed so there's no need to
             // cancel anything.
-            write1Step.whenComplete(
-                write1Details -> write2Step.whenComplete(
-                    write2Details -> readsCompleteStep.whenComplete(
-                        responses -> onReadsComplete(responses, write1Details, write2Details),
-                        this::cleanUpAndReturnFailure
+            write1Step.addListener(
+                ActionListener.wrap(
+                    write1Details -> write2Step.addListener(
+                        ActionListener.wrap(
+                            write2Details -> readsCompleteStep.addListener(
+                                ActionListener.wrap(
+                                    responses -> onReadsComplete(responses, write1Details, write2Details),
+                                    this::cleanUpAndReturnFailure
+                                )
+                            ),
+                            this::cancelReadsCleanUpAndReturnFailure
+                        )
                     ),
                     this::cancelReadsCleanUpAndReturnFailure
-                ),
-                this::cancelReadsCleanUpAndReturnFailure
+                )
             );
 
-            task.addListener(() -> { cancellableThreads.cancel(task.getReasonCancelled()); });
+            task.addListener(() -> cancellableThreads.cancel(task.getReasonCancelled()));
         }
 
         void run() {
@@ -286,14 +292,21 @@ public class BlobAnalyzeAction extends ActionType<BlobAnalyzeAction.Response> {
 
             if (request.writeAndOverwrite) {
                 assert request.targetLength <= MAX_ATOMIC_WRITE_SIZE : "oversized atomic write";
-                write1Step.whenComplete(ignored -> writeRandomBlob(true, false, this::doReadAfterWrite, write2Step), ignored -> {});
+                write1Step.addListener(
+                    ActionListener.wrap(ignored -> writeRandomBlob(true, false, this::doReadAfterWrite, write2Step), ignored -> {})
+                );
             } else {
                 write2Step.onResponse(null);
                 doReadAfterWrite();
             }
         }
 
-        private void writeRandomBlob(boolean atomic, boolean failIfExists, Runnable onLastRead, StepListener<WriteDetails> stepListener) {
+        private void writeRandomBlob(
+            boolean atomic,
+            boolean failIfExists,
+            Runnable onLastRead,
+            ListenableFuture<WriteDetails> stepListener
+        ) {
             assert atomic == false || request.targetLength <= MAX_ATOMIC_WRITE_SIZE : "oversized atomic write";
             final RandomBlobContent content = new RandomBlobContent(
                 request.getRepositoryName(),