Browse Source

Remove Redundant TransportFuture Class (#65370)

Follow up to #64098, this class was only used in the
single test modified in this PR.
=> Removed it and made the test use other primitives as a replacement.
Armin Braun 4 năm trước cách đây
mục cha
commit
d3ddaaf358

+ 0 - 86
server/src/main/java/org/elasticsearch/transport/TransportFuture.java

@@ -1,86 +0,0 @@
-/*
- * Licensed to Elasticsearch under one or more contributor
- * license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright
- * ownership. Elasticsearch licenses this file to you under
- * the Apache License, Version 2.0 (the "License"); you may
- * not use this file except in compliance with the License.
- * You may obtain a copy of the License at
- *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied.  See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-
-package org.elasticsearch.transport;
-
-import org.elasticsearch.ElasticsearchException;
-import org.elasticsearch.common.io.stream.StreamInput;
-import org.elasticsearch.common.util.concurrent.BaseFuture;
-
-import java.io.IOException;
-import java.util.concurrent.ExecutionException;
-import java.util.concurrent.Future;
-
-public class TransportFuture<V extends TransportResponse> extends BaseFuture<V> implements Future<V>, TransportResponseHandler<V> {
-
-    private final TransportResponseHandler<V> handler;
-
-    public TransportFuture(TransportResponseHandler<V> handler) {
-        this.handler = handler;
-    }
-
-    public V txGet() {
-        try {
-            return get();
-        } catch (InterruptedException e) {
-            Thread.currentThread().interrupt();
-            throw new IllegalStateException("Future got interrupted", e);
-        } catch (ExecutionException e) {
-            if (e.getCause() instanceof ElasticsearchException) {
-                throw (ElasticsearchException) e.getCause();
-            } else {
-                throw new TransportException("Failed execution", e);
-            }
-        }
-    }
-
-    @Override
-    public V read(StreamInput in) throws IOException {
-        return handler.read(in);
-    }
-
-    @Override
-    public String executor() {
-        return handler.executor();
-    }
-
-    @Override
-    public void handleResponse(V response) {
-        try {
-            handler.handleResponse(response);
-            set(response);
-        } catch (Exception e) {
-            handleException(new ResponseHandlerFailureTransportException(e));
-        }
-    }
-
-    @Override
-    public void handleException(TransportException exp) {
-        try {
-            handler.handleException(exp);
-        } finally {
-            setException(exp);
-        }
-    }
-
-    @Override
-    public String toString() {
-        return "future(" + handler.toString() + ")";
-    }
-}

+ 69 - 128
test/framework/src/main/java/org/elasticsearch/transport/AbstractSimpleTransportTestCase.java

@@ -30,6 +30,7 @@ import org.elasticsearch.ExceptionsHelper;
 import org.elasticsearch.Version;
 import org.elasticsearch.action.ActionListener;
 import org.elasticsearch.action.ActionListenerResponseHandler;
+import org.elasticsearch.action.StepListener;
 import org.elasticsearch.action.support.PlainActionFuture;
 import org.elasticsearch.cluster.node.DiscoveryNode;
 import org.elasticsearch.common.Nullable;
@@ -85,6 +86,7 @@ import java.util.concurrent.BrokenBarrierException;
 import java.util.concurrent.CountDownLatch;
 import java.util.concurrent.CyclicBarrier;
 import java.util.concurrent.ExecutionException;
+import java.util.concurrent.Future;
 import java.util.concurrent.Semaphore;
 import java.util.concurrent.TimeUnit;
 import java.util.concurrent.atomic.AtomicBoolean;
@@ -263,7 +265,7 @@ public abstract class AbstractSimpleTransportTestCase extends ESTestCase {
                 }
             });
 
-        TransportFuture<StringMessageResponse> res = submitRequest(serviceB, nodeA, "internal:sayHello",
+        Future<StringMessageResponse> res = submitRequest(serviceB, nodeA, "internal:sayHello",
             new StringMessageRequest("moshe"), new TransportResponseHandler<StringMessageResponse>() {
                 @Override
                 public StringMessageResponse read(StreamInput in) throws IOException {
@@ -372,7 +374,7 @@ public abstract class AbstractSimpleTransportTestCase extends ESTestCase {
         threadPool.getThreadContext().putHeader("test.ping.user", "ping_user");
         threadPool.getThreadContext().putTransient("my_private_context", context);
 
-        TransportFuture<StringMessageResponse> res = submitRequest(serviceB, nodeA, "internal:ping_pong", ping, responseHandler);
+        Future<StringMessageResponse> res = submitRequest(serviceB, nodeA, "internal:ping_pong", ping, responseHandler);
 
         StringMessageResponse message = res.get();
         assertThat("pong", equalTo(message.message));
@@ -551,7 +553,7 @@ public abstract class AbstractSimpleTransportTestCase extends ESTestCase {
         });
     }
 
-    public void testVoidMessageCompressed() {
+    public void testVoidMessageCompressed() throws Exception {
         try (MockTransportService serviceC = buildService("TS_C", CURRENT_VERSION, Settings.EMPTY)) {
             serviceC.start();
             serviceC.acceptIncomingRequests();
@@ -570,7 +572,7 @@ public abstract class AbstractSimpleTransportTestCase extends ESTestCase {
             ConnectionProfile connectionProfile = ConnectionProfile.buildDefaultConnectionProfile(settingsWithCompress);
             connectToNode(serviceC, serviceA.getLocalDiscoNode(), connectionProfile);
 
-            TransportFuture<TransportResponse.Empty> res = submitRequest(serviceC, nodeA, "internal:sayHello",
+            Future<TransportResponse.Empty> res = submitRequest(serviceC, nodeA, "internal:sayHello",
                 TransportRequest.Empty.INSTANCE, new TransportResponseHandler<>() {
                     @Override
                     public TransportResponse.Empty read(StreamInput in) {
@@ -592,17 +594,11 @@ public abstract class AbstractSimpleTransportTestCase extends ESTestCase {
                         fail("got exception instead of a response: " + exp.getMessage());
                     }
                 });
-
-            try {
-                TransportResponse.Empty message = res.get();
-                assertThat(message, notNullValue());
-            } catch (Exception e) {
-                assertThat(e.getMessage(), false, equalTo(true));
-            }
+            assertThat(res.get(), notNullValue());
         }
     }
 
-    public void testHelloWorldCompressed() throws IOException {
+    public void testHelloWorldCompressed() throws Exception {
         try (MockTransportService serviceC = buildService("TS_C", CURRENT_VERSION,  Settings.EMPTY)) {
             serviceC.start();
             serviceC.acceptIncomingRequests();
@@ -622,7 +618,7 @@ public abstract class AbstractSimpleTransportTestCase extends ESTestCase {
             ConnectionProfile connectionProfile = ConnectionProfile.buildDefaultConnectionProfile(settingsWithCompress);
             connectToNode(serviceC, serviceA.getLocalDiscoNode(), connectionProfile);
 
-            TransportFuture<StringMessageResponse> res = submitRequest(serviceC, nodeA, "internal:sayHello",
+            Future<StringMessageResponse> res = submitRequest(serviceC, nodeA, "internal:sayHello",
                 new StringMessageRequest("moshe"), new TransportResponseHandler<>() {
                     @Override
                     public StringMessageResponse read(StreamInput in) throws IOException {
@@ -646,23 +642,19 @@ public abstract class AbstractSimpleTransportTestCase extends ESTestCase {
                     }
                 });
 
-            try {
-                StringMessageResponse message = res.get();
-                assertThat("hello moshe", equalTo(message.message));
-            } catch (Exception e) {
-                assertThat(e.getMessage(), false, equalTo(true));
-            }
+            StringMessageResponse message = res.get();
+            assertThat("hello moshe", equalTo(message.message));
         }
     }
 
-    public void testErrorMessage() {
+    public void testErrorMessage() throws InterruptedException {
         serviceA.registerRequestHandler("internal:sayHelloException", ThreadPool.Names.GENERIC, StringMessageRequest::new,
             (request, channel, task) -> {
                 assertThat("moshe", equalTo(request.message));
                 throw new RuntimeException("bad message !!!");
             });
 
-        TransportFuture<StringMessageResponse> res = submitRequest(serviceB, nodeA, "internal:sayHelloException",
+        Future<StringMessageResponse> res = submitRequest(serviceB, nodeA, "internal:sayHelloException",
             new StringMessageRequest("moshe"), new TransportResponseHandler<StringMessageResponse>() {
                 @Override
                 public StringMessageResponse read(StreamInput in) throws IOException {
@@ -685,12 +677,8 @@ public abstract class AbstractSimpleTransportTestCase extends ESTestCase {
                 }
             });
 
-        try {
-            res.txGet();
-            fail("exception should be thrown");
-        } catch (Exception e) {
-            assertThat(e.getCause().getMessage(), equalTo("runtime_exception: bad message !!!"));
-        }
+        final ExecutionException e = expectThrows(ExecutionException.class, res::get);
+        assertThat(e.getCause().getCause().getMessage(), equalTo("runtime_exception: bad message !!!"));
     }
 
     public void testDisconnectListener() throws Exception {
@@ -851,15 +839,10 @@ public abstract class AbstractSimpleTransportTestCase extends ESTestCase {
                         latch3.countDown();
                     }
                 });
-            TransportFuture<TransportResponse.Empty> foobar = submitRequest(serviceB, nodeA, "internal:foobar",
+            Future<TransportResponse.Empty> foobar = submitRequest(serviceB, nodeA, "internal:foobar",
                 new StringMessageRequest(""), EmptyTransportResponseHandler.INSTANCE_SAME);
             latch2.countDown();
-            try {
-                foobar.txGet();
-                fail("TransportException expected");
-            } catch (TransportException ex) {
-
-            }
+            assertThat(expectThrows(ExecutionException.class, foobar::get).getCause(), instanceOf(TransportException.class));
             latch3.await();
         } finally {
             serviceB.close(); // make sure we are fully closed here otherwise we might run into assertions down the road
@@ -869,15 +852,9 @@ public abstract class AbstractSimpleTransportTestCase extends ESTestCase {
 
     public void testTimeoutSendExceptionWithNeverSendingBackResponse() throws Exception {
         serviceA.registerRequestHandler("internal:sayHelloTimeoutNoResponse", ThreadPool.Names.GENERIC, StringMessageRequest::new,
-            new TransportRequestHandler<StringMessageRequest>() {
-                @Override
-                public void messageReceived(StringMessageRequest request, TransportChannel channel, Task task) {
-                    assertThat("moshe", equalTo(request.message));
-                    // don't send back a response
-                }
-            });
+                (request, channel, task) -> assertThat("moshe", equalTo(request.message))); // don't send back a response
 
-        TransportFuture<StringMessageResponse> res = submitRequest(serviceB, nodeA, "internal:sayHelloTimeoutNoResponse",
+        Future<StringMessageResponse> res = submitRequest(serviceB, nodeA, "internal:sayHelloTimeoutNoResponse",
             new StringMessageRequest("moshe"), TransportRequestOptions.timeout(HUNDRED_MS),
             new TransportResponseHandler<StringMessageResponse>() {
                 @Override
@@ -901,12 +878,8 @@ public abstract class AbstractSimpleTransportTestCase extends ESTestCase {
                 }
             });
 
-        try {
-            res.txGet();
-            fail("exception should be thrown");
-        } catch (Exception e) {
-            assertThat(e, instanceOf(ReceiveTimeoutTransportException.class));
-        }
+        final ExecutionException e = expectThrows(ExecutionException.class, res::get);
+        assertThat(e.getCause(), instanceOf(ReceiveTimeoutTransportException.class));
     }
 
     public void testTimeoutSendExceptionWithDelayedResponse() throws Exception {
@@ -914,9 +887,7 @@ public abstract class AbstractSimpleTransportTestCase extends ESTestCase {
         CountDownLatch doneWaitingForever = new CountDownLatch(1);
         Semaphore inFlight = new Semaphore(Integer.MAX_VALUE);
         serviceA.registerRequestHandler("internal:sayHelloTimeoutDelayedResponse", ThreadPool.Names.GENERIC, StringMessageRequest::new,
-            new TransportRequestHandler<StringMessageRequest>() {
-                @Override
-                public void messageReceived(StringMessageRequest request, TransportChannel channel, Task task) throws InterruptedException {
+                (request, channel, task) -> {
                     String message = request.message;
                     inFlight.acquireUninterruptibly();
                     try {
@@ -938,10 +909,9 @@ public abstract class AbstractSimpleTransportTestCase extends ESTestCase {
                             doneWaitingForever.countDown();
                         }
                     }
-                }
-            });
+                });
         final CountDownLatch latch = new CountDownLatch(1);
-        TransportFuture<StringMessageResponse> res = submitRequest(serviceB, nodeA, "internal:sayHelloTimeoutDelayedResponse",
+        Future<StringMessageResponse> res = submitRequest(serviceB, nodeA, "internal:sayHelloTimeoutDelayedResponse",
             new StringMessageRequest("forever"), TransportRequestOptions.timeout(HUNDRED_MS),
             new TransportResponseHandler<StringMessageResponse>() {
                 @Override
@@ -967,19 +937,14 @@ public abstract class AbstractSimpleTransportTestCase extends ESTestCase {
                 }
             });
 
-        try {
-            res.txGet();
-            fail("exception should be thrown");
-        } catch (Exception e) {
-            assertThat(e, instanceOf(ReceiveTimeoutTransportException.class));
-        }
+        assertThat(expectThrows(ExecutionException.class, res::get).getCause(), instanceOf(ReceiveTimeoutTransportException.class));
         latch.await();
 
         List<Runnable> assertions = new ArrayList<>();
         for (int i = 0; i < 10; i++) {
             final int counter = i;
             // now, try and send another request, this times, with a short timeout
-            TransportFuture<StringMessageResponse> result = submitRequest(serviceB, nodeA, "internal:sayHelloTimeoutDelayedResponse",
+            Future<StringMessageResponse> result = submitRequest(serviceB, nodeA, "internal:sayHelloTimeoutDelayedResponse",
                 new StringMessageRequest(counter + "ms"), TransportRequestOptions.timeout(TimeValue.timeValueSeconds(3)),
                 new TransportResponseHandler<StringMessageResponse>() {
                     @Override
@@ -1005,8 +970,11 @@ public abstract class AbstractSimpleTransportTestCase extends ESTestCase {
                 });
 
             assertions.add(() -> {
-                StringMessageResponse message = result.txGet();
-                assertThat(message.message, equalTo("hello " + counter + "ms"));
+                try {
+                    assertThat(result.get().message, equalTo("hello " + counter + "ms"));
+                } catch (Exception e) {
+                    throw new AssertionError(e);
+                }
             });
         }
         for (Runnable runnable : assertions) {
@@ -1128,9 +1096,7 @@ public abstract class AbstractSimpleTransportTestCase extends ESTestCase {
             appender.addExpectation(notSeenSentExpectation);
             appender.addExpectation(notSeenReceivedExpectation);
 
-            TransportFuture<StringMessageResponse> future = new TransportFuture<>(noopResponseHandler);
-            serviceA.sendRequest(nodeB, "internal:testNotSeen", new StringMessageRequest(""), future);
-            future.txGet();
+            submitRequest(serviceA, nodeB, "internal:testNotSeen", new StringMessageRequest(""), noopResponseHandler).get();
 
             assertBusy(appender::assertAllExpectationsMatched);
         } finally {
@@ -1268,8 +1234,6 @@ public abstract class AbstractSimpleTransportTestCase extends ESTestCase {
             }
         }
 
-
-
         @Override
         public void writeTo(StreamOutput out) throws IOException {
             super.writeTo(out);
@@ -1280,17 +1244,13 @@ public abstract class AbstractSimpleTransportTestCase extends ESTestCase {
     }
 
     public void testVersionFrom0to1() throws Exception {
-        serviceB.registerRequestHandler("internal:version", ThreadPool.Names.SAME, Version1Request::new,
-            new TransportRequestHandler<Version1Request>() {
-                @Override
-                public void messageReceived(Version1Request request, TransportChannel channel, Task task) throws Exception {
-                    assertThat(request.value1, equalTo(1));
-                    assertThat(request.value2, equalTo(0)); // not set, coming from service A
-                    Version1Response response = new Version1Response(1, 2);
-                    channel.sendResponse(response);
-                    assertEquals(version0, channel.getVersion());
-                }
-            });
+        serviceB.registerRequestHandler("internal:version", ThreadPool.Names.SAME, Version1Request::new, (request, channel, task) -> {
+            assertThat(request.value1, equalTo(1));
+            assertThat(request.value2, equalTo(0)); // not set, coming from service A
+            Version1Response response = new Version1Response(1, 2);
+            channel.sendResponse(response);
+            assertEquals(version0, channel.getVersion());
+        });
 
         Version0Request version0Request = new Version0Request();
         version0Request.value1 = 1;
@@ -1311,22 +1271,18 @@ public abstract class AbstractSimpleTransportTestCase extends ESTestCase {
                     logger.error("Unexpected failure", exp);
                     fail("got exception instead of a response: " + exp.getMessage());
                 }
-            }).txGet();
+            }).get();
 
         assertThat(version0Response.value1, equalTo(1));
     }
 
     public void testVersionFrom1to0() throws Exception {
-        serviceA.registerRequestHandler("internal:version", ThreadPool.Names.SAME, Version0Request::new,
-            new TransportRequestHandler<Version0Request>() {
-                @Override
-                public void messageReceived(Version0Request request, TransportChannel channel, Task task) throws Exception {
-                    assertThat(request.value1, equalTo(1));
-                    Version0Response response = new Version0Response(1);
-                    channel.sendResponse(response);
-                    assertEquals(version0, channel.getVersion());
-                }
-            });
+        serviceA.registerRequestHandler("internal:version", ThreadPool.Names.SAME, Version0Request::new, (request, channel, task) -> {
+            assertThat(request.value1, equalTo(1));
+            Version0Response response = new Version0Response(1);
+            channel.sendResponse(response);
+            assertEquals(version0, channel.getVersion());
+        });
 
         Version1Request version1Request = new Version1Request();
         version1Request.value1 = 1;
@@ -1349,7 +1305,7 @@ public abstract class AbstractSimpleTransportTestCase extends ESTestCase {
                     logger.error("Unexpected failure", exp);
                     fail("got exception instead of a response: " + exp.getMessage());
                 }
-            }).txGet();
+            }).get();
 
         assertThat(version1Response.value1, equalTo(1));
         assertThat(version1Response.value2, equalTo(0));
@@ -1386,7 +1342,7 @@ public abstract class AbstractSimpleTransportTestCase extends ESTestCase {
                     logger.error("Unexpected failure", exp);
                     fail("got exception instead of a response: " + exp.getMessage());
                 }
-            }).txGet();
+            }).get();
 
         assertThat(version1Response.value1, equalTo(1));
         assertThat(version1Response.value2, equalTo(2));
@@ -1420,7 +1376,7 @@ public abstract class AbstractSimpleTransportTestCase extends ESTestCase {
                     logger.error("Unexpected failure", exp);
                     fail("got exception instead of a response: " + exp.getMessage());
                 }
-            }).txGet();
+            }).get();
 
         assertThat(version0Response.value1, equalTo(1));
     }
@@ -1434,7 +1390,7 @@ public abstract class AbstractSimpleTransportTestCase extends ESTestCase {
 
         serviceB.addFailToSendNoConnectRule(serviceA);
 
-        TransportFuture<StringMessageResponse> res = submitRequest(serviceB, nodeA, "internal:sayHello",
+        Future<StringMessageResponse> res = submitRequest(serviceB, nodeA, "internal:sayHello",
             new StringMessageRequest("moshe"), new TransportResponseHandler<StringMessageResponse>() {
                 @Override
                 public StringMessageResponse read(StreamInput in) throws IOException {
@@ -1459,30 +1415,20 @@ public abstract class AbstractSimpleTransportTestCase extends ESTestCase {
                 }
             });
 
-        try {
-            res.txGet();
-            fail("exception should be thrown");
-        } catch (Exception e) {
-            Throwable cause = ExceptionsHelper.unwrapCause(e);
-            assertThat(cause, instanceOf(ConnectTransportException.class));
-            assertThat(((ConnectTransportException) cause).node(), equalTo(nodeA));
-        }
+        final ExecutionException e = expectThrows(ExecutionException.class, res::get);
+        Throwable cause = ExceptionsHelper.unwrapCause(e.getCause());
+        assertThat(cause, instanceOf(ConnectTransportException.class));
+        assertThat(((ConnectTransportException) cause).node(), equalTo(nodeA));
 
         // wait for the transport to process the sending failure and disconnect from node
         assertBusy(() -> assertFalse(serviceB.nodeConnected(nodeA)));
 
         // now try to connect again and see that it fails
-        try {
-            connectToNode(serviceB, nodeA);
-            fail("exception should be thrown");
-        } catch (ConnectTransportException e) {
-            // all is well
-        }
-
+        expectThrows(ConnectTransportException.class, () -> connectToNode(serviceB, nodeA));
         expectThrows(ConnectTransportException.class, () -> openConnection(serviceB, nodeA, TestProfiles.LIGHT_PROFILE));
     }
 
-    public void testMockUnresponsiveRule() throws IOException {
+    public void testMockUnresponsiveRule() throws InterruptedException {
         serviceA.registerRequestHandler("internal:sayHello", ThreadPool.Names.GENERIC, StringMessageRequest::new,
             (request, channel, task) -> {
                 assertThat("moshe", equalTo(request.message));
@@ -1491,7 +1437,7 @@ public abstract class AbstractSimpleTransportTestCase extends ESTestCase {
 
         serviceB.addUnresponsiveRule(serviceA);
 
-        TransportFuture<StringMessageResponse> res = submitRequest(serviceB, nodeA, "internal:sayHello",
+        Future<StringMessageResponse> res = submitRequest(serviceB, nodeA, "internal:sayHello",
             new StringMessageRequest("moshe"), TransportRequestOptions.timeout(HUNDRED_MS),
             new TransportResponseHandler<StringMessageResponse>() {
                 @Override
@@ -1515,21 +1461,11 @@ public abstract class AbstractSimpleTransportTestCase extends ESTestCase {
                 }
             });
 
-        try {
-            res.txGet();
-            fail("exception should be thrown");
-        } catch (Exception e) {
-            assertThat(e, instanceOf(ReceiveTimeoutTransportException.class));
-        }
-
-        try {
+        assertThat(expectThrows(ExecutionException.class, res::get).getCause(), instanceOf(ReceiveTimeoutTransportException.class));
+        expectThrows(ConnectTransportException.class, () -> {
             serviceB.disconnectFromNode(nodeA);
             connectToNode(serviceB, nodeA);
-            fail("exception should be thrown");
-        } catch (ConnectTransportException e) {
-            // all is well
-        }
-
+        });
         expectThrows(ConnectTransportException.class, () -> openConnection(serviceB, nodeA, TestProfiles.LIGHT_PROFILE));
     }
 
@@ -2795,7 +2731,7 @@ public abstract class AbstractSimpleTransportTestCase extends ESTestCase {
         return PlainActionFuture.get(fut -> service.openConnection(node, connectionProfile, fut));
     }
 
-    public static <T extends TransportResponse> TransportFuture<T> submitRequest(TransportService transportService,
+    public static <T extends TransportResponse> Future<T> submitRequest(TransportService transportService,
                                                                                  DiscoveryNode node, String action,
                                                                                  TransportRequest request,
                                                                                  TransportResponseHandler<T> handler)
@@ -2803,17 +2739,22 @@ public abstract class AbstractSimpleTransportTestCase extends ESTestCase {
         return submitRequest(transportService, node, action, request, TransportRequestOptions.EMPTY, handler);
     }
 
-    public static <T extends TransportResponse> TransportFuture<T> submitRequest(TransportService transportService, DiscoveryNode node,
+    public static <T extends TransportResponse> Future<T> submitRequest(TransportService transportService, DiscoveryNode node,
                                                                                  String action, TransportRequest request,
                                                                                  TransportRequestOptions options,
                                                                                  TransportResponseHandler<T> handler)
                                                                                  throws TransportException {
-        TransportFuture<T> futureHandler = new TransportFuture<>(handler);
+        final StepListener<T> responseListener = new StepListener<>();
+        final TransportResponseHandler<T> futureHandler =
+                new ActionListenerResponseHandler<>(responseListener, handler, handler.executor());
         try {
             transportService.sendRequest(node, action, request, options, futureHandler);
         } catch (NodeNotConnectedException ex) {
             futureHandler.handleException(ex);
         }
-        return futureHandler;
+        responseListener.whenComplete(handler::handleResponse, e -> handler.handleException((TransportException) e));
+        final PlainActionFuture<T> future = PlainActionFuture.newFuture();
+        responseListener.whenComplete(future::onResponse, future::onFailure);
+        return future;
     }
 }