Browse Source

Introduce Delegating ActionListener Wrappers (#40129)

* Introduce Delegating ActionListener Wrappers
* Dry up use cases of ActionListener that simply pass through the response or exception to another listener
Armin Braun 6 years ago
parent
commit
f49436dc25
24 changed files with 206 additions and 432 deletions
  1. 47 0
      server/src/main/java/org/elasticsearch/action/ActionListener.java
  2. 5 14
      server/src/main/java/org/elasticsearch/action/admin/cluster/node/tasks/get/TransportGetTaskAction.java
  3. 3 13
      server/src/main/java/org/elasticsearch/action/admin/cluster/repositories/delete/TransportDeleteRepositoryAction.java
  4. 2 13
      server/src/main/java/org/elasticsearch/action/admin/cluster/repositories/put/TransportPutRepositoryAction.java
  5. 3 13
      server/src/main/java/org/elasticsearch/action/admin/cluster/repositories/verify/TransportVerifyRepositoryAction.java
  6. 5 13
      server/src/main/java/org/elasticsearch/action/admin/cluster/snapshots/restore/TransportRestoreSnapshotAction.java
  7. 4 14
      server/src/main/java/org/elasticsearch/action/admin/indices/close/TransportCloseIndexAction.java
  8. 5 13
      server/src/main/java/org/elasticsearch/action/admin/indices/shrink/TransportResizeAction.java
  9. 3 13
      server/src/main/java/org/elasticsearch/action/admin/indices/upgrade/post/TransportUpgradeAction.java
  10. 9 33
      server/src/main/java/org/elasticsearch/action/bulk/TransportBulkAction.java
  11. 9 17
      server/src/main/java/org/elasticsearch/action/support/master/TransportMasterNodeAction.java
  12. 7 16
      server/src/main/java/org/elasticsearch/index/query/TermsQueryBuilder.java
  13. 7 17
      server/src/main/java/org/elasticsearch/index/seqno/RetentionLeaseActions.java
  14. 6 13
      server/src/main/java/org/elasticsearch/index/shard/IndexShard.java
  15. 2 12
      server/src/main/java/org/elasticsearch/persistent/CompletionPersistentTaskAction.java
  16. 3 12
      server/src/main/java/org/elasticsearch/persistent/RemovePersistentTaskAction.java
  17. 2 13
      server/src/main/java/org/elasticsearch/persistent/StartPersistentTaskAction.java
  18. 2 12
      server/src/main/java/org/elasticsearch/persistent/UpdatePersistentTaskStatusAction.java
  19. 21 59
      server/src/main/java/org/elasticsearch/repositories/RepositoriesService.java
  20. 4 12
      server/src/test/java/org/elasticsearch/client/transport/TransportClientNodesServiceTests.java
  21. 26 35
      server/src/test/java/org/elasticsearch/transport/RemoteClusterConnectionTests.java
  22. 18 35
      test/framework/src/main/java/org/elasticsearch/index/replication/ESIndexLevelReplicationTestCase.java
  23. 3 12
      test/framework/src/main/java/org/elasticsearch/test/transport/StubbableTransport.java
  24. 10 28
      x-pack/plugin/ccr/src/main/java/org/elasticsearch/xpack/ccr/action/TransportPutFollowAction.java

+ 47 - 0
server/src/main/java/org/elasticsearch/action/ActionListener.java

@@ -72,6 +72,53 @@ public interface ActionListener<Response> {
         };
         };
     }
     }
 
 
+    /**
+     * Creates a listener that delegates all responses it receives to another listener.
+     *
+     * @param delegate ActionListener to wrap and delegate any exception to
+     * @param bc BiConsumer invoked with delegate listener and exception
+     * @param <T> Type of the listener
+     * @return Delegating listener
+     */
+    static <T> ActionListener<T> delegateResponse(ActionListener<T> delegate, BiConsumer<ActionListener<T>, Exception> bc) {
+        return new ActionListener<T>() {
+
+            @Override
+            public void onResponse(T r) {
+                delegate.onResponse(r);
+            }
+
+            @Override
+            public void onFailure(Exception e) {
+                bc.accept(delegate, e);
+            }
+        };
+    }
+
+    /**
+     * Creates a listener that delegates all exceptions it receives to another listener.
+     *
+     * @param delegate ActionListener to wrap and delegate any exception to
+     * @param bc BiConsumer invoked with delegate listener and response
+     * @param <T> Type of the delegating listener's response
+     * @param <R> Type of the wrapped listeners
+     * @return Delegating listener
+     */
+    static <T, R> ActionListener<T> delegateFailure(ActionListener<R> delegate, BiConsumer<ActionListener<R>, T> bc) {
+        return new ActionListener<T>() {
+
+            @Override
+            public void onResponse(T r) {
+                bc.accept(delegate, r);
+            }
+
+            @Override
+            public void onFailure(Exception e) {
+                delegate.onFailure(e);
+            }
+        };
+    }
+
     /**
     /**
      * Creates a listener that listens for a response (or failure) and executes the
      * Creates a listener that listens for a response (or failure) and executes the
      * corresponding runnable when the response (or failure) is received.
      * corresponding runnable when the response (or failure) is received.

+ 5 - 14
server/src/main/java/org/elasticsearch/action/admin/cluster/node/tasks/get/TransportGetTaskAction.java

@@ -157,7 +157,7 @@ public class TransportGetTaskAction extends HandledTransportAction<GetTaskReques
                 // Shift to the generic thread pool and let it wait for the task to complete so we don't block any important threads.
                 // Shift to the generic thread pool and let it wait for the task to complete so we don't block any important threads.
                 threadPool.generic().execute(new AbstractRunnable() {
                 threadPool.generic().execute(new AbstractRunnable() {
                     @Override
                     @Override
-                    protected void doRun() throws Exception {
+                    protected void doRun() {
                         taskManager.waitForTaskCompletion(runningTask, waitForCompletionTimeout(request.getTimeout()));
                         taskManager.waitForTaskCompletion(runningTask, waitForCompletionTimeout(request.getTimeout()));
                         waitedForCompletion(thisTask, request, runningTask.taskInfo(clusterService.localNode().getId(), true), listener);
                         waitedForCompletion(thisTask, request, runningTask.taskInfo(clusterService.localNode().getId(), true), listener);
                     }
                     }
@@ -180,26 +180,17 @@ public class TransportGetTaskAction extends HandledTransportAction<GetTaskReques
      */
      */
     void waitedForCompletion(Task thisTask, GetTaskRequest request, TaskInfo snapshotOfRunningTask,
     void waitedForCompletion(Task thisTask, GetTaskRequest request, TaskInfo snapshotOfRunningTask,
             ActionListener<GetTaskResponse> listener) {
             ActionListener<GetTaskResponse> listener) {
-        getFinishedTaskFromIndex(thisTask, request, new ActionListener<GetTaskResponse>() {
-            @Override
-            public void onResponse(GetTaskResponse response) {
-                // We were able to load the task from the task index. Let's send that back.
-                listener.onResponse(response);
-            }
-
-            @Override
-            public void onFailure(Exception e) {
+        getFinishedTaskFromIndex(thisTask, request, ActionListener.delegateResponse(listener, (delegatedListener, e) -> {
                 /*
                 /*
                  * We couldn't load the task from the task index. Instead of 404 we should use the snapshot we took after it finished. If
                  * We couldn't load the task from the task index. Instead of 404 we should use the snapshot we took after it finished. If
                  * the error isn't a 404 then we'll just throw it back to the user.
                  * the error isn't a 404 then we'll just throw it back to the user.
                  */
                  */
                 if (ExceptionsHelper.unwrap(e, ResourceNotFoundException.class) != null) {
                 if (ExceptionsHelper.unwrap(e, ResourceNotFoundException.class) != null) {
-                    listener.onResponse(new GetTaskResponse(new TaskResult(true, snapshotOfRunningTask)));
+                    delegatedListener.onResponse(new GetTaskResponse(new TaskResult(true, snapshotOfRunningTask)));
                 } else {
                 } else {
-                    listener.onFailure(e);
+                    delegatedListener.onFailure(e);
                 }
                 }
-            }
-        });
+        }));
     }
     }
 
 
     /**
     /**

+ 3 - 13
server/src/main/java/org/elasticsearch/action/admin/cluster/repositories/delete/TransportDeleteRepositoryAction.java

@@ -24,7 +24,6 @@ import org.elasticsearch.action.support.ActionFilters;
 import org.elasticsearch.action.support.master.AcknowledgedResponse;
 import org.elasticsearch.action.support.master.AcknowledgedResponse;
 import org.elasticsearch.action.support.master.TransportMasterNodeAction;
 import org.elasticsearch.action.support.master.TransportMasterNodeAction;
 import org.elasticsearch.cluster.ClusterState;
 import org.elasticsearch.cluster.ClusterState;
-import org.elasticsearch.cluster.ack.ClusterStateUpdateResponse;
 import org.elasticsearch.cluster.block.ClusterBlockException;
 import org.elasticsearch.cluster.block.ClusterBlockException;
 import org.elasticsearch.cluster.block.ClusterBlockLevel;
 import org.elasticsearch.cluster.block.ClusterBlockLevel;
 import org.elasticsearch.cluster.metadata.IndexNameExpressionResolver;
 import org.elasticsearch.cluster.metadata.IndexNameExpressionResolver;
@@ -69,17 +68,8 @@ public class TransportDeleteRepositoryAction extends TransportMasterNodeAction<D
     protected void masterOperation(final DeleteRepositoryRequest request, ClusterState state,
     protected void masterOperation(final DeleteRepositoryRequest request, ClusterState state,
                                    final ActionListener<AcknowledgedResponse> listener) {
                                    final ActionListener<AcknowledgedResponse> listener) {
         repositoriesService.unregisterRepository(
         repositoriesService.unregisterRepository(
-            request,
-            new ActionListener<ClusterStateUpdateResponse>() {
-                @Override
-                public void onResponse(ClusterStateUpdateResponse unregisterRepositoryResponse) {
-                    listener.onResponse(new AcknowledgedResponse(unregisterRepositoryResponse.isAcknowledged()));
-                }
-
-                @Override
-                public void onFailure(Exception e) {
-                    listener.onFailure(e);
-                }
-            });
+            request, ActionListener.delegateFailure(listener,
+                (delegatedListener, unregisterRepositoryResponse) ->
+                    delegatedListener.onResponse(new AcknowledgedResponse(unregisterRepositoryResponse.isAcknowledged()))));
     }
     }
 }
 }

+ 2 - 13
server/src/main/java/org/elasticsearch/action/admin/cluster/repositories/put/TransportPutRepositoryAction.java

@@ -24,7 +24,6 @@ import org.elasticsearch.action.support.ActionFilters;
 import org.elasticsearch.action.support.master.AcknowledgedResponse;
 import org.elasticsearch.action.support.master.AcknowledgedResponse;
 import org.elasticsearch.action.support.master.TransportMasterNodeAction;
 import org.elasticsearch.action.support.master.TransportMasterNodeAction;
 import org.elasticsearch.cluster.ClusterState;
 import org.elasticsearch.cluster.ClusterState;
-import org.elasticsearch.cluster.ack.ClusterStateUpdateResponse;
 import org.elasticsearch.cluster.block.ClusterBlockException;
 import org.elasticsearch.cluster.block.ClusterBlockException;
 import org.elasticsearch.cluster.block.ClusterBlockLevel;
 import org.elasticsearch.cluster.block.ClusterBlockLevel;
 import org.elasticsearch.cluster.metadata.IndexNameExpressionResolver;
 import org.elasticsearch.cluster.metadata.IndexNameExpressionResolver;
@@ -68,17 +67,7 @@ public class TransportPutRepositoryAction extends TransportMasterNodeAction<PutR
     @Override
     @Override
     protected void masterOperation(final PutRepositoryRequest request, ClusterState state,
     protected void masterOperation(final PutRepositoryRequest request, ClusterState state,
                                    final ActionListener<AcknowledgedResponse> listener) {
                                    final ActionListener<AcknowledgedResponse> listener) {
-        repositoriesService.registerRepository(request, new ActionListener<ClusterStateUpdateResponse>() {
-
-            @Override
-            public void onResponse(ClusterStateUpdateResponse response) {
-                listener.onResponse(new AcknowledgedResponse(response.isAcknowledged()));
-            }
-
-            @Override
-            public void onFailure(Exception e) {
-                listener.onFailure(e);
-            }
-        });
+        repositoriesService.registerRepository(request, ActionListener.delegateFailure(listener,
+            (delegatedListener, response) -> delegatedListener.onResponse(new AcknowledgedResponse(response.isAcknowledged()))));
     }
     }
 }
 }

+ 3 - 13
server/src/main/java/org/elasticsearch/action/admin/cluster/repositories/verify/TransportVerifyRepositoryAction.java

@@ -33,8 +33,6 @@ import org.elasticsearch.repositories.RepositoriesService;
 import org.elasticsearch.threadpool.ThreadPool;
 import org.elasticsearch.threadpool.ThreadPool;
 import org.elasticsearch.transport.TransportService;
 import org.elasticsearch.transport.TransportService;
 
 
-import java.util.List;
-
 /**
 /**
  * Transport action for verifying repository operation
  * Transport action for verifying repository operation
  */
  */
@@ -70,16 +68,8 @@ public class TransportVerifyRepositoryAction extends TransportMasterNodeAction<V
     @Override
     @Override
     protected void masterOperation(final VerifyRepositoryRequest request, ClusterState state,
     protected void masterOperation(final VerifyRepositoryRequest request, ClusterState state,
                                    final ActionListener<VerifyRepositoryResponse> listener) {
                                    final ActionListener<VerifyRepositoryResponse> listener) {
-        repositoriesService.verifyRepository(request.name(), new ActionListener<List<DiscoveryNode>>() {
-            @Override
-            public void onResponse(List<DiscoveryNode> verifyResponse) {
-                listener.onResponse(new VerifyRepositoryResponse(verifyResponse.toArray(new DiscoveryNode[0])));
-            }
-
-            @Override
-            public void onFailure(Exception e) {
-                listener.onFailure(e);
-            }
-        });
+        repositoriesService.verifyRepository(request.name(), ActionListener.delegateFailure(listener,
+            (delegatedListener, verifyResponse) ->
+                delegatedListener.onResponse(new VerifyRepositoryResponse(verifyResponse.toArray(new DiscoveryNode[0])))));
     }
     }
 }
 }

+ 5 - 13
server/src/main/java/org/elasticsearch/action/admin/cluster/snapshots/restore/TransportRestoreSnapshotAction.java

@@ -29,7 +29,6 @@ import org.elasticsearch.cluster.metadata.IndexNameExpressionResolver;
 import org.elasticsearch.cluster.service.ClusterService;
 import org.elasticsearch.cluster.service.ClusterService;
 import org.elasticsearch.common.inject.Inject;
 import org.elasticsearch.common.inject.Inject;
 import org.elasticsearch.snapshots.RestoreService;
 import org.elasticsearch.snapshots.RestoreService;
-import org.elasticsearch.snapshots.RestoreService.RestoreCompletionResponse;
 import org.elasticsearch.threadpool.ThreadPool;
 import org.elasticsearch.threadpool.ThreadPool;
 import org.elasticsearch.transport.TransportService;
 import org.elasticsearch.transport.TransportService;
 
 
@@ -75,20 +74,13 @@ public class TransportRestoreSnapshotAction extends TransportMasterNodeAction<Re
     @Override
     @Override
     protected void masterOperation(final RestoreSnapshotRequest request, final ClusterState state,
     protected void masterOperation(final RestoreSnapshotRequest request, final ClusterState state,
                                    final ActionListener<RestoreSnapshotResponse> listener) {
                                    final ActionListener<RestoreSnapshotResponse> listener) {
-        restoreService.restoreSnapshot(request, new ActionListener<RestoreCompletionResponse>() {
-            @Override
-            public void onResponse(RestoreCompletionResponse restoreCompletionResponse) {
+        restoreService.restoreSnapshot(request, ActionListener.delegateFailure(listener,
+            (delegatedListener, restoreCompletionResponse) -> {
                 if (restoreCompletionResponse.getRestoreInfo() == null && request.waitForCompletion()) {
                 if (restoreCompletionResponse.getRestoreInfo() == null && request.waitForCompletion()) {
-                    RestoreClusterStateListener.createAndRegisterListener(clusterService, restoreCompletionResponse, listener);
+                    RestoreClusterStateListener.createAndRegisterListener(clusterService, restoreCompletionResponse, delegatedListener);
                 } else {
                 } else {
-                    listener.onResponse(new RestoreSnapshotResponse(restoreCompletionResponse.getRestoreInfo()));
+                    delegatedListener.onResponse(new RestoreSnapshotResponse(restoreCompletionResponse.getRestoreInfo()));
                 }
                 }
-            }
-
-            @Override
-            public void onFailure(Exception t) {
-                listener.onFailure(t);
-            }
-        });
+            }));
     }
     }
 }
 }

+ 4 - 14
server/src/main/java/org/elasticsearch/action/admin/indices/close/TransportCloseIndexAction.java

@@ -118,19 +118,9 @@ public class TransportCloseIndexAction extends TransportMasterNodeAction<CloseIn
             .masterNodeTimeout(request.masterNodeTimeout())
             .masterNodeTimeout(request.masterNodeTimeout())
             .waitForActiveShards(request.waitForActiveShards())
             .waitForActiveShards(request.waitForActiveShards())
             .indices(concreteIndices);
             .indices(concreteIndices);
-
-        indexStateService.closeIndices(closeRequest, new ActionListener<CloseIndexResponse>() {
-
-            @Override
-            public void onResponse(final CloseIndexResponse response) {
-                listener.onResponse(response);
-            }
-
-            @Override
-            public void onFailure(final Exception t) {
-                logger.debug(() -> new ParameterizedMessage("failed to close indices [{}]", (Object) concreteIndices), t);
-                listener.onFailure(t);
-            }
-        });
+        indexStateService.closeIndices(closeRequest, ActionListener.delegateResponse(listener, (delegatedListener, t) -> {
+            logger.debug(() -> new ParameterizedMessage("failed to close indices [{}]", (Object) concreteIndices), t);
+            delegatedListener.onFailure(t);
+        }));
     }
     }
 }
 }

+ 5 - 13
server/src/main/java/org/elasticsearch/action/admin/indices/shrink/TransportResizeAction.java

@@ -24,7 +24,6 @@ import org.elasticsearch.action.ActionListener;
 import org.elasticsearch.action.admin.indices.create.CreateIndexClusterStateUpdateRequest;
 import org.elasticsearch.action.admin.indices.create.CreateIndexClusterStateUpdateRequest;
 import org.elasticsearch.action.admin.indices.create.CreateIndexRequest;
 import org.elasticsearch.action.admin.indices.create.CreateIndexRequest;
 import org.elasticsearch.action.admin.indices.stats.IndexShardStats;
 import org.elasticsearch.action.admin.indices.stats.IndexShardStats;
-import org.elasticsearch.action.admin.indices.stats.IndicesStatsResponse;
 import org.elasticsearch.action.support.ActionFilters;
 import org.elasticsearch.action.support.ActionFilters;
 import org.elasticsearch.action.support.master.TransportMasterNodeAction;
 import org.elasticsearch.action.support.master.TransportMasterNodeAction;
 import org.elasticsearch.client.Client;
 import org.elasticsearch.client.Client;
@@ -97,25 +96,18 @@ public class TransportResizeAction extends TransportMasterNodeAction<ResizeReque
         // there is no need to fetch docs stats for split but we keep it simple and do it anyway for simplicity of the code
         // there is no need to fetch docs stats for split but we keep it simple and do it anyway for simplicity of the code
         final String sourceIndex = indexNameExpressionResolver.resolveDateMathExpression(resizeRequest.getSourceIndex());
         final String sourceIndex = indexNameExpressionResolver.resolveDateMathExpression(resizeRequest.getSourceIndex());
         final String targetIndex = indexNameExpressionResolver.resolveDateMathExpression(resizeRequest.getTargetIndexRequest().index());
         final String targetIndex = indexNameExpressionResolver.resolveDateMathExpression(resizeRequest.getTargetIndexRequest().index());
-        client.admin().indices().prepareStats(sourceIndex).clear().setDocs(true).execute(new ActionListener<IndicesStatsResponse>() {
-            @Override
-            public void onResponse(IndicesStatsResponse indicesStatsResponse) {
+        client.admin().indices().prepareStats(sourceIndex).clear().setDocs(true).execute(
+            ActionListener.delegateFailure(listener, (delegatedListener, indicesStatsResponse) -> {
                 CreateIndexClusterStateUpdateRequest updateRequest = prepareCreateIndexRequest(resizeRequest, state,
                 CreateIndexClusterStateUpdateRequest updateRequest = prepareCreateIndexRequest(resizeRequest, state,
-                    (i) -> {
+                    i -> {
                         IndexShardStats shard = indicesStatsResponse.getIndex(sourceIndex).getIndexShards().get(i);
                         IndexShardStats shard = indicesStatsResponse.getIndex(sourceIndex).getIndexShards().get(i);
                         return shard == null ? null : shard.getPrimary().getDocs();
                         return shard == null ? null : shard.getPrimary().getDocs();
                     }, sourceIndex, targetIndex);
                     }, sourceIndex, targetIndex);
                 createIndexService.createIndex(
                 createIndexService.createIndex(
-                    updateRequest, ActionListener.map(listener,
+                    updateRequest, ActionListener.map(delegatedListener,
                         response -> new ResizeResponse(response.isAcknowledged(), response.isShardsAcknowledged(), updateRequest.index()))
                         response -> new ResizeResponse(response.isAcknowledged(), response.isShardsAcknowledged(), updateRequest.index()))
                 );
                 );
-            }
-
-            @Override
-            public void onFailure(Exception e) {
-                listener.onFailure(e);
-            }
-        });
+            }));
 
 
     }
     }
 
 

+ 3 - 13
server/src/main/java/org/elasticsearch/action/admin/indices/upgrade/post/TransportUpgradeAction.java

@@ -25,7 +25,6 @@ import org.elasticsearch.action.PrimaryMissingActionException;
 import org.elasticsearch.action.support.ActionFilters;
 import org.elasticsearch.action.support.ActionFilters;
 import org.elasticsearch.action.support.DefaultShardOperationFailedException;
 import org.elasticsearch.action.support.DefaultShardOperationFailedException;
 import org.elasticsearch.action.support.broadcast.node.TransportBroadcastByNodeAction;
 import org.elasticsearch.action.support.broadcast.node.TransportBroadcastByNodeAction;
-import org.elasticsearch.action.support.master.AcknowledgedResponse;
 import org.elasticsearch.client.node.NodeClient;
 import org.elasticsearch.client.node.NodeClient;
 import org.elasticsearch.cluster.ClusterState;
 import org.elasticsearch.cluster.ClusterState;
 import org.elasticsearch.cluster.block.ClusterBlockException;
 import org.elasticsearch.cluster.block.ClusterBlockException;
@@ -104,7 +103,7 @@ public class TransportUpgradeAction extends TransportBroadcastByNodeAction<Upgra
                 versions.put(index, new Tuple<>(version, luceneVersion));
                 versions.put(index, new Tuple<>(version, luceneVersion));
             }
             }
         }
         }
-        Map<String, Tuple<org.elasticsearch.Version, String>> updatedVersions = new HashMap<>();
+        Map<String, Tuple<Version, String>> updatedVersions = new HashMap<>();
         MetaData metaData = clusterState.metaData();
         MetaData metaData = clusterState.metaData();
         for (Map.Entry<String, Tuple<Version, org.apache.lucene.util.Version>> versionEntry : versions.entrySet()) {
         for (Map.Entry<String, Tuple<Version, org.apache.lucene.util.Version>> versionEntry : versions.entrySet()) {
             String index = versionEntry.getKey();
             String index = versionEntry.getKey();
@@ -209,16 +208,7 @@ public class TransportUpgradeAction extends TransportBroadcastByNodeAction<Upgra
 
 
     private void updateSettings(final UpgradeResponse upgradeResponse, final ActionListener<UpgradeResponse> listener) {
     private void updateSettings(final UpgradeResponse upgradeResponse, final ActionListener<UpgradeResponse> listener) {
         UpgradeSettingsRequest upgradeSettingsRequest = new UpgradeSettingsRequest(upgradeResponse.versions());
         UpgradeSettingsRequest upgradeSettingsRequest = new UpgradeSettingsRequest(upgradeResponse.versions());
-        client.executeLocally(UpgradeSettingsAction.INSTANCE, upgradeSettingsRequest, new ActionListener<AcknowledgedResponse>() {
-            @Override
-            public void onResponse(AcknowledgedResponse updateSettingsResponse) {
-                listener.onResponse(upgradeResponse);
-            }
-
-            @Override
-            public void onFailure(Exception e) {
-                listener.onFailure(e);
-            }
-        });
+        client.executeLocally(UpgradeSettingsAction.INSTANCE, upgradeSettingsRequest, ActionListener.delegateFailure(
+            listener, (delegatedListener, updateSettingsResponse) -> delegatedListener.onResponse(upgradeResponse)));
     }
     }
 }
 }

+ 9 - 33
server/src/main/java/org/elasticsearch/action/bulk/TransportBulkAction.java

@@ -658,7 +658,15 @@ public class TransportBulkAction extends HandledTransportAction<BulkRequest, Bul
                 return ActionListener.map(actionListener,
                 return ActionListener.map(actionListener,
                     response -> new BulkResponse(response.getItems(), response.getTook().getMillis(), ingestTookInMillis));
                     response -> new BulkResponse(response.getItems(), response.getTook().getMillis(), ingestTookInMillis));
             } else {
             } else {
-                return new IngestBulkResponseListener(ingestTookInMillis, originalSlots, itemResponses, actionListener);
+                return ActionListener.delegateFailure(actionListener, (delegatedListener, response) -> {
+                    BulkItemResponse[] items = response.getItems();
+                    for (int i = 0; i < items.length; i++) {
+                        itemResponses.add(originalSlots[i], response.getItems()[i]);
+                    }
+                    delegatedListener.onResponse(
+                        new BulkResponse(
+                            itemResponses.toArray(new BulkItemResponse[0]), response.getTook().getMillis(), ingestTookInMillis));
+                });
             }
             }
         }
         }
 
 
@@ -688,36 +696,4 @@ public class TransportBulkAction extends HandledTransportAction<BulkRequest, Bul
         }
         }
 
 
     }
     }
-
-    static final class IngestBulkResponseListener implements ActionListener<BulkResponse> {
-
-        private final long ingestTookInMillis;
-        private final int[] originalSlots;
-        private final List<BulkItemResponse> itemResponses;
-        private final ActionListener<BulkResponse> actionListener;
-
-        IngestBulkResponseListener(long ingestTookInMillis, int[] originalSlots, List<BulkItemResponse> itemResponses,
-                                   ActionListener<BulkResponse> actionListener) {
-            this.ingestTookInMillis = ingestTookInMillis;
-            this.itemResponses = itemResponses;
-            this.actionListener = actionListener;
-            this.originalSlots = originalSlots;
-        }
-
-        @Override
-        public void onResponse(BulkResponse response) {
-            BulkItemResponse[] items = response.getItems();
-            for (int i = 0; i < items.length; i++) {
-                itemResponses.add(originalSlots[i], response.getItems()[i]);
-            }
-            actionListener.onResponse(new BulkResponse(
-                    itemResponses.toArray(new BulkItemResponse[itemResponses.size()]),
-                    response.getTook().getMillis(), ingestTookInMillis));
-        }
-
-        @Override
-        public void onFailure(Exception e) {
-            actionListener.onFailure(e);
-        }
-    }
 }
 }

+ 9 - 17
server/src/main/java/org/elasticsearch/action/support/master/TransportMasterNodeAction.java

@@ -31,6 +31,7 @@ import org.elasticsearch.cluster.ClusterStateObserver;
 import org.elasticsearch.cluster.MasterNodeChangePredicate;
 import org.elasticsearch.cluster.MasterNodeChangePredicate;
 import org.elasticsearch.cluster.NotMasterException;
 import org.elasticsearch.cluster.NotMasterException;
 import org.elasticsearch.cluster.block.ClusterBlockException;
 import org.elasticsearch.cluster.block.ClusterBlockException;
+import org.elasticsearch.cluster.coordination.FailedToCommitClusterStateException;
 import org.elasticsearch.cluster.metadata.IndexNameExpressionResolver;
 import org.elasticsearch.cluster.metadata.IndexNameExpressionResolver;
 import org.elasticsearch.cluster.node.DiscoveryNode;
 import org.elasticsearch.cluster.node.DiscoveryNode;
 import org.elasticsearch.cluster.node.DiscoveryNodes;
 import org.elasticsearch.cluster.node.DiscoveryNodes;
@@ -39,7 +40,6 @@ import org.elasticsearch.common.io.stream.StreamInput;
 import org.elasticsearch.common.io.stream.Streamable;
 import org.elasticsearch.common.io.stream.Streamable;
 import org.elasticsearch.common.io.stream.Writeable;
 import org.elasticsearch.common.io.stream.Writeable;
 import org.elasticsearch.common.unit.TimeValue;
 import org.elasticsearch.common.unit.TimeValue;
-import org.elasticsearch.cluster.coordination.FailedToCommitClusterStateException;
 import org.elasticsearch.discovery.MasterNotDiscoveredException;
 import org.elasticsearch.discovery.MasterNotDiscoveredException;
 import org.elasticsearch.node.NodeClosedException;
 import org.elasticsearch.node.NodeClosedException;
 import org.elasticsearch.tasks.Task;
 import org.elasticsearch.tasks.Task;
@@ -185,23 +185,15 @@ public abstract class TransportMasterNodeAction<Request extends MasterNodeReques
                             });
                             });
                         }
                         }
                     } else {
                     } else {
-                        ActionListener<Response> delegate = new ActionListener<Response>() {
-                            @Override
-                            public void onResponse(Response response) {
-                                listener.onResponse(response);
-                            }
-
-                            @Override
-                            public void onFailure(Exception t) {
-                                if (t instanceof FailedToCommitClusterStateException || t instanceof NotMasterException) {
-                                    logger.debug(() -> new ParameterizedMessage("master could not publish cluster state or " +
-                                        "stepped down before publishing action [{}], scheduling a retry", actionName), t);
-                                    retry(t, masterChangePredicate);
-                                } else {
-                                    listener.onFailure(t);
-                                }
+                        ActionListener<Response> delegate = ActionListener.delegateResponse(listener, (delegatedListener, t) -> {
+                            if (t instanceof FailedToCommitClusterStateException || t instanceof NotMasterException) {
+                                logger.debug(() -> new ParameterizedMessage("master could not publish cluster state or " +
+                                    "stepped down before publishing action [{}], scheduling a retry", actionName), t);
+                                retry(t, masterChangePredicate);
+                            } else {
+                                delegatedListener.onFailure(t);
                             }
                             }
-                        };
+                        });
                         threadPool.executor(executor).execute(new ActionRunnable<Response>(delegate) {
                         threadPool.executor(executor).execute(new ActionRunnable<Response>(delegate) {
                             @Override
                             @Override
                             protected void doRun() throws Exception {
                             protected void doRun() throws Exception {

+ 7 - 16
server/src/main/java/org/elasticsearch/index/query/TermsQueryBuilder.java

@@ -27,7 +27,6 @@ import org.apache.lucene.util.BytesRefBuilder;
 import org.apache.lucene.util.SetOnce;
 import org.apache.lucene.util.SetOnce;
 import org.elasticsearch.action.ActionListener;
 import org.elasticsearch.action.ActionListener;
 import org.elasticsearch.action.get.GetRequest;
 import org.elasticsearch.action.get.GetRequest;
-import org.elasticsearch.action.get.GetResponse;
 import org.elasticsearch.client.Client;
 import org.elasticsearch.client.Client;
 import org.elasticsearch.common.ParsingException;
 import org.elasticsearch.common.ParsingException;
 import org.elasticsearch.common.Strings;
 import org.elasticsearch.common.Strings;
@@ -464,22 +463,14 @@ public class TermsQueryBuilder extends AbstractQueryBuilder<TermsQueryBuilder> {
             ? new GetRequest(termsLookup.index(), termsLookup.id())
             ? new GetRequest(termsLookup.index(), termsLookup.id())
             : new GetRequest(termsLookup.index(), termsLookup.type(), termsLookup.id());
             : new GetRequest(termsLookup.index(), termsLookup.type(), termsLookup.id());
         getRequest.preference("_local").routing(termsLookup.routing());
         getRequest.preference("_local").routing(termsLookup.routing());
-        client.get(getRequest, new ActionListener<GetResponse>() {
-            @Override
-            public void onResponse(GetResponse getResponse) {
-                List<Object> terms = new ArrayList<>();
-                if (getResponse.isSourceEmpty() == false) { // extract terms only if the doc source exists
-                    List<Object> extractedValues = XContentMapValues.extractRawValues(termsLookup.path(), getResponse.getSourceAsMap());
-                    terms.addAll(extractedValues);
-                }
-                actionListener.onResponse(terms);
-            }
-
-            @Override
-            public void onFailure(Exception e) {
-                actionListener.onFailure(e);
+        client.get(getRequest, ActionListener.delegateFailure(actionListener, (delegatedListener, getResponse) -> {
+            List<Object> terms = new ArrayList<>();
+            if (getResponse.isSourceEmpty() == false) { // extract terms only if the doc source exists
+                List<Object> extractedValues = XContentMapValues.extractRawValues(termsLookup.path(), getResponse.getSourceAsMap());
+                terms.addAll(extractedValues);
             }
             }
-        });
+            delegatedListener.onResponse(terms);
+        }));
     }
     }
 
 
     @Override
     @Override

+ 7 - 17
server/src/main/java/org/elasticsearch/index/seqno/RetentionLeaseActions.java

@@ -102,23 +102,13 @@ public class RetentionLeaseActions {
             final IndexService indexService = indicesService.indexServiceSafe(shardId.getIndex());
             final IndexService indexService = indicesService.indexServiceSafe(shardId.getIndex());
             final IndexShard indexShard = indexService.getShard(shardId.id());
             final IndexShard indexShard = indexService.getShard(shardId.id());
             indexShard.acquirePrimaryOperationPermit(
             indexShard.acquirePrimaryOperationPermit(
-                    new ActionListener<Releasable>() {
-
-                        @Override
-                        public void onResponse(final Releasable releasable) {
-                            try (Releasable ignore = releasable) {
-                                doRetentionLeaseAction(indexShard, request, listener);
-                            }
-                        }
-
-                        @Override
-                        public void onFailure(final Exception e) {
-                            listener.onFailure(e);
-                        }
-
-                    },
-                    ThreadPool.Names.SAME,
-                    request);
+                ActionListener.delegateFailure(listener, (delegatedListener, releasable) -> {
+                    try (Releasable ignore = releasable) {
+                        doRetentionLeaseAction(indexShard, request, delegatedListener);
+                    }
+                }),
+                ThreadPool.Names.SAME,
+                request);
         }
         }
 
 
         @Override
         @Override

+ 6 - 13
server/src/main/java/org/elasticsearch/index/shard/IndexShard.java

@@ -2688,9 +2688,8 @@ public class IndexShard extends AbstractIndexShardComponent implements IndicesCl
         // primary term update. Since indexShardOperationPermits doesn't guarantee that async submissions are executed
         // primary term update. Since indexShardOperationPermits doesn't guarantee that async submissions are executed
         // in the order submitted, combining both operations ensure that the term is updated before the operation is
         // in the order submitted, combining both operations ensure that the term is updated before the operation is
         // executed. It also has the side effect of acquiring all the permits one time instead of two.
         // executed. It also has the side effect of acquiring all the permits one time instead of two.
-        final ActionListener<Releasable> operationListener = new ActionListener<Releasable>() {
-            @Override
-            public void onResponse(final Releasable releasable) {
+        final ActionListener<Releasable> operationListener = ActionListener.delegateFailure(onPermitAcquired,
+            (delegatedListener, releasable) -> {
                 if (opPrimaryTerm < getOperationPrimaryTerm()) {
                 if (opPrimaryTerm < getOperationPrimaryTerm()) {
                     releasable.close();
                     releasable.close();
                     final String message = String.format(
                     final String message = String.format(
@@ -2699,7 +2698,7 @@ public class IndexShard extends AbstractIndexShardComponent implements IndicesCl
                         shardId,
                         shardId,
                         opPrimaryTerm,
                         opPrimaryTerm,
                         getOperationPrimaryTerm());
                         getOperationPrimaryTerm());
-                    onPermitAcquired.onFailure(new IllegalStateException(message));
+                    delegatedListener.onFailure(new IllegalStateException(message));
                 } else {
                 } else {
                     assert assertReplicationTarget();
                     assert assertReplicationTarget();
                     try {
                     try {
@@ -2707,18 +2706,12 @@ public class IndexShard extends AbstractIndexShardComponent implements IndicesCl
                         advanceMaxSeqNoOfUpdatesOrDeletes(maxSeqNoOfUpdatesOrDeletes);
                         advanceMaxSeqNoOfUpdatesOrDeletes(maxSeqNoOfUpdatesOrDeletes);
                     } catch (Exception e) {
                     } catch (Exception e) {
                         releasable.close();
                         releasable.close();
-                        onPermitAcquired.onFailure(e);
+                        delegatedListener.onFailure(e);
                         return;
                         return;
                     }
                     }
-                    onPermitAcquired.onResponse(releasable);
+                    delegatedListener.onResponse(releasable);
                 }
                 }
-            }
-
-            @Override
-            public void onFailure(final Exception e) {
-                onPermitAcquired.onFailure(e);
-            }
-        };
+            });
 
 
         if (requirePrimaryTermUpdate(opPrimaryTerm, allowCombineOperationWithPrimaryTermUpdate)) {
         if (requirePrimaryTermUpdate(opPrimaryTerm, allowCombineOperationWithPrimaryTermUpdate)) {
             synchronized (mutex) {
             synchronized (mutex) {

+ 2 - 12
server/src/main/java/org/elasticsearch/persistent/CompletionPersistentTaskAction.java

@@ -34,7 +34,6 @@ import org.elasticsearch.cluster.service.ClusterService;
 import org.elasticsearch.common.inject.Inject;
 import org.elasticsearch.common.inject.Inject;
 import org.elasticsearch.common.io.stream.StreamInput;
 import org.elasticsearch.common.io.stream.StreamInput;
 import org.elasticsearch.common.io.stream.StreamOutput;
 import org.elasticsearch.common.io.stream.StreamOutput;
-import org.elasticsearch.persistent.PersistentTasksCustomMetaData.PersistentTask;
 import org.elasticsearch.threadpool.ThreadPool;
 import org.elasticsearch.threadpool.ThreadPool;
 import org.elasticsearch.transport.TransportService;
 import org.elasticsearch.transport.TransportService;
 
 
@@ -165,17 +164,8 @@ public class CompletionPersistentTaskAction extends Action<PersistentTaskRespons
         protected final void masterOperation(final Request request, ClusterState state,
         protected final void masterOperation(final Request request, ClusterState state,
                                              final ActionListener<PersistentTaskResponse> listener) {
                                              final ActionListener<PersistentTaskResponse> listener) {
             persistentTasksClusterService.completePersistentTask(request.taskId, request.allocationId, request.exception,
             persistentTasksClusterService.completePersistentTask(request.taskId, request.allocationId, request.exception,
-                    new ActionListener<PersistentTask<?>>() {
-                        @Override
-                        public void onResponse(PersistentTask<?> task) {
-                            listener.onResponse(new PersistentTaskResponse(task));
-                        }
-
-                        @Override
-                        public void onFailure(Exception e) {
-                            listener.onFailure(e);
-                        }
-                    });
+                ActionListener.delegateFailure(listener,
+                    (delegatedListener, task) -> delegatedListener.onResponse(new PersistentTaskResponse(task))));
         }
         }
     }
     }
 }
 }

+ 3 - 12
server/src/main/java/org/elasticsearch/persistent/RemovePersistentTaskAction.java

@@ -34,7 +34,6 @@ import org.elasticsearch.cluster.service.ClusterService;
 import org.elasticsearch.common.inject.Inject;
 import org.elasticsearch.common.inject.Inject;
 import org.elasticsearch.common.io.stream.StreamInput;
 import org.elasticsearch.common.io.stream.StreamInput;
 import org.elasticsearch.common.io.stream.StreamOutput;
 import org.elasticsearch.common.io.stream.StreamOutput;
-import org.elasticsearch.persistent.PersistentTasksCustomMetaData.PersistentTask;
 import org.elasticsearch.threadpool.ThreadPool;
 import org.elasticsearch.threadpool.ThreadPool;
 import org.elasticsearch.transport.TransportService;
 import org.elasticsearch.transport.TransportService;
 
 
@@ -149,17 +148,9 @@ public class RemovePersistentTaskAction extends Action<PersistentTaskResponse> {
         @Override
         @Override
         protected final void masterOperation(final Request request, ClusterState state,
         protected final void masterOperation(final Request request, ClusterState state,
                                              final ActionListener<PersistentTaskResponse> listener) {
                                              final ActionListener<PersistentTaskResponse> listener) {
-            persistentTasksClusterService.removePersistentTask(request.taskId, new ActionListener<PersistentTask<?>>() {
-                @Override
-                public void onResponse(PersistentTask<?> task) {
-                    listener.onResponse(new PersistentTaskResponse(task));
-                }
-
-                @Override
-                public void onFailure(Exception e) {
-                    listener.onFailure(e);
-                }
-            });
+            persistentTasksClusterService.removePersistentTask(
+                request.taskId, ActionListener.delegateFailure(listener,
+                    (delegatedListener, task) -> delegatedListener.onResponse(new PersistentTaskResponse(task))));
         }
         }
     }
     }
 }
 }

+ 2 - 13
server/src/main/java/org/elasticsearch/persistent/StartPersistentTaskAction.java

@@ -35,7 +35,6 @@ import org.elasticsearch.common.Nullable;
 import org.elasticsearch.common.inject.Inject;
 import org.elasticsearch.common.inject.Inject;
 import org.elasticsearch.common.io.stream.StreamInput;
 import org.elasticsearch.common.io.stream.StreamInput;
 import org.elasticsearch.common.io.stream.StreamOutput;
 import org.elasticsearch.common.io.stream.StreamOutput;
-import org.elasticsearch.persistent.PersistentTasksCustomMetaData.PersistentTask;
 import org.elasticsearch.threadpool.ThreadPool;
 import org.elasticsearch.threadpool.ThreadPool;
 import org.elasticsearch.transport.TransportService;
 import org.elasticsearch.transport.TransportService;
 
 
@@ -217,18 +216,8 @@ public class StartPersistentTaskAction extends Action<PersistentTaskResponse> {
         protected final void masterOperation(final Request request, ClusterState state,
         protected final void masterOperation(final Request request, ClusterState state,
                                              final ActionListener<PersistentTaskResponse> listener) {
                                              final ActionListener<PersistentTaskResponse> listener) {
             persistentTasksClusterService.createPersistentTask(request.taskId, request.taskName, request.params,
             persistentTasksClusterService.createPersistentTask(request.taskId, request.taskName, request.params,
-                    new ActionListener<PersistentTask<?>>() {
-
-                @Override
-                public void onResponse(PersistentTask<?> task) {
-                    listener.onResponse(new PersistentTaskResponse(task));
-                }
-
-                @Override
-                public void onFailure(Exception e) {
-                    listener.onFailure(e);
-                }
-            });
+                ActionListener.delegateFailure(listener,
+                    (delegatedListener, task) -> delegatedListener.onResponse(new PersistentTaskResponse(task))));
         }
         }
     }
     }
 }
 }

+ 2 - 12
server/src/main/java/org/elasticsearch/persistent/UpdatePersistentTaskStatusAction.java

@@ -34,7 +34,6 @@ import org.elasticsearch.cluster.service.ClusterService;
 import org.elasticsearch.common.inject.Inject;
 import org.elasticsearch.common.inject.Inject;
 import org.elasticsearch.common.io.stream.StreamInput;
 import org.elasticsearch.common.io.stream.StreamInput;
 import org.elasticsearch.common.io.stream.StreamOutput;
 import org.elasticsearch.common.io.stream.StreamOutput;
-import org.elasticsearch.persistent.PersistentTasksCustomMetaData.PersistentTask;
 import org.elasticsearch.threadpool.ThreadPool;
 import org.elasticsearch.threadpool.ThreadPool;
 import org.elasticsearch.transport.TransportService;
 import org.elasticsearch.transport.TransportService;
 
 
@@ -181,17 +180,8 @@ public class UpdatePersistentTaskStatusAction extends Action<PersistentTaskRespo
                                              final ClusterState state,
                                              final ClusterState state,
                                              final ActionListener<PersistentTaskResponse> listener) {
                                              final ActionListener<PersistentTaskResponse> listener) {
             persistentTasksClusterService.updatePersistentTaskState(request.taskId, request.allocationId, request.state,
             persistentTasksClusterService.updatePersistentTaskState(request.taskId, request.allocationId, request.state,
-                    new ActionListener<PersistentTask<?>>() {
-                @Override
-                public void onResponse(PersistentTask<?> task) {
-                    listener.onResponse(new PersistentTaskResponse(task));
-                }
-
-                @Override
-                public void onFailure(Exception e) {
-                    listener.onFailure(e);
-                }
-            });
+                ActionListener.delegateFailure(listener,
+                    (delegatedListener, task) -> delegatedListener.onResponse(new PersistentTaskResponse(task))));
         }
         }
     }
     }
 }
 }

+ 21 - 59
server/src/main/java/org/elasticsearch/repositories/RepositoriesService.java

@@ -97,7 +97,15 @@ public class RepositoriesService implements ClusterStateApplier {
 
 
         final ActionListener<ClusterStateUpdateResponse> registrationListener;
         final ActionListener<ClusterStateUpdateResponse> registrationListener;
         if (request.verify()) {
         if (request.verify()) {
-            registrationListener = new VerifyingRegisterRepositoryListener(request.name(), listener);
+            registrationListener = ActionListener.delegateFailure(listener, (delegatedListener, clusterStateUpdateResponse) -> {
+                if (clusterStateUpdateResponse.isAcknowledged()) {
+                    // The response was acknowledged - all nodes should know about the new repository, let's verify them
+                    verifyRepository(request.name(), ActionListener.delegateFailure(delegatedListener,
+                        (innerDelegatedListener, discoveryNodes) -> innerDelegatedListener.onResponse(clusterStateUpdateResponse)));
+                } else {
+                    delegatedListener.onResponse(clusterStateUpdateResponse);
+                }
+            });
         } else {
         } else {
             registrationListener = listener;
             registrationListener = listener;
         }
         }
@@ -229,27 +237,18 @@ public class RepositoriesService implements ClusterStateApplier {
                     final String verificationToken = repository.startVerification();
                     final String verificationToken = repository.startVerification();
                     if (verificationToken != null) {
                     if (verificationToken != null) {
                         try {
                         try {
-                            verifyAction.verify(repositoryName, verificationToken, new ActionListener<List<DiscoveryNode>>() {
-                                @Override
-                                public void onResponse(List<DiscoveryNode> verifyResponse) {
-                                    threadPool.executor(ThreadPool.Names.SNAPSHOT).execute(() -> {
-                                        try {
-                                            repository.endVerification(verificationToken);
-                                        } catch (Exception e) {
-                                            logger.warn(() -> new ParameterizedMessage(
-                                                "[{}] failed to finish repository verification", repositoryName), e);
-                                            listener.onFailure(e);
-                                            return;
-                                        }
-                                        listener.onResponse(verifyResponse);
-                                    });
-                                }
-
-                                @Override
-                                public void onFailure(Exception e) {
-                                    listener.onFailure(e);
-                                }
-                            });
+                            verifyAction.verify(repositoryName, verificationToken, ActionListener.delegateFailure(listener,
+                                (delegatedListener, verifyResponse) -> threadPool.executor(ThreadPool.Names.SNAPSHOT).execute(() -> {
+                                    try {
+                                        repository.endVerification(verificationToken);
+                                    } catch (Exception e) {
+                                        logger.warn(() -> new ParameterizedMessage(
+                                            "[{}] failed to finish repository verification", repositoryName), e);
+                                        delegatedListener.onFailure(e);
+                                        return;
+                                    }
+                                    delegatedListener.onResponse(verifyResponse);
+                                })));
                         } catch (Exception e) {
                         } catch (Exception e) {
                             threadPool.executor(ThreadPool.Names.SNAPSHOT).execute(() -> {
                             threadPool.executor(ThreadPool.Names.SNAPSHOT).execute(() -> {
                                 try {
                                 try {
@@ -424,41 +423,4 @@ public class RepositoriesService implements ClusterStateApplier {
             throw new IllegalStateException("trying to modify or unregister repository that is currently used ");
             throw new IllegalStateException("trying to modify or unregister repository that is currently used ");
         }
         }
     }
     }
-
-    private class VerifyingRegisterRepositoryListener implements ActionListener<ClusterStateUpdateResponse> {
-
-        private final String name;
-
-        private final ActionListener<ClusterStateUpdateResponse> listener;
-
-        VerifyingRegisterRepositoryListener(String name, final ActionListener<ClusterStateUpdateResponse> listener) {
-            this.name = name;
-            this.listener = listener;
-        }
-
-        @Override
-        public void onResponse(final ClusterStateUpdateResponse clusterStateUpdateResponse) {
-            if (clusterStateUpdateResponse.isAcknowledged()) {
-                // The response was acknowledged - all nodes should know about the new repository, let's verify them
-                verifyRepository(name, new ActionListener<List<DiscoveryNode>>() {
-                    @Override
-                    public void onResponse(List<DiscoveryNode> verifyResponse) {
-                        listener.onResponse(clusterStateUpdateResponse);
-                    }
-
-                    @Override
-                    public void onFailure(Exception e) {
-                        listener.onFailure(e);
-                    }
-                });
-            } else {
-                listener.onResponse(clusterStateUpdateResponse);
-            }
-        }
-
-        @Override
-        public void onFailure(Exception e) {
-            listener.onFailure(e);
-        }
-    }
 }
 }

+ 4 - 12
server/src/test/java/org/elasticsearch/client/transport/TransportClientNodesServiceTests.java

@@ -363,19 +363,11 @@ public class TransportClientNodesServiceTests extends ESTestCase {
                 final List<Transport.Connection> establishedConnections = new CopyOnWriteArrayList<>();
                 final List<Transport.Connection> establishedConnections = new CopyOnWriteArrayList<>();
 
 
                 clientService.addConnectBehavior(remoteService, (transport, discoveryNode, profile, listener) ->
                 clientService.addConnectBehavior(remoteService, (transport, discoveryNode, profile, listener) ->
-                    transport.openConnection(discoveryNode, profile, new ActionListener<Transport.Connection>() {
-                        @Override
-                        public void onResponse(Transport.Connection connection) {
+                    transport.openConnection(discoveryNode, profile,
+                        ActionListener.delegateFailure(listener, (delegatedListener, connection) -> {
                             establishedConnections.add(connection);
                             establishedConnections.add(connection);
-                            listener.onResponse(connection);
-                        }
-
-                        @Override
-                        public void onFailure(Exception e) {
-                            listener.onFailure(e);
-                        }
-                    }));
-
+                            delegatedListener.onResponse(connection);
+                        })));
 
 
                 clientService.start();
                 clientService.start();
                 clientService.acceptIncomingRequests();
                 clientService.acceptIncomingRequests();

+ 26 - 35
server/src/test/java/org/elasticsearch/transport/RemoteClusterConnectionTests.java

@@ -1263,45 +1263,36 @@ public class RemoteClusterConnectionTests extends ESTestCase {
                     // route by seed hostname
                     // route by seed hostname
                     proxyNode = proxyMapping.get(node.getHostName());
                     proxyNode = proxyMapping.get(node.getHostName());
                 }
                 }
-                return t.openConnection(proxyNode, profile, new ActionListener<Transport.Connection>() {
-                    @Override
-                    public void onResponse(Transport.Connection connection) {
-                        Transport.Connection proxyConnection = new Transport.Connection() {
-                            @Override
-                            public DiscoveryNode getNode() {
-                                return node;
-                            }
-
-                            @Override
-                            public void sendRequest(long requestId, String action, TransportRequest request,
-                                                    TransportRequestOptions options) throws IOException, TransportException {
-                                connection.sendRequest(requestId, action, request, options);
-                            }
+            return t.openConnection(proxyNode, profile, ActionListener.delegateFailure(listener,
+                (delegatedListener, connection) -> delegatedListener.onResponse(
+                    new Transport.Connection() {
+                        @Override
+                        public DiscoveryNode getNode() {
+                            return node;
+                        }
 
 
-                            @Override
-                            public void addCloseListener(ActionListener<Void> listener) {
-                                connection.addCloseListener(listener);
-                            }
+                        @Override
+                        public void sendRequest(long requestId, String action, TransportRequest request,
+                                                TransportRequestOptions options) throws IOException {
+                            connection.sendRequest(requestId, action, request, options);
+                        }
 
 
-                            @Override
-                            public boolean isClosed() {
-                                return connection.isClosed();
-                            }
+                        @Override
+                        public void addCloseListener(ActionListener<Void> listener) {
+                            connection.addCloseListener(listener);
+                        }
 
 
-                            @Override
-                            public void close() {
-                                connection.close();
-                            }
-                        };
-                        listener.onResponse(proxyConnection);
-                    }
+                        @Override
+                        public boolean isClosed() {
+                            return connection.isClosed();
+                        }
 
 
-                    @Override
-                    public void onFailure(Exception e) {
-                        listener.onFailure(e);
-                    }
-                });
-            });
+                        @Override
+                        public void close() {
+                            connection.close();
+                        }
+                    })));
+        });
         return stubbableTransport;
         return stubbableTransport;
     }
     }
 }
 }

+ 18 - 35
test/framework/src/main/java/org/elasticsearch/index/replication/ESIndexLevelReplicationTestCase.java

@@ -433,7 +433,7 @@ public abstract class ESIndexLevelReplicationTestCase extends IndexShardTestCase
         }
         }
 
 
         public Future<Void> asyncRecoverReplica(
         public Future<Void> asyncRecoverReplica(
-                final IndexShard replica, final BiFunction<IndexShard, DiscoveryNode, RecoveryTarget> targetSupplier) throws IOException {
+                final IndexShard replica, final BiFunction<IndexShard, DiscoveryNode, RecoveryTarget> targetSupplier) {
             final FutureTask<Void> task = new FutureTask<>(() -> {
             final FutureTask<Void> task = new FutureTask<>(() -> {
                 recoverReplica(replica, targetSupplier);
                 recoverReplica(replica, targetSupplier);
                 return null;
                 return null;
@@ -609,17 +609,8 @@ public abstract class ESIndexLevelReplicationTestCase extends IndexShardTestCase
         public void execute() {
         public void execute() {
             try {
             try {
                 new ReplicationOperation<>(request, new PrimaryRef(),
                 new ReplicationOperation<>(request, new PrimaryRef(),
-                    new ActionListener<PrimaryResult>() {
-                        @Override
-                        public void onResponse(PrimaryResult result) {
-                            result.respond(listener);
-                        }
-
-                        @Override
-                        public void onFailure(Exception e) {
-                            listener.onFailure(e);
-                        }
-                    }, new ReplicasRef(), logger, opType).execute();
+                    ActionListener.delegateFailure(listener,
+                        (delegatedListener, result) -> result.respond(delegatedListener)), new ReplicasRef(), logger, opType).execute();
             } catch (Exception e) {
             } catch (Exception e) {
                 listener.onFailure(e);
                 listener.onFailure(e);
             }
             }
@@ -693,28 +684,20 @@ public abstract class ESIndexLevelReplicationTestCase extends IndexShardTestCase
                 final ActionListener<ReplicationOperation.ReplicaResponse> listener) {
                 final ActionListener<ReplicationOperation.ReplicaResponse> listener) {
                 IndexShard replica = replicationTargets.findReplicaShard(replicaRouting);
                 IndexShard replica = replicationTargets.findReplicaShard(replicaRouting);
                 replica.acquireReplicaOperationPermit(
                 replica.acquireReplicaOperationPermit(
-                        getPrimaryShard().getPendingPrimaryTerm(),
-                        globalCheckpoint,
-                        maxSeqNoOfUpdatesOrDeletes,
-                        new ActionListener<Releasable>() {
-                            @Override
-                            public void onResponse(Releasable releasable) {
-                                try {
-                                    performOnReplica(request, replica);
-                                    releasable.close();
-                                    listener.onResponse(new ReplicaResponse(replica.getLocalCheckpoint(), replica.getGlobalCheckpoint()));
-                                } catch (final Exception e) {
-                                    Releasables.closeWhileHandlingException(releasable);
-                                    listener.onFailure(e);
-                                }
-                            }
-
-                            @Override
-                            public void onFailure(Exception e) {
-                                listener.onFailure(e);
-                            }
-                        },
-                        ThreadPool.Names.WRITE, request);
+                    getPrimaryShard().getPendingPrimaryTerm(),
+                    globalCheckpoint,
+                    maxSeqNoOfUpdatesOrDeletes,
+                    ActionListener.delegateFailure(listener, (delegatedListener, releasable) -> {
+                        try {
+                            performOnReplica(request, replica);
+                            releasable.close();
+                            delegatedListener.onResponse(new ReplicaResponse(replica.getLocalCheckpoint(), replica.getGlobalCheckpoint()));
+                        } catch (final Exception e) {
+                            Releasables.closeWhileHandlingException(releasable);
+                            delegatedListener.onFailure(e);
+                        }
+                    }),
+                    ThreadPool.Names.WRITE, request);
             }
             }
 
 
             @Override
             @Override
@@ -893,7 +876,7 @@ public abstract class ESIndexLevelReplicationTestCase extends IndexShardTestCase
     }
     }
 
 
     private TransportWriteAction.WritePrimaryResult<ResyncReplicationRequest, ResyncReplicationResponse> executeResyncOnPrimary(
     private TransportWriteAction.WritePrimaryResult<ResyncReplicationRequest, ResyncReplicationResponse> executeResyncOnPrimary(
-        IndexShard primary, ResyncReplicationRequest request) throws Exception {
+        IndexShard primary, ResyncReplicationRequest request) {
         final TransportWriteAction.WritePrimaryResult<ResyncReplicationRequest, ResyncReplicationResponse> result =
         final TransportWriteAction.WritePrimaryResult<ResyncReplicationRequest, ResyncReplicationResponse> result =
             new TransportWriteAction.WritePrimaryResult<>(TransportResyncReplicationAction.performOnPrimary(request),
             new TransportWriteAction.WritePrimaryResult<>(TransportResyncReplicationAction.performOnPrimary(request),
                 new ResyncReplicationResponse(), null, null, primary, logger);
                 new ResyncReplicationResponse(), null, null, primary, logger);

+ 3 - 12
test/framework/src/main/java/org/elasticsearch/test/transport/StubbableTransport.java

@@ -129,18 +129,9 @@ public final class StubbableTransport implements Transport {
         TransportAddress address = node.getAddress();
         TransportAddress address = node.getAddress();
         OpenConnectionBehavior behavior = connectBehaviors.getOrDefault(address, defaultConnectBehavior);
         OpenConnectionBehavior behavior = connectBehaviors.getOrDefault(address, defaultConnectBehavior);
 
 
-        ActionListener<Connection> wrappedListener = new ActionListener<Connection>() {
-
-            @Override
-            public void onResponse(Connection connection) {
-                listener.onResponse(new WrappedConnection(connection));
-            }
-
-            @Override
-            public void onFailure(Exception e) {
-                listener.onFailure(e);
-            }
-        };
+        ActionListener<Connection> wrappedListener =
+            ActionListener.delegateFailure(listener,
+                (delegatedListener, connection) -> delegatedListener.onResponse(new WrappedConnection(connection)));
 
 
         if (behavior == null) {
         if (behavior == null) {
             return delegate.openConnection(node, profile, wrappedListener);
             return delegate.openConnection(node, profile, wrappedListener);

+ 10 - 28
x-pack/plugin/ccr/src/main/java/org/elasticsearch/xpack/ccr/action/TransportPutFollowAction.java

@@ -12,7 +12,6 @@ import org.apache.logging.log4j.message.ParameterizedMessage;
 import org.elasticsearch.action.ActionListener;
 import org.elasticsearch.action.ActionListener;
 import org.elasticsearch.action.admin.cluster.snapshots.restore.RestoreClusterStateListener;
 import org.elasticsearch.action.admin.cluster.snapshots.restore.RestoreClusterStateListener;
 import org.elasticsearch.action.admin.cluster.snapshots.restore.RestoreSnapshotRequest;
 import org.elasticsearch.action.admin.cluster.snapshots.restore.RestoreSnapshotRequest;
-import org.elasticsearch.action.admin.cluster.snapshots.restore.RestoreSnapshotResponse;
 import org.elasticsearch.action.support.ActionFilters;
 import org.elasticsearch.action.support.ActionFilters;
 import org.elasticsearch.action.support.ActiveShardCount;
 import org.elasticsearch.action.support.ActiveShardCount;
 import org.elasticsearch.action.support.ActiveShardsObserver;
 import org.elasticsearch.action.support.ActiveShardsObserver;
@@ -147,19 +146,10 @@ public final class TransportPutFollowAction
             }
             }
 
 
             @Override
             @Override
-            protected void doRun() throws Exception {
-                restoreService.restoreSnapshot(restoreRequest, new ActionListener<RestoreService.RestoreCompletionResponse>() {
-
-                    @Override
-                    public void onResponse(RestoreService.RestoreCompletionResponse response) {
-                        afterRestoreStarted(clientWithHeaders, request, listener, response);
-                    }
-
-                    @Override
-                    public void onFailure(Exception e) {
-                        listener.onFailure(e);
-                    }
-                });
+            protected void doRun() {
+                restoreService.restoreSnapshot(restoreRequest,
+                    ActionListener.delegateFailure(listener,
+                        (delegatedListener, response) -> afterRestoreStarted(clientWithHeaders, request, delegatedListener, response)));
             }
             }
         });
         });
     }
     }
@@ -186,28 +176,20 @@ public final class TransportPutFollowAction
             listener = originalListener;
             listener = originalListener;
         }
         }
 
 
-        RestoreClusterStateListener.createAndRegisterListener(clusterService, response, new ActionListener<RestoreSnapshotResponse>() {
-            @Override
-            public void onResponse(RestoreSnapshotResponse restoreSnapshotResponse) {
+        RestoreClusterStateListener.createAndRegisterListener(clusterService, response,
+            ActionListener.delegateFailure(listener, (delegatedListener, restoreSnapshotResponse) -> {
                 RestoreInfo restoreInfo = restoreSnapshotResponse.getRestoreInfo();
                 RestoreInfo restoreInfo = restoreSnapshotResponse.getRestoreInfo();
-
                 if (restoreInfo == null) {
                 if (restoreInfo == null) {
                     // If restoreInfo is null then it is possible there was a master failure during the
                     // If restoreInfo is null then it is possible there was a master failure during the
                     // restore.
                     // restore.
-                    listener.onResponse(new PutFollowAction.Response(true, false, false));
+                    delegatedListener.onResponse(new PutFollowAction.Response(true, false, false));
                 } else if (restoreInfo.failedShards() == 0) {
                 } else if (restoreInfo.failedShards() == 0) {
-                    initiateFollowing(clientWithHeaders, request, listener);
+                    initiateFollowing(clientWithHeaders, request, delegatedListener);
                 } else {
                 } else {
                     assert restoreInfo.failedShards() > 0 : "Should have failed shards";
                     assert restoreInfo.failedShards() > 0 : "Should have failed shards";
-                    listener.onResponse(new PutFollowAction.Response(true, false, false));
+                    delegatedListener.onResponse(new PutFollowAction.Response(true, false, false));
                 }
                 }
-            }
-
-            @Override
-            public void onFailure(Exception e) {
-                listener.onFailure(e);
-            }
-        });
+            }));
     }
     }
 
 
     private void initiateFollowing(
     private void initiateFollowing(