Browse Source

Cleanup Various Action- Listener and Runnable Usages (#42273)

* Dry up code for creating simple `ActionRunnable` a little
* Shorten some other code around `ActionListener` usage, in particular
when wrapping it in a `TransportResponseListener`
Armin Braun 6 years ago
parent
commit
192845be8d
18 changed files with 153 additions and 308 deletions
  1. 18 0
      server/src/main/java/org/elasticsearch/action/ActionRunnable.java
  2. 2 24
      server/src/main/java/org/elasticsearch/action/admin/cluster/node/tasks/get/TransportGetTaskAction.java
  3. 4 10
      server/src/main/java/org/elasticsearch/action/bulk/TransportBulkAction.java
  4. 3 6
      server/src/main/java/org/elasticsearch/action/ingest/SimulateExecutionService.java
  5. 3 7
      server/src/main/java/org/elasticsearch/action/support/broadcast/TransportBroadcastAction.java
  6. 2 6
      server/src/main/java/org/elasticsearch/action/support/master/TransportMasterNodeAction.java
  7. 2 6
      server/src/main/java/org/elasticsearch/action/support/single/shard/TransportSingleShardAction.java
  8. 1 6
      server/src/main/java/org/elasticsearch/action/update/TransportUpdateAction.java
  9. 2 12
      server/src/main/java/org/elasticsearch/cluster/coordination/JoinHelper.java
  10. 3 9
      server/src/main/java/org/elasticsearch/cluster/metadata/MetaDataIndexStateService.java
  11. 7 10
      server/src/main/java/org/elasticsearch/common/util/concurrent/ListenableFuture.java
  12. 41 60
      server/src/main/java/org/elasticsearch/discovery/HandshakingTransportAddressConnector.java
  13. 3 23
      server/src/main/java/org/elasticsearch/gateway/LocalAllocateDangledIndices.java
  14. 19 56
      server/src/main/java/org/elasticsearch/index/shard/IndexShardOperationPermits.java
  15. 30 34
      server/src/main/java/org/elasticsearch/repositories/RepositoriesService.java
  16. 4 15
      server/src/main/java/org/elasticsearch/search/SearchService.java
  17. 8 11
      server/src/main/java/org/elasticsearch/snapshots/SnapshotsService.java
  18. 1 13
      server/src/main/java/org/elasticsearch/tasks/TaskResultsService.java

+ 18 - 0
server/src/main/java/org/elasticsearch/action/ActionRunnable.java

@@ -19,6 +19,7 @@
 
 package org.elasticsearch.action;
 
+import org.elasticsearch.common.CheckedConsumer;
 import org.elasticsearch.common.util.concurrent.AbstractRunnable;
 
 /**
@@ -29,6 +30,23 @@ public abstract class ActionRunnable<Response> extends AbstractRunnable {
 
     protected final ActionListener<Response> listener;
 
+    /**
+     * Creates a {@link Runnable} that wraps the given listener and a consumer of it that is executed when the {@link Runnable} is run.
+     * Invokes {@link ActionListener#onFailure(Exception)} on it if an exception is thrown on executing the consumer.
+     * @param listener ActionListener to wrap
+     * @param consumer Consumer of wrapped {@code ActionListener}
+     * @param <T> Type of the given {@code ActionListener}
+     * @return Wrapped {@code Runnable}
+     */
+    public static <T> ActionRunnable<T> wrap(ActionListener<T> listener, CheckedConsumer<ActionListener<T>, Exception> consumer) {
+        return new ActionRunnable<>(listener) {
+            @Override
+            protected void doRun() throws Exception {
+                consumer.accept(listener);
+            }
+        };
+    }
+
     public ActionRunnable(ActionListener<Response> listener) {
         this.listener = listener;
     }

+ 2 - 24
server/src/main/java/org/elasticsearch/action/admin/cluster/node/tasks/get/TransportGetTaskAction.java

@@ -23,6 +23,7 @@ import org.elasticsearch.ElasticsearchException;
 import org.elasticsearch.ExceptionsHelper;
 import org.elasticsearch.ResourceNotFoundException;
 import org.elasticsearch.action.ActionListener;
+import org.elasticsearch.action.ActionListenerResponseHandler;
 import org.elasticsearch.action.get.GetRequest;
 import org.elasticsearch.action.get.GetResponse;
 import org.elasticsearch.action.support.ActionFilters;
@@ -32,7 +33,6 @@ import org.elasticsearch.client.OriginSettingClient;
 import org.elasticsearch.cluster.node.DiscoveryNode;
 import org.elasticsearch.cluster.service.ClusterService;
 import org.elasticsearch.common.inject.Inject;
-import org.elasticsearch.common.io.stream.StreamInput;
 import org.elasticsearch.common.util.concurrent.AbstractRunnable;
 import org.elasticsearch.common.xcontent.LoggingDeprecationHandler;
 import org.elasticsearch.common.xcontent.NamedXContentRegistry;
@@ -45,9 +45,7 @@ import org.elasticsearch.tasks.TaskInfo;
 import org.elasticsearch.tasks.TaskResult;
 import org.elasticsearch.tasks.TaskResultsService;
 import org.elasticsearch.threadpool.ThreadPool;
-import org.elasticsearch.transport.TransportException;
 import org.elasticsearch.transport.TransportRequestOptions;
-import org.elasticsearch.transport.TransportResponseHandler;
 import org.elasticsearch.transport.TransportService;
 
 import java.io.IOException;
@@ -118,27 +116,7 @@ public class TransportGetTaskAction extends HandledTransportAction<GetTaskReques
         }
         GetTaskRequest nodeRequest = request.nodeRequest(clusterService.localNode().getId(), thisTask.getId());
         transportService.sendRequest(node, GetTaskAction.NAME, nodeRequest, builder.build(),
-                new TransportResponseHandler<GetTaskResponse>() {
-                    @Override
-                    public GetTaskResponse read(StreamInput in) throws IOException {
-                        return new GetTaskResponse(in);
-                    }
-
-                    @Override
-                    public void handleResponse(GetTaskResponse response) {
-                        listener.onResponse(response);
-                    }
-
-                    @Override
-                    public void handleException(TransportException exp) {
-                        listener.onFailure(exp);
-                    }
-
-                    @Override
-                    public String executor() {
-                        return ThreadPool.Names.SAME;
-                    }
-                });
+            new ActionListenerResponseHandler<>(listener, GetTaskResponse::new, ThreadPool.Names.SAME));
     }
 
     /**

+ 4 - 10
server/src/main/java/org/elasticsearch/action/bulk/TransportBulkAction.java

@@ -26,6 +26,7 @@ import org.elasticsearch.ExceptionsHelper;
 import org.elasticsearch.ResourceAlreadyExistsException;
 import org.elasticsearch.Version;
 import org.elasticsearch.action.ActionListener;
+import org.elasticsearch.action.ActionRunnable;
 import org.elasticsearch.action.DocWriteRequest;
 import org.elasticsearch.action.DocWriteResponse;
 import org.elasticsearch.action.RoutingMissingException;
@@ -56,7 +57,6 @@ import org.elasticsearch.common.collect.ImmutableOpenMap;
 import org.elasticsearch.common.inject.Inject;
 import org.elasticsearch.common.settings.Settings;
 import org.elasticsearch.common.unit.TimeValue;
-import org.elasticsearch.common.util.concurrent.AbstractRunnable;
 import org.elasticsearch.common.util.concurrent.AtomicArray;
 import org.elasticsearch.index.Index;
 import org.elasticsearch.index.IndexNotFoundException;
@@ -327,10 +327,9 @@ public class TransportBulkAction extends HandledTransportAction<BulkRequest, Bul
      * retries on retryable cluster blocks, resolves item requests,
      * constructs shard bulk requests and delegates execution to shard bulk action
      * */
-    private final class BulkOperation extends AbstractRunnable {
+    private final class BulkOperation extends ActionRunnable<BulkResponse> {
         private final Task task;
         private final BulkRequest bulkRequest;
-        private final ActionListener<BulkResponse> listener;
         private final AtomicArray<BulkItemResponse> responses;
         private final long startTimeNanos;
         private final ClusterStateObserver observer;
@@ -338,9 +337,9 @@ public class TransportBulkAction extends HandledTransportAction<BulkRequest, Bul
 
         BulkOperation(Task task, BulkRequest bulkRequest, ActionListener<BulkResponse> listener, AtomicArray<BulkItemResponse> responses,
                 long startTimeNanos, Map<String, IndexNotFoundException> indicesThatCannotBeCreated) {
+            super(listener);
             this.task = task;
             this.bulkRequest = bulkRequest;
-            this.listener = listener;
             this.responses = responses;
             this.startTimeNanos = startTimeNanos;
             this.indicesThatCannotBeCreated = indicesThatCannotBeCreated;
@@ -348,12 +347,7 @@ public class TransportBulkAction extends HandledTransportAction<BulkRequest, Bul
         }
 
         @Override
-        public void onFailure(Exception e) {
-            listener.onFailure(e);
-        }
-
-        @Override
-        protected void doRun() throws Exception {
+        protected void doRun() {
             final ClusterState clusterState = observer.setAndGetObservedState();
             if (handleBlockExceptions(clusterState)) {
                 return;

+ 3 - 6
server/src/main/java/org/elasticsearch/action/ingest/SimulateExecutionService.java

@@ -64,9 +64,7 @@ class SimulateExecutionService {
     }
 
     public void execute(SimulatePipelineRequest.Parsed request, ActionListener<SimulatePipelineResponse> listener) {
-        threadPool.executor(THREAD_POOL_NAME).execute(new ActionRunnable<SimulatePipelineResponse>(listener) {
-            @Override
-            protected void doRun() throws Exception {
+        threadPool.executor(THREAD_POOL_NAME).execute(ActionRunnable.wrap(listener, l -> {
                 List<SimulateDocumentResult> responses = new ArrayList<>();
                 for (IngestDocument ingestDocument : request.getDocuments()) {
                     SimulateDocumentResult response = executeDocument(request.getPipeline(), ingestDocument, request.isVerbose());
@@ -74,8 +72,7 @@ class SimulateExecutionService {
                         responses.add(response);
                     }
                 }
-                listener.onResponse(new SimulatePipelineResponse(request.getPipeline().getId(), request.isVerbose(), responses));
-            }
-        });
+                l.onResponse(new SimulatePipelineResponse(request.getPipeline().getId(), request.isVerbose(), responses));
+        }));
     }
 }

+ 3 - 7
server/src/main/java/org/elasticsearch/action/support/broadcast/TransportBroadcastAction.java

@@ -298,12 +298,8 @@ public abstract class TransportBroadcastAction<
         }
     }
 
-    protected void asyncShardOperation(ShardRequest request, Task task, ActionListener<ShardResponse> listener) {
-        transportService.getThreadPool().executor(shardExecutor).execute(new ActionRunnable<ShardResponse>(listener) {
-            @Override
-            protected void doRun() throws Exception {
-                listener.onResponse(shardOperation(request, task));
-            }
-        });
+    private void asyncShardOperation(ShardRequest request, Task task, ActionListener<ShardResponse> listener) {
+        transportService.getThreadPool().executor(shardExecutor)
+            .execute(ActionRunnable.wrap(listener, l -> l.onResponse(shardOperation(request, task))));
     }
 }

+ 2 - 6
server/src/main/java/org/elasticsearch/action/support/master/TransportMasterNodeAction.java

@@ -155,12 +155,8 @@ public abstract class TransportMasterNodeAction<Request extends MasterNodeReques
                                 delegatedListener.onFailure(t);
                             }
                         });
-                        threadPool.executor(executor).execute(new ActionRunnable<Response>(delegate) {
-                            @Override
-                            protected void doRun() throws Exception {
-                                masterOperation(task, request, clusterState, delegate);
-                            }
-                        });
+                        threadPool.executor(executor)
+                            .execute(ActionRunnable.wrap(delegate, l -> masterOperation(task, request, clusterState, l)));
                     }
                 } else {
                     if (nodes.getMasterNode() == null) {

+ 2 - 6
server/src/main/java/org/elasticsearch/action/support/single/shard/TransportSingleShardAction.java

@@ -106,12 +106,8 @@ public abstract class TransportSingleShardAction<Request extends SingleShardRequ
     protected abstract Response shardOperation(Request request, ShardId shardId) throws IOException;
 
     protected void asyncShardOperation(Request request, ShardId shardId, ActionListener<Response> listener) throws IOException {
-        threadPool.executor(getExecutor(request, shardId)).execute(new ActionRunnable<>(listener) {
-            @Override
-            protected void doRun() throws Exception {
-                listener.onResponse(shardOperation(request, shardId));
-            }
-        });
+        threadPool.executor(getExecutor(request, shardId))
+            .execute(ActionRunnable.wrap(listener, l -> l.onResponse((shardOperation(request, shardId)))));
     }
 
     protected abstract Writeable.Reader<Response> getResponseReader();

+ 1 - 6
server/src/main/java/org/elasticsearch/action/update/TransportUpdateAction.java

@@ -254,12 +254,7 @@ public class TransportUpdateAction extends TransportInstanceSingleOperationActio
             if (retryCount < request.retryOnConflict()) {
                 logger.trace("Retry attempt [{}] of [{}] on version conflict on [{}][{}][{}]",
                         retryCount + 1, request.retryOnConflict(), request.index(), request.getShardId(), request.id());
-                threadPool.executor(executor()).execute(new ActionRunnable<UpdateResponse>(listener) {
-                    @Override
-                    protected void doRun() {
-                        shardOperation(request, listener, retryCount + 1);
-                    }
-                });
+                threadPool.executor(executor()).execute(ActionRunnable.wrap(listener, l -> shardOperation(request, l, retryCount + 1)));
                 return;
             }
         }

+ 2 - 12
server/src/main/java/org/elasticsearch/cluster/coordination/JoinHelper.java

@@ -23,6 +23,7 @@ import org.apache.logging.log4j.LogManager;
 import org.apache.logging.log4j.Logger;
 import org.apache.logging.log4j.message.ParameterizedMessage;
 import org.elasticsearch.action.ActionListener;
+import org.elasticsearch.action.ActionListenerResponseHandler;
 import org.elasticsearch.cluster.ClusterState;
 import org.elasticsearch.cluster.ClusterStateTaskConfig;
 import org.elasticsearch.cluster.ClusterStateTaskListener;
@@ -41,7 +42,6 @@ import org.elasticsearch.common.settings.Settings;
 import org.elasticsearch.common.unit.TimeValue;
 import org.elasticsearch.threadpool.ThreadPool;
 import org.elasticsearch.threadpool.ThreadPool.Names;
-import org.elasticsearch.transport.EmptyTransportResponseHandler;
 import org.elasticsearch.transport.TransportChannel;
 import org.elasticsearch.transport.TransportException;
 import org.elasticsearch.transport.TransportRequest;
@@ -296,17 +296,7 @@ public class JoinHelper {
         transportService.sendRequest(node, VALIDATE_JOIN_ACTION_NAME,
             new ValidateJoinRequest(state),
             TransportRequestOptions.builder().withTimeout(joinTimeout).build(),
-            new EmptyTransportResponseHandler(ThreadPool.Names.GENERIC) {
-                @Override
-                public void handleResponse(TransportResponse.Empty response) {
-                    listener.onResponse(response);
-                }
-
-                @Override
-                public void handleException(TransportException exp) {
-                    listener.onFailure(exp);
-                }
-            });
+            new ActionListenerResponseHandler<>(listener, i -> Empty.INSTANCE, ThreadPool.Names.GENERIC));
     }
 
     public interface JoinCallback {

+ 3 - 9
server/src/main/java/org/elasticsearch/cluster/metadata/MetaDataIndexStateService.java

@@ -26,6 +26,7 @@ import org.apache.logging.log4j.message.ParameterizedMessage;
 import org.elasticsearch.ElasticsearchException;
 import org.elasticsearch.Version;
 import org.elasticsearch.action.ActionListener;
+import org.elasticsearch.action.ActionRunnable;
 import org.elasticsearch.action.NotifyOnceListener;
 import org.elasticsearch.action.admin.indices.close.CloseIndexClusterStateUpdateRequest;
 import org.elasticsearch.action.admin.indices.close.CloseIndexResponse;
@@ -59,7 +60,6 @@ import org.elasticsearch.common.inject.Inject;
 import org.elasticsearch.common.settings.Setting;
 import org.elasticsearch.common.settings.Settings;
 import org.elasticsearch.common.unit.TimeValue;
-import org.elasticsearch.common.util.concurrent.AbstractRunnable;
 import org.elasticsearch.common.util.concurrent.AtomicArray;
 import org.elasticsearch.common.util.concurrent.ConcurrentCollections;
 import org.elasticsearch.common.util.concurrent.CountDown;
@@ -296,26 +296,20 @@ public class MetaDataIndexStateService {
      * this action succeed then the shard is considered to be ready for closing. When all shards of a given index are ready for closing,
      * the index is considered ready to be closed.
      */
-    class WaitForClosedBlocksApplied extends AbstractRunnable {
+    class WaitForClosedBlocksApplied extends ActionRunnable<Map<Index, IndexResult>> {
 
         private final Map<Index, ClusterBlock> blockedIndices;
         private final CloseIndexClusterStateUpdateRequest request;
-        private final ActionListener<Map<Index, IndexResult>> listener;
 
         private WaitForClosedBlocksApplied(final Map<Index, ClusterBlock> blockedIndices,
                                            final CloseIndexClusterStateUpdateRequest request,
                                            final ActionListener<Map<Index, IndexResult>> listener) {
+                super(listener);
             if (blockedIndices == null || blockedIndices.isEmpty()) {
                 throw new IllegalArgumentException("Cannot wait for closed blocks to be applied, list of blocked indices is empty or null");
             }
             this.blockedIndices = blockedIndices;
             this.request = request;
-            this.listener = listener;
-        }
-
-        @Override
-        public void onFailure(final Exception e) {
-            listener.onFailure(e);
         }
 
         @Override

+ 7 - 10
server/src/main/java/org/elasticsearch/common/util/concurrent/ListenableFuture.java

@@ -20,6 +20,7 @@
 package org.elasticsearch.common.util.concurrent;
 
 import org.elasticsearch.action.ActionListener;
+import org.elasticsearch.action.ActionRunnable;
 import org.elasticsearch.action.support.ContextPreservingActionListener;
 import org.elasticsearch.common.collect.Tuple;
 
@@ -89,17 +90,13 @@ public final class ListenableFuture<V> extends BaseFuture<V> implements ActionLi
 
     private void notifyListener(ActionListener<V> listener, ExecutorService executorService) {
         try {
-            executorService.execute(new Runnable() {
+            executorService.execute(new ActionRunnable<>(listener) {
                 @Override
-                public void run() {
-                    try {
-                        // call get in a non-blocking fashion as we could be on a network thread
-                        // or another thread like the scheduler, which we should never block!
-                        V value = FutureUtils.get(ListenableFuture.this, 0L, TimeUnit.NANOSECONDS);
-                        listener.onResponse(value);
-                    } catch (Exception e) {
-                        listener.onFailure(e);
-                    }
+                protected void doRun() {
+                    // call get in a non-blocking fashion as we could be on a network thread
+                    // or another thread like the scheduler, which we should never block!
+                    V value = FutureUtils.get(ListenableFuture.this, 0L, TimeUnit.NANOSECONDS);
+                    listener.onResponse(value);
                 }
 
                 @Override

+ 41 - 60
server/src/main/java/org/elasticsearch/discovery/HandshakingTransportAddressConnector.java

@@ -24,6 +24,7 @@ import org.apache.logging.log4j.Logger;
 import org.apache.logging.log4j.message.ParameterizedMessage;
 import org.elasticsearch.Version;
 import org.elasticsearch.action.ActionListener;
+import org.elasticsearch.action.ActionRunnable;
 import org.elasticsearch.action.NotifyOnceListener;
 import org.elasticsearch.cluster.node.DiscoveryNode;
 import org.elasticsearch.common.Randomness;
@@ -37,7 +38,6 @@ import org.elasticsearch.core.internal.io.IOUtils;
 import org.elasticsearch.discovery.PeerFinder.TransportAddressConnector;
 import org.elasticsearch.transport.ConnectTransportException;
 import org.elasticsearch.transport.ConnectionProfile;
-import org.elasticsearch.transport.Transport.Connection;
 import org.elasticsearch.transport.TransportRequestOptions.Type;
 import org.elasticsearch.transport.TransportService;
 
@@ -69,7 +69,7 @@ public class HandshakingTransportAddressConnector implements TransportAddressCon
 
     @Override
     public void connectToRemoteMasterNode(TransportAddress transportAddress, ActionListener<DiscoveryNode> listener) {
-        transportService.getThreadPool().generic().execute(new AbstractRunnable() {
+        transportService.getThreadPool().generic().execute(new ActionRunnable<>(listener) {
             private final AbstractRunnable thisConnectionAttempt = this;
 
             @Override
@@ -84,71 +84,52 @@ public class HandshakingTransportAddressConnector implements TransportAddressCon
                 logger.trace("[{}] opening probe connection", thisConnectionAttempt);
                 transportService.openConnection(targetNode,
                     ConnectionProfile.buildSingleChannelProfile(Type.REG, probeConnectTimeout, probeHandshakeTimeout,
-                        TimeValue.MINUS_ONE, null), new ActionListener<>() {
-                        @Override
-                        public void onResponse(Connection connection) {
-                            logger.trace("[{}] opened probe connection", thisConnectionAttempt);
-
-                            // use NotifyOnceListener to make sure the following line does not result in onFailure being called when
-                            // the connection is closed in the onResponse handler
-                            transportService.handshake(connection, probeHandshakeTimeout.millis(), new NotifyOnceListener<DiscoveryNode>() {
-
-                                @Override
-                                protected void innerOnResponse(DiscoveryNode remoteNode) {
-                                    try {
-                                        // success means (amongst other things) that the cluster names match
-                                        logger.trace("[{}] handshake successful: {}", thisConnectionAttempt, remoteNode);
-                                        IOUtils.closeWhileHandlingException(connection);
-
-                                        if (remoteNode.equals(transportService.getLocalNode())) {
-                                            // TODO cache this result for some time? forever?
-                                            listener.onFailure(new ConnectTransportException(remoteNode, "local node found"));
-                                        } else if (remoteNode.isMasterNode() == false) {
-                                            // TODO cache this result for some time?
-                                            listener.onFailure(new ConnectTransportException(remoteNode, "non-master-eligible node found"));
-                                        } else {
-                                            transportService.connectToNode(remoteNode, new ActionListener<Void>() {
-                                                @Override
-                                                public void onResponse(Void ignored) {
-                                                    logger.trace("[{}] full connection successful: {}", thisConnectionAttempt, remoteNode);
-                                                    listener.onResponse(remoteNode);
-                                                }
-
-                                                @Override
-                                                public void onFailure(Exception e) {
-                                                    listener.onFailure(e);
-                                                }
-                                            });
-                                        }
-                                    } catch (Exception e) {
-                                        listener.onFailure(e);
-                                    }
-                                }
-
-                                @Override
-                                protected void innerOnFailure(Exception e) {
-                                    // we opened a connection and successfully performed a low-level handshake, so we were definitely
-                                    // talking to an Elasticsearch node, but the high-level handshake failed indicating some kind of
-                                    // mismatched configurations (e.g. cluster name) that the user should address
-                                    logger.warn(new ParameterizedMessage("handshake failed for [{}]", thisConnectionAttempt), e);
+                        TimeValue.MINUS_ONE, null), ActionListener.delegateFailure(listener, (l, connection) -> {
+                        logger.trace("[{}] opened probe connection", thisConnectionAttempt);
+
+                        // use NotifyOnceListener to make sure the following line does not result in onFailure being called when
+                        // the connection is closed in the onResponse handler
+                        transportService.handshake(connection, probeHandshakeTimeout.millis(), new NotifyOnceListener<>() {
+
+                            @Override
+                            protected void innerOnResponse(DiscoveryNode remoteNode) {
+                                try {
+                                    // success means (amongst other things) that the cluster names match
+                                    logger.trace("[{}] handshake successful: {}", thisConnectionAttempt, remoteNode);
                                     IOUtils.closeWhileHandlingException(connection);
+
+                                    if (remoteNode.equals(transportService.getLocalNode())) {
+                                        // TODO cache this result for some time? forever?
+                                        listener.onFailure(new ConnectTransportException(remoteNode, "local node found"));
+                                    } else if (remoteNode.isMasterNode() == false) {
+                                        // TODO cache this result for some time?
+                                        listener.onFailure(new ConnectTransportException(remoteNode, "non-master-eligible node found"));
+                                    } else {
+                                        transportService.connectToNode(remoteNode, ActionListener.delegateFailure(listener,
+                                            (l, ignored) -> {
+                                                logger.trace("[{}] full connection successful: {}", thisConnectionAttempt, remoteNode);
+                                                listener.onResponse(remoteNode);
+                                            }));
+                                    }
+                                } catch (Exception e) {
                                     listener.onFailure(e);
                                 }
+                            }
 
-                            });
+                            @Override
+                            protected void innerOnFailure(Exception e) {
+                                // we opened a connection and successfully performed a low-level handshake, so we were definitely
+                                // talking to an Elasticsearch node, but the high-level handshake failed indicating some kind of
+                                // mismatched configurations (e.g. cluster name) that the user should address
+                                logger.warn(new ParameterizedMessage("handshake failed for [{}]", thisConnectionAttempt), e);
+                                IOUtils.closeWhileHandlingException(connection);
+                                listener.onFailure(e);
+                            }
 
-                        }
+                        });
 
-                        @Override
-                        public void onFailure(Exception e) {
-                            listener.onFailure(e);
-                        }
-                    });
-            }
+                    }));
 
-            @Override
-            public void onFailure(Exception e) {
-                listener.onFailure(e);
             }
 
             @Override

+ 3 - 23
server/src/main/java/org/elasticsearch/gateway/LocalAllocateDangledIndices.java

@@ -24,6 +24,7 @@ import org.apache.logging.log4j.Logger;
 import org.apache.logging.log4j.message.ParameterizedMessage;
 import org.elasticsearch.Version;
 import org.elasticsearch.action.ActionListener;
+import org.elasticsearch.action.ActionListenerResponseHandler;
 import org.elasticsearch.cluster.ClusterState;
 import org.elasticsearch.cluster.ClusterStateUpdateTask;
 import org.elasticsearch.cluster.block.ClusterBlocks;
@@ -41,11 +42,9 @@ import org.elasticsearch.discovery.MasterNotDiscoveredException;
 import org.elasticsearch.tasks.Task;
 import org.elasticsearch.threadpool.ThreadPool;
 import org.elasticsearch.transport.TransportChannel;
-import org.elasticsearch.transport.TransportException;
 import org.elasticsearch.transport.TransportRequest;
 import org.elasticsearch.transport.TransportRequestHandler;
 import org.elasticsearch.transport.TransportResponse;
-import org.elasticsearch.transport.TransportResponseHandler;
 import org.elasticsearch.transport.TransportService;
 
 import java.io.IOException;
@@ -86,27 +85,8 @@ public class LocalAllocateDangledIndices {
         }
         AllocateDangledRequest request = new AllocateDangledRequest(clusterService.localNode(),
             indices.toArray(new IndexMetaData[indices.size()]));
-        transportService.sendRequest(masterNode, ACTION_NAME, request, new TransportResponseHandler<AllocateDangledResponse>() {
-            @Override
-            public AllocateDangledResponse read(StreamInput in) throws IOException {
-                return new AllocateDangledResponse(in);
-            }
-
-            @Override
-            public void handleResponse(AllocateDangledResponse response) {
-                listener.onResponse(response);
-            }
-
-            @Override
-            public void handleException(TransportException exp) {
-                listener.onFailure(exp);
-            }
-
-            @Override
-            public String executor() {
-                return ThreadPool.Names.SAME;
-            }
-        });
+        transportService.sendRequest(masterNode, ACTION_NAME, request,
+            new ActionListenerResponseHandler<>(listener, AllocateDangledResponse::new, ThreadPool.Names.SAME));
     }
 
     class AllocateDangledRequestHandler implements TransportRequestHandler<AllocateDangledRequest> {

+ 19 - 56
server/src/main/java/org/elasticsearch/index/shard/IndexShardOperationPermits.java

@@ -22,6 +22,7 @@ package org.elasticsearch.index.shard;
 import org.elasticsearch.Assertions;
 import org.elasticsearch.ExceptionsHelper;
 import org.elasticsearch.action.ActionListener;
+import org.elasticsearch.action.ActionRunnable;
 import org.elasticsearch.action.support.ContextPreservingActionListener;
 import org.elasticsearch.common.CheckedRunnable;
 import org.elasticsearch.common.collect.Tuple;
@@ -249,9 +250,24 @@ final class IndexShardOperationPermits implements Closeable {
                     final Supplier<StoredContext> contextSupplier = threadPool.getThreadContext().newRestorableContext(false);
                     final ActionListener<Releasable> wrappedListener;
                     if (executorOnDelay != null) {
-                        wrappedListener =
-                            new PermitAwareThreadedActionListener(threadPool, executorOnDelay,
-                                        new ContextPreservingActionListener<>(contextSupplier, onAcquired), forceExecution);
+                        wrappedListener = ActionListener.delegateFailure(new ContextPreservingActionListener<>(contextSupplier, onAcquired),
+                            (l, r) -> threadPool.executor(executorOnDelay).execute(new ActionRunnable<>(l) {
+                                @Override
+                                public boolean isForceExecution() {
+                                    return forceExecution;
+                                }
+
+                                @Override
+                                protected void doRun() {
+                                    listener.onResponse(r);
+                                }
+
+                                @Override
+                                public void onRejection(Exception e) {
+                                    IOUtils.closeWhileHandlingException(r);
+                                    super.onRejection(e);
+                                }
+                            }));
                     } else {
                         wrappedListener = new ContextPreservingActionListener<>(contextSupplier, onAcquired);
                     }
@@ -337,57 +353,4 @@ final class IndexShardOperationPermits implements Closeable {
             }
         }
     }
-
-    /**
-     * A permit-aware action listener wrapper that spawns onResponse listener invocations off on a configurable thread-pool.
-     * Being permit-aware, it also releases the permit when hitting thread-pool rejections and falls back to the
-     * invoker's thread to communicate failures.
-     */
-    private static class PermitAwareThreadedActionListener implements ActionListener<Releasable> {
-
-        private final ThreadPool threadPool;
-        private final String executor;
-        private final ActionListener<Releasable> listener;
-        private final boolean forceExecution;
-
-        private PermitAwareThreadedActionListener(ThreadPool threadPool, String executor, ActionListener<Releasable> listener,
-                                                  boolean forceExecution) {
-            this.threadPool = threadPool;
-            this.executor = executor;
-            this.listener = listener;
-            this.forceExecution = forceExecution;
-        }
-
-        @Override
-        public void onResponse(final Releasable releasable) {
-            threadPool.executor(executor).execute(new AbstractRunnable() {
-                @Override
-                public boolean isForceExecution() {
-                    return forceExecution;
-                }
-
-                @Override
-                protected void doRun() throws Exception {
-                    listener.onResponse(releasable);
-                }
-
-                @Override
-                public void onRejection(Exception e) {
-                    IOUtils.closeWhileHandlingException(releasable);
-                    super.onRejection(e);
-                }
-
-                @Override
-                public void onFailure(Exception e) {
-                    listener.onFailure(e); // will possibly execute on the caller thread
-                }
-            });
-        }
-
-        @Override
-        public void onFailure(final Exception e) {
-            listener.onFailure(e);
-        }
-    }
-
 }

+ 30 - 34
server/src/main/java/org/elasticsearch/repositories/RepositoriesService.java

@@ -23,6 +23,7 @@ import org.apache.logging.log4j.LogManager;
 import org.apache.logging.log4j.Logger;
 import org.apache.logging.log4j.message.ParameterizedMessage;
 import org.elasticsearch.action.ActionListener;
+import org.elasticsearch.action.ActionRunnable;
 import org.elasticsearch.action.admin.cluster.repositories.delete.DeleteRepositoryRequest;
 import org.elasticsearch.action.admin.cluster.repositories.put.PutRepositoryRequest;
 import org.elasticsearch.cluster.AckedClusterStateUpdateTask;
@@ -233,46 +234,41 @@ public class RepositoriesService implements ClusterStateApplier {
 
     public void verifyRepository(final String repositoryName, final ActionListener<List<DiscoveryNode>> listener) {
         final Repository repository = repository(repositoryName);
-        try {
-            threadPool.executor(ThreadPool.Names.SNAPSHOT).execute(() -> {
-                try {
-                    final String verificationToken = repository.startVerification();
-                    if (verificationToken != null) {
-                        try {
-                            verifyAction.verify(repositoryName, verificationToken, ActionListener.delegateFailure(listener,
-                                (delegatedListener, verifyResponse) -> threadPool.executor(ThreadPool.Names.SNAPSHOT).execute(() -> {
-                                    try {
-                                        repository.endVerification(verificationToken);
-                                    } catch (Exception e) {
-                                        logger.warn(() -> new ParameterizedMessage(
-                                            "[{}] failed to finish repository verification", repositoryName), e);
-                                        delegatedListener.onFailure(e);
-                                        return;
-                                    }
-                                    delegatedListener.onResponse(verifyResponse);
-                                })));
-                        } catch (Exception e) {
-                            threadPool.executor(ThreadPool.Names.SNAPSHOT).execute(() -> {
+        threadPool.executor(ThreadPool.Names.SNAPSHOT).execute(new ActionRunnable<>(listener) {
+            @Override
+            protected void doRun() {
+                final String verificationToken = repository.startVerification();
+                if (verificationToken != null) {
+                    try {
+                        verifyAction.verify(repositoryName, verificationToken, ActionListener.delegateFailure(listener,
+                            (delegatedListener, verifyResponse) -> threadPool.executor(ThreadPool.Names.SNAPSHOT).execute(() -> {
                                 try {
                                     repository.endVerification(verificationToken);
-                                } catch (Exception inner) {
-                                    inner.addSuppressed(e);
+                                } catch (Exception e) {
                                     logger.warn(() -> new ParameterizedMessage(
-                                        "[{}] failed to finish repository verification", repositoryName), inner);
+                                        "[{}] failed to finish repository verification", repositoryName), e);
+                                    delegatedListener.onFailure(e);
+                                    return;
                                 }
-                                listener.onFailure(e);
-                            });
-                        }
-                    } else {
-                        listener.onResponse(Collections.emptyList());
+                                delegatedListener.onResponse(verifyResponse);
+                            })));
+                    } catch (Exception e) {
+                        threadPool.executor(ThreadPool.Names.SNAPSHOT).execute(() -> {
+                            try {
+                                repository.endVerification(verificationToken);
+                            } catch (Exception inner) {
+                                inner.addSuppressed(e);
+                                logger.warn(() -> new ParameterizedMessage(
+                                    "[{}] failed to finish repository verification", repositoryName), inner);
+                            }
+                            listener.onFailure(e);
+                        });
                     }
-                } catch (Exception e) {
-                    listener.onFailure(e);
+                } else {
+                    listener.onResponse(Collections.emptyList());
                 }
-            });
-        } catch (Exception e) {
-            listener.onFailure(e);
-        }
+            }
+        });
     }
 
 

+ 4 - 15
server/src/main/java/org/elasticsearch/search/SearchService.java

@@ -341,12 +341,7 @@ public class SearchService extends AbstractLifecycleComponent implements IndexEv
     }
 
     private <T> void runAsync(long id, Supplier<T> executable, ActionListener<T> listener) {
-        getExecutor(id).execute(new ActionRunnable<T>(listener) {
-            @Override
-            protected void doRun() {
-                listener.onResponse(executable.get());
-            }
-        });
+        getExecutor(id).execute(ActionRunnable.wrap(listener, l -> l.onResponse(executable.get())));
     }
 
     private SearchPhaseResult executeQueryPhase(ShardSearchRequest request, SearchTask task) throws Exception {
@@ -1044,15 +1039,9 @@ public class SearchService extends AbstractLifecycleComponent implements IndexEv
         IndexShard shard = indicesService.indexServiceSafe(request.shardId().getIndex()).getShard(request.shardId().id());
         Executor executor = getExecutor(shard);
         ActionListener<Rewriteable> actionListener = ActionListener.wrap(r ->
-            // now we need to check if there is a pending refresh and register
-            shard.awaitShardSearchActive(b ->
-                executor.execute(new ActionRunnable<ShardSearchRequest>(listener) {
-                    @Override
-                    protected void doRun() {
-                        listener.onResponse(request);
-                    }
-                })
-            ), listener::onFailure);
+                // now we need to check if there is a pending refresh and register
+                shard.awaitShardSearchActive(b -> executor.execute(ActionRunnable.wrap(listener, l -> l.onResponse(request)))),
+            listener::onFailure);
         // we also do rewrite on the coordinating node (TransportSearchService) but we also need to do it here for BWC as well as
         // AliasFilters that might need to be rewritten. These are edge-cases but we are every efficient doing the rewrite here so it's not
         // adding a lot of overhead

+ 8 - 11
server/src/main/java/org/elasticsearch/snapshots/SnapshotsService.java

@@ -1298,17 +1298,14 @@ public class SnapshotsService extends AbstractLifecycleComponent implements Clus
      * @param repositoryStateId the unique id representing the state of the repository at the time the deletion began
      */
     private void deleteSnapshotFromRepository(Snapshot snapshot, @Nullable ActionListener<Void> listener, long repositoryStateId) {
-        threadPool.executor(ThreadPool.Names.SNAPSHOT).execute(new ActionRunnable<>(listener) {
-            @Override
-            protected void doRun() {
-                Repository repository = repositoriesService.repository(snapshot.getRepository());
-                repository.deleteSnapshot(snapshot.getSnapshotId(), repositoryStateId, ActionListener.wrap(v -> {
-                        logger.info("snapshot [{}] deleted", snapshot);
-                        removeSnapshotDeletionFromClusterState(snapshot, null, listener);
-                    }, ex -> removeSnapshotDeletionFromClusterState(snapshot, ex, listener)
-                ));
-            }
-        });
+        threadPool.executor(ThreadPool.Names.SNAPSHOT).execute(ActionRunnable.wrap(listener, l -> {
+            Repository repository = repositoriesService.repository(snapshot.getRepository());
+            repository.deleteSnapshot(snapshot.getSnapshotId(), repositoryStateId, ActionListener.wrap(v -> {
+                    logger.info("snapshot [{}] deleted", snapshot);
+                    removeSnapshotDeletionFromClusterState(snapshot, null, l);
+                }, ex -> removeSnapshotDeletionFromClusterState(snapshot, ex, l)
+            ));
+        }));
     }
 
     /**

+ 1 - 13
server/src/main/java/org/elasticsearch/tasks/TaskResultsService.java

@@ -30,7 +30,6 @@ import org.elasticsearch.action.admin.indices.create.CreateIndexResponse;
 import org.elasticsearch.action.bulk.BackoffPolicy;
 import org.elasticsearch.action.index.IndexRequestBuilder;
 import org.elasticsearch.action.index.IndexResponse;
-import org.elasticsearch.action.support.master.AcknowledgedResponse;
 import org.elasticsearch.client.Client;
 import org.elasticsearch.client.OriginSettingClient;
 import org.elasticsearch.client.Requests;
@@ -134,18 +133,7 @@ public class TaskResultsService {
                 // The index already exists but doesn't have our mapping
                 client.admin().indices().preparePutMapping(TASK_INDEX).setType(TASK_TYPE)
                     .setSource(taskResultIndexMapping(), XContentType.JSON)
-                    .execute(new ActionListener<AcknowledgedResponse>() {
-                                 @Override
-                                 public void onResponse(AcknowledgedResponse putMappingResponse) {
-                                     doStoreResult(taskResult, listener);
-                                 }
-
-                                 @Override
-                                 public void onFailure(Exception e) {
-                                     listener.onFailure(e);
-                                 }
-                             }
-                    );
+                    .execute(ActionListener.delegateFailure(listener, (l, r) -> doStoreResult(taskResult, listener)));
             } else {
                 doStoreResult(taskResult, listener);
             }