Browse Source

Use ActionListenerResponseHandler in more spots to improve logging (#90884)

I saw some slow inbound handler logging form transport single shard
action but wasn't able to figure out what listener was slow here because
of the anonymous response handler not having a `toString`.
-> Using the functionally equivalent ActionListenerResponseHandler
saves some code here and gives a proper `toString` so I fixed the single
shard action usage and two other spots I could quickly find.
Armin Braun 3 years ago
parent
commit
449b3416d7

+ 2 - 18
server/src/internalClusterTest/java/org/elasticsearch/action/admin/cluster/node/tasks/CancellableTasksIT.java

@@ -12,6 +12,7 @@ import org.elasticsearch.ExceptionsHelper;
 import org.elasticsearch.ResourceNotFoundException;
 import org.elasticsearch.action.ActionFuture;
 import org.elasticsearch.action.ActionListener;
+import org.elasticsearch.action.ActionListenerResponseHandler;
 import org.elasticsearch.action.ActionRequest;
 import org.elasticsearch.action.ActionRequestValidationException;
 import org.elasticsearch.action.ActionResponse;
@@ -45,8 +46,6 @@ import org.elasticsearch.test.ESIntegTestCase;
 import org.elasticsearch.threadpool.ThreadPool;
 import org.elasticsearch.transport.SendRequestTransportException;
 import org.elasticsearch.transport.Transport;
-import org.elasticsearch.transport.TransportException;
-import org.elasticsearch.transport.TransportResponseHandler;
 import org.elasticsearch.transport.TransportService;
 import org.junit.After;
 
@@ -547,22 +546,7 @@ public class CancellableTasksIT extends ESIntegTestCase {
                             subRequest.node,
                             ACTION.name(),
                             subRequest,
-                            new TransportResponseHandler<TestResponse>() {
-                                @Override
-                                public void handleResponse(TestResponse response) {
-                                    latchedListener.onResponse(response);
-                                }
-
-                                @Override
-                                public void handleException(TransportException exp) {
-                                    latchedListener.onFailure(exp);
-                                }
-
-                                @Override
-                                public TestResponse read(StreamInput in) throws IOException {
-                                    return new TestResponse(in);
-                                }
-                            }
+                            new ActionListenerResponseHandler<TestResponse>(latchedListener, TestResponse::new)
                         );
                     }
                 }

+ 7 - 18
server/src/main/java/org/elasticsearch/action/support/broadcast/TransportBroadcastAction.java

@@ -9,6 +9,7 @@
 package org.elasticsearch.action.support.broadcast;
 
 import org.elasticsearch.action.ActionListener;
+import org.elasticsearch.action.ActionListenerResponseHandler;
 import org.elasticsearch.action.ActionRunnable;
 import org.elasticsearch.action.NoShardAvailableActionException;
 import org.elasticsearch.action.support.ActionFilters;
@@ -29,9 +30,7 @@ import org.elasticsearch.core.Nullable;
 import org.elasticsearch.tasks.Task;
 import org.elasticsearch.threadpool.ThreadPool;
 import org.elasticsearch.transport.TransportChannel;
-import org.elasticsearch.transport.TransportException;
 import org.elasticsearch.transport.TransportRequestHandler;
-import org.elasticsearch.transport.TransportResponseHandler;
 import org.elasticsearch.transport.TransportService;
 
 import java.io.IOException;
@@ -185,22 +184,12 @@ public abstract class TransportBroadcastAction<
         }
 
         protected void sendShardRequest(DiscoveryNode node, ShardRequest shardRequest, ActionListener<ShardResponse> listener) {
-            transportService.sendRequest(node, transportShardAction, shardRequest, new TransportResponseHandler<ShardResponse>() {
-                @Override
-                public ShardResponse read(StreamInput in) throws IOException {
-                    return readShardResponse(in);
-                }
-
-                @Override
-                public void handleResponse(ShardResponse response) {
-                    listener.onResponse(response);
-                }
-
-                @Override
-                public void handleException(TransportException e) {
-                    listener.onFailure(e);
-                }
-            });
+            transportService.sendRequest(
+                node,
+                transportShardAction,
+                shardRequest,
+                new ActionListenerResponseHandler<>(listener, TransportBroadcastAction.this::readShardResponse)
+            );
         }
 
         protected void onOperation(ShardRouting shard, int shardIndex, ShardResponse response) {

+ 3 - 30
server/src/main/java/org/elasticsearch/action/support/single/shard/TransportSingleShardAction.java

@@ -9,6 +9,7 @@
 package org.elasticsearch.action.support.single.shard;
 
 import org.elasticsearch.action.ActionListener;
+import org.elasticsearch.action.ActionListenerResponseHandler;
 import org.elasticsearch.action.ActionResponse;
 import org.elasticsearch.action.ActionRunnable;
 import org.elasticsearch.action.NoShardAvailableActionException;
@@ -25,7 +26,6 @@ import org.elasticsearch.cluster.node.DiscoveryNodes;
 import org.elasticsearch.cluster.routing.ShardRouting;
 import org.elasticsearch.cluster.routing.ShardsIterator;
 import org.elasticsearch.cluster.service.ClusterService;
-import org.elasticsearch.common.io.stream.StreamInput;
 import org.elasticsearch.common.io.stream.Writeable;
 import org.elasticsearch.common.logging.LoggerMessageFormat;
 import org.elasticsearch.core.Nullable;
@@ -35,7 +35,6 @@ import org.elasticsearch.threadpool.ThreadPool;
 import org.elasticsearch.transport.TransportChannel;
 import org.elasticsearch.transport.TransportException;
 import org.elasticsearch.transport.TransportRequestHandler;
-import org.elasticsearch.transport.TransportResponseHandler;
 import org.elasticsearch.transport.TransportService;
 
 import java.io.IOException;
@@ -173,22 +172,7 @@ public abstract class TransportSingleShardAction<Request extends SingleShardRequ
                     clusterService.localNode(),
                     transportShardAction,
                     internalRequest.request(),
-                    new TransportResponseHandler<Response>() {
-                        @Override
-                        public Response read(StreamInput in) throws IOException {
-                            return reader.read(in);
-                        }
-
-                        @Override
-                        public void handleResponse(final Response response) {
-                            listener.onResponse(response);
-                        }
-
-                        @Override
-                        public void handleException(TransportException exp) {
-                            listener.onFailure(exp);
-                        }
-                    }
+                    new ActionListenerResponseHandler<>(listener, reader)
                 );
             } else {
                 perform(null);
@@ -241,18 +225,7 @@ public abstract class TransportSingleShardAction<Request extends SingleShardRequ
                     node,
                     transportShardAction,
                     internalRequest.request(),
-                    new TransportResponseHandler<Response>() {
-
-                        @Override
-                        public Response read(StreamInput in) throws IOException {
-                            return reader.read(in);
-                        }
-
-                        @Override
-                        public void handleResponse(final Response response) {
-                            listener.onResponse(response);
-                        }
-
+                    new ActionListenerResponseHandler<>(listener, reader) {
                         @Override
                         public void handleException(TransportException exp) {
                             onFailure(shardRouting, exp);