Browse Source

Responses can use Writeable.Reader interface (#34655)

In order to remove Streamable from the codebase, Response objects need
to be read using the Writeable.Reader interface which this change
enables. This change enables the use of Writeable.Reader by adding the
`Action#getResponseReader` method. The default implementation simply
uses the existing `newResponse` method and the readFrom method. As
responses are migrated to the Writeable.Reader interface, Action
classes can be updated to throw an UnsupportedOperationException when
`newResponse` is called and override the `getResponseReader` method.

Relates #34389
Jay Modi 7 years ago
parent
commit
a0279bc069
55 changed files with 499 additions and 343 deletions
  1. 2 1
      modules/transport-netty4/src/test/java/org/elasticsearch/transport/netty4/Netty4ScheduledPingTests.java
  2. 15 0
      server/src/main/java/org/elasticsearch/action/Action.java
  3. 10 8
      server/src/main/java/org/elasticsearch/action/ActionListenerResponseHandler.java
  4. 7 0
      server/src/main/java/org/elasticsearch/action/ActionResponse.java
  5. 1 1
      server/src/main/java/org/elasticsearch/action/TransportActionNodeProxy.java
  6. 5 2
      server/src/main/java/org/elasticsearch/action/admin/cluster/node/tasks/get/TransportGetTaskAction.java
  7. 7 1
      server/src/main/java/org/elasticsearch/action/admin/cluster/shards/ClusterSearchShardsAction.java
  8. 29 29
      server/src/main/java/org/elasticsearch/action/admin/cluster/shards/ClusterSearchShardsResponse.java
  9. 8 1
      server/src/main/java/org/elasticsearch/action/admin/cluster/shards/TransportClusterSearchShardsAction.java
  10. 1 1
      server/src/main/java/org/elasticsearch/action/ingest/IngestActionForwarder.java
  11. 6 2
      server/src/main/java/org/elasticsearch/action/resync/TransportResyncReplicationAction.java
  12. 4 0
      server/src/main/java/org/elasticsearch/action/search/MultiSearchResponse.java
  13. 18 17
      server/src/main/java/org/elasticsearch/action/search/SearchTransportService.java
  14. 5 2
      server/src/main/java/org/elasticsearch/action/support/broadcast/TransportBroadcastAction.java
  15. 4 2
      server/src/main/java/org/elasticsearch/action/support/broadcast/node/TransportBroadcastByNodeAction.java
  16. 30 14
      server/src/main/java/org/elasticsearch/action/support/master/TransportMasterNodeAction.java
  17. 6 2
      server/src/main/java/org/elasticsearch/action/support/nodes/TransportNodesAction.java
  18. 17 5
      server/src/main/java/org/elasticsearch/action/support/replication/TransportReplicationAction.java
  19. 6 2
      server/src/main/java/org/elasticsearch/action/support/single/instance/TransportInstanceSingleOperationAction.java
  20. 9 4
      server/src/main/java/org/elasticsearch/action/support/single/shard/TransportSingleShardAction.java
  21. 4 2
      server/src/main/java/org/elasticsearch/action/support/tasks/TransportTasksAction.java
  22. 4 2
      server/src/main/java/org/elasticsearch/client/transport/TransportClientNodesService.java
  23. 4 10
      server/src/main/java/org/elasticsearch/discovery/zen/MasterFaultDetection.java
  24. 4 10
      server/src/main/java/org/elasticsearch/discovery/zen/NodesFaultDetection.java
  25. 4 2
      server/src/main/java/org/elasticsearch/gateway/LocalAllocateDangledIndices.java
  26. 12 6
      server/src/main/java/org/elasticsearch/indices/flush/SyncedFlushService.java
  27. 5 2
      server/src/main/java/org/elasticsearch/indices/recovery/PeerRecoveryTargetService.java
  28. 4 2
      server/src/main/java/org/elasticsearch/indices/recovery/RecoveryTranslogOperationsResponse.java
  29. 4 0
      server/src/main/java/org/elasticsearch/search/SearchService.java
  30. 4 0
      server/src/main/java/org/elasticsearch/search/dfs/DfsSearchResult.java
  31. 4 0
      server/src/main/java/org/elasticsearch/search/fetch/FetchSearchResult.java
  32. 4 0
      server/src/main/java/org/elasticsearch/search/fetch/QueryFetchSearchResult.java
  33. 4 0
      server/src/main/java/org/elasticsearch/search/fetch/ScrollQueryFetchSearchResult.java
  34. 4 0
      server/src/main/java/org/elasticsearch/search/query/QuerySearchResult.java
  35. 4 0
      server/src/main/java/org/elasticsearch/search/query/ScrollQuerySearchResult.java
  36. 2 1
      server/src/main/java/org/elasticsearch/transport/EmptyTransportResponseHandler.java
  37. 1 1
      server/src/main/java/org/elasticsearch/transport/RemoteClusterAwareClient.java
  38. 6 4
      server/src/main/java/org/elasticsearch/transport/RemoteClusterConnection.java
  39. 11 13
      server/src/main/java/org/elasticsearch/transport/TcpTransport.java
  40. 10 9
      server/src/main/java/org/elasticsearch/transport/Transport.java
  41. 28 28
      server/src/main/java/org/elasticsearch/transport/TransportActionProxy.java
  42. 7 6
      server/src/main/java/org/elasticsearch/transport/TransportChannelResponseHandler.java
  43. 13 0
      server/src/main/java/org/elasticsearch/transport/TransportMessage.java
  44. 19 0
      server/src/main/java/org/elasticsearch/transport/TransportResponse.java
  45. 0 24
      server/src/main/java/org/elasticsearch/transport/TransportResponseHandler.java
  46. 10 14
      server/src/main/java/org/elasticsearch/transport/TransportService.java
  47. 1 2
      server/src/test/java/org/elasticsearch/action/admin/cluster/shards/ClusterSearchShardsResponseTests.java
  48. 1 1
      server/src/test/java/org/elasticsearch/action/search/TransportSearchActionTests.java
  49. 4 2
      server/src/test/java/org/elasticsearch/client/transport/TransportClientNodesServiceTests.java
  50. 3 9
      server/src/test/java/org/elasticsearch/transport/RemoteClusterConnectionTests.java
  51. 23 19
      server/src/test/java/org/elasticsearch/transport/TransportActionProxyTests.java
  52. 5 2
      test/framework/src/main/java/org/elasticsearch/test/transport/CapturingTransport.java
  53. 86 74
      test/framework/src/main/java/org/elasticsearch/transport/AbstractSimpleTransportTestCase.java
  54. 3 2
      x-pack/plugin/security/src/test/java/org/elasticsearch/xpack/security/transport/SecurityServerTransportInterceptorTests.java
  55. 7 2
      x-pack/plugin/security/src/test/java/org/elasticsearch/xpack/security/transport/ServerTransportFilterIntegrationTests.java

+ 2 - 1
modules/transport-netty4/src/test/java/org/elasticsearch/transport/netty4/Netty4ScheduledPingTests.java

@@ -20,6 +20,7 @@ package org.elasticsearch.transport.netty4;
 
 import org.elasticsearch.cluster.node.DiscoveryNode;
 import org.elasticsearch.common.io.stream.NamedWriteableRegistry;
+import org.elasticsearch.common.io.stream.StreamInput;
 import org.elasticsearch.common.lease.Releasables;
 import org.elasticsearch.common.network.NetworkService;
 import org.elasticsearch.common.settings.Settings;
@@ -102,7 +103,7 @@ public class Netty4ScheduledPingTests extends ESTestCase {
                 TransportRequest.Empty.INSTANCE, TransportRequestOptions.builder().withCompress(randomBoolean()).build(),
                 new TransportResponseHandler<TransportResponse.Empty>() {
                     @Override
-                    public TransportResponse.Empty newInstance() {
+                    public TransportResponse.Empty read(StreamInput in) {
                         return TransportResponse.Empty.INSTANCE;
                     }
 

+ 15 - 0
server/src/main/java/org/elasticsearch/action/Action.java

@@ -19,6 +19,7 @@
 
 package org.elasticsearch.action;
 
+import org.elasticsearch.common.io.stream.Writeable;
 import org.elasticsearch.common.settings.Settings;
 import org.elasticsearch.transport.TransportRequestOptions;
 
@@ -45,9 +46,23 @@ public abstract class Action<Response extends ActionResponse> {
 
     /**
      * Creates a new response instance.
+     * @deprecated Implement {@link #getResponseReader()} instead and make this method throw an
+     *             {@link UnsupportedOperationException}
      */
+    @Deprecated
     public abstract Response newResponse();
 
+    /**
+     * Get a reader that can create a new instance of the class from a {@link org.elasticsearch.common.io.stream.StreamInput}
+     */
+    public Writeable.Reader<Response> getResponseReader() {
+        return in -> {
+            Response response = newResponse();
+            response.readFrom(in);
+            return response;
+        };
+    }
+
     /**
      * Optional request options for the action.
      */

+ 10 - 8
server/src/main/java/org/elasticsearch/action/ActionListenerResponseHandler.java

@@ -19,13 +19,15 @@
 
 package org.elasticsearch.action;
 
+import org.elasticsearch.common.io.stream.StreamInput;
+import org.elasticsearch.common.io.stream.Writeable;
 import org.elasticsearch.threadpool.ThreadPool;
 import org.elasticsearch.transport.TransportResponseHandler;
 import org.elasticsearch.transport.TransportException;
 import org.elasticsearch.transport.TransportResponse;
 
+import java.io.IOException;
 import java.util.Objects;
-import java.util.function.Supplier;
 
 /**
  * A simple base class for action response listeners, defaulting to using the SAME executor (as its
@@ -34,11 +36,11 @@ import java.util.function.Supplier;
 public class ActionListenerResponseHandler<Response extends TransportResponse> implements TransportResponseHandler<Response> {
 
     private final ActionListener<? super Response> listener;
-    private final Supplier<Response> responseSupplier;
+    private final Writeable.Reader<Response> reader;
 
-    public ActionListenerResponseHandler(ActionListener<? super Response> listener, Supplier<Response> responseSupplier) {
+    public ActionListenerResponseHandler(ActionListener<? super Response> listener, Writeable.Reader<Response> reader) {
         this.listener = Objects.requireNonNull(listener);
-        this.responseSupplier = Objects.requireNonNull(responseSupplier);
+        this.reader = Objects.requireNonNull(reader);
     }
 
     @Override
@@ -52,12 +54,12 @@ public class ActionListenerResponseHandler<Response extends TransportResponse> i
     }
 
     @Override
-    public Response newInstance() {
-        return responseSupplier.get();
+    public String executor() {
+        return ThreadPool.Names.SAME;
     }
 
     @Override
-    public String executor() {
-        return ThreadPool.Names.SAME;
+    public Response read(StreamInput in) throws IOException {
+        return reader.read(in);
     }
 }

+ 7 - 0
server/src/main/java/org/elasticsearch/action/ActionResponse.java

@@ -30,6 +30,13 @@ import java.io.IOException;
  */
 public abstract class ActionResponse extends TransportResponse {
 
+    public ActionResponse() {
+    }
+
+    public ActionResponse(StreamInput in) throws IOException {
+        super(in);
+    }
+
     @Override
     public void readFrom(StreamInput in) throws IOException {
         super.readFrom(in);

+ 1 - 1
server/src/main/java/org/elasticsearch/action/TransportActionNodeProxy.java

@@ -48,6 +48,6 @@ public class TransportActionNodeProxy<Request extends ActionRequest, Response ex
             return;
         }
         transportService.sendRequest(node, action.name(), request, transportOptions,
-            new ActionListenerResponseHandler<>(listener, action::newResponse));
+            new ActionListenerResponseHandler<>(listener, action.getResponseReader()));
     }
 }

+ 5 - 2
server/src/main/java/org/elasticsearch/action/admin/cluster/node/tasks/get/TransportGetTaskAction.java

@@ -31,6 +31,7 @@ import org.elasticsearch.client.Client;
 import org.elasticsearch.cluster.node.DiscoveryNode;
 import org.elasticsearch.cluster.service.ClusterService;
 import org.elasticsearch.common.inject.Inject;
+import org.elasticsearch.common.io.stream.StreamInput;
 import org.elasticsearch.common.settings.Settings;
 import org.elasticsearch.common.util.concurrent.AbstractRunnable;
 import org.elasticsearch.common.xcontent.LoggingDeprecationHandler;
@@ -119,8 +120,10 @@ public class TransportGetTaskAction extends HandledTransportAction<GetTaskReques
         transportService.sendRequest(node, GetTaskAction.NAME, nodeRequest, builder.build(),
                 new TransportResponseHandler<GetTaskResponse>() {
                     @Override
-                    public GetTaskResponse newInstance() {
-                        return new GetTaskResponse();
+                    public GetTaskResponse read(StreamInput in) throws IOException {
+                        GetTaskResponse response = new GetTaskResponse();
+                        response.readFrom(in);
+                        return response;
                     }
 
                     @Override

+ 7 - 1
server/src/main/java/org/elasticsearch/action/admin/cluster/shards/ClusterSearchShardsAction.java

@@ -20,6 +20,7 @@
 package org.elasticsearch.action.admin.cluster.shards;
 
 import org.elasticsearch.action.Action;
+import org.elasticsearch.common.io.stream.Writeable;
 
 public class ClusterSearchShardsAction extends Action<ClusterSearchShardsResponse> {
 
@@ -32,6 +33,11 @@ public class ClusterSearchShardsAction extends Action<ClusterSearchShardsRespons
 
     @Override
     public ClusterSearchShardsResponse newResponse() {
-        return new ClusterSearchShardsResponse();
+        throw new UnsupportedOperationException("usage of Streamable is to be replaced by Writeable");
+    }
+
+    @Override
+    public Writeable.Reader<ClusterSearchShardsResponse> getResponseReader() {
+        return ClusterSearchShardsResponse::new;
     }
 }

+ 29 - 29
server/src/main/java/org/elasticsearch/action/admin/cluster/shards/ClusterSearchShardsResponse.java

@@ -38,36 +38,12 @@ public class ClusterSearchShardsResponse extends ActionResponse implements ToXCo
     public static final ClusterSearchShardsResponse EMPTY = new ClusterSearchShardsResponse(new ClusterSearchShardsGroup[0],
             new DiscoveryNode[0], Collections.emptyMap());
 
-    private ClusterSearchShardsGroup[] groups;
-    private DiscoveryNode[] nodes;
-    private Map<String, AliasFilter> indicesAndFilters;
+    private final ClusterSearchShardsGroup[] groups;
+    private final DiscoveryNode[] nodes;
+    private final Map<String, AliasFilter> indicesAndFilters;
 
-    public ClusterSearchShardsResponse() {
-
-    }
-
-    public ClusterSearchShardsResponse(ClusterSearchShardsGroup[] groups, DiscoveryNode[] nodes,
-                                       Map<String, AliasFilter> indicesAndFilters) {
-        this.groups = groups;
-        this.nodes = nodes;
-        this.indicesAndFilters = indicesAndFilters;
-    }
-
-    public ClusterSearchShardsGroup[] getGroups() {
-        return groups;
-    }
-
-    public DiscoveryNode[] getNodes() {
-        return nodes;
-    }
-
-    public Map<String, AliasFilter> getIndicesAndFilters() {
-        return indicesAndFilters;
-    }
-
-    @Override
-    public void readFrom(StreamInput in) throws IOException {
-        super.readFrom(in);
+    public ClusterSearchShardsResponse(StreamInput in) throws IOException {
+        super(in);
         groups = new ClusterSearchShardsGroup[in.readVInt()];
         for (int i = 0; i < groups.length; i++) {
             groups[i] = ClusterSearchShardsGroup.readSearchShardsGroupResponse(in);
@@ -85,6 +61,11 @@ public class ClusterSearchShardsResponse extends ActionResponse implements ToXCo
         }
     }
 
+    @Override
+    public void readFrom(StreamInput in) throws IOException {
+        throw new UnsupportedOperationException("usage of Streamable is to be replaced by Writeable");
+    }
+
     @Override
     public void writeTo(StreamOutput out) throws IOException {
         super.writeTo(out);
@@ -103,6 +84,25 @@ public class ClusterSearchShardsResponse extends ActionResponse implements ToXCo
         }
     }
 
+    public ClusterSearchShardsResponse(ClusterSearchShardsGroup[] groups, DiscoveryNode[] nodes,
+                                       Map<String, AliasFilter> indicesAndFilters) {
+        this.groups = groups;
+        this.nodes = nodes;
+        this.indicesAndFilters = indicesAndFilters;
+    }
+
+    public ClusterSearchShardsGroup[] getGroups() {
+        return groups;
+    }
+
+    public DiscoveryNode[] getNodes() {
+        return nodes;
+    }
+
+    public Map<String, AliasFilter> getIndicesAndFilters() {
+        return indicesAndFilters;
+    }
+
     @Override
     public XContentBuilder toXContent(XContentBuilder builder, Params params) throws IOException {
         builder.startObject();

+ 8 - 1
server/src/main/java/org/elasticsearch/action/admin/cluster/shards/TransportClusterSearchShardsAction.java

@@ -32,6 +32,7 @@ import org.elasticsearch.cluster.routing.ShardIterator;
 import org.elasticsearch.cluster.routing.ShardRouting;
 import org.elasticsearch.cluster.service.ClusterService;
 import org.elasticsearch.common.inject.Inject;
+import org.elasticsearch.common.io.stream.StreamInput;
 import org.elasticsearch.common.settings.Settings;
 import org.elasticsearch.index.shard.ShardId;
 import org.elasticsearch.indices.IndicesService;
@@ -39,6 +40,7 @@ import org.elasticsearch.search.internal.AliasFilter;
 import org.elasticsearch.threadpool.ThreadPool;
 import org.elasticsearch.transport.TransportService;
 
+import java.io.IOException;
 import java.util.HashMap;
 import java.util.HashSet;
 import java.util.Map;
@@ -72,7 +74,12 @@ public class TransportClusterSearchShardsAction extends
 
     @Override
     protected ClusterSearchShardsResponse newResponse() {
-        return new ClusterSearchShardsResponse();
+        throw new UnsupportedOperationException("usage of Streamable is to be replaced by Writeable");
+    }
+
+    @Override
+    protected ClusterSearchShardsResponse read(StreamInput in) throws IOException {
+        return new ClusterSearchShardsResponse(in);
     }
 
     @Override

+ 1 - 1
server/src/main/java/org/elasticsearch/action/ingest/IngestActionForwarder.java

@@ -49,7 +49,7 @@ public final class IngestActionForwarder implements ClusterStateApplier {
 
     public void forwardIngestRequest(Action<?> action, ActionRequest request, ActionListener<?> listener) {
         transportService.sendRequest(randomIngestNode(), action.name(), request,
-            new ActionListenerResponseHandler(listener, action::newResponse));
+            new ActionListenerResponseHandler(listener, action.getResponseReader()));
     }
 
     private DiscoveryNode randomIngestNode() {

+ 6 - 2
server/src/main/java/org/elasticsearch/action/resync/TransportResyncReplicationAction.java

@@ -32,6 +32,7 @@ import org.elasticsearch.cluster.node.DiscoveryNode;
 import org.elasticsearch.cluster.routing.ShardRouting;
 import org.elasticsearch.cluster.service.ClusterService;
 import org.elasticsearch.common.inject.Inject;
+import org.elasticsearch.common.io.stream.StreamInput;
 import org.elasticsearch.common.settings.Settings;
 import org.elasticsearch.index.engine.Engine;
 import org.elasticsearch.index.seqno.SequenceNumbers;
@@ -45,6 +46,7 @@ import org.elasticsearch.transport.TransportException;
 import org.elasticsearch.transport.TransportResponseHandler;
 import org.elasticsearch.transport.TransportService;
 
+import java.io.IOException;
 import java.util.function.Consumer;
 import java.util.function.Supplier;
 
@@ -151,8 +153,10 @@ public class TransportResyncReplicationAction extends TransportWriteAction<Resyn
             transportOptions,
             new TransportResponseHandler<ResyncReplicationResponse>() {
                 @Override
-                public ResyncReplicationResponse newInstance() {
-                    return newResponseInstance();
+                public ResyncReplicationResponse read(StreamInput in) throws IOException {
+                    ResyncReplicationResponse response = newResponseInstance();
+                    response.readFrom(in);
+                    return response;
                 }
 
                 @Override

+ 4 - 0
server/src/main/java/org/elasticsearch/action/search/MultiSearchResponse.java

@@ -135,6 +135,10 @@ public class MultiSearchResponse extends ActionResponse implements Iterable<Mult
     MultiSearchResponse() {
     }
 
+    MultiSearchResponse(StreamInput in) throws IOException {
+        readFrom(in);
+    }
+
     public MultiSearchResponse(Item[] items, long tookInMillis) {
         this.items = items;
         this.tookInMillis = tookInMillis;

+ 18 - 17
server/src/main/java/org/elasticsearch/action/search/SearchTransportService.java

@@ -29,6 +29,7 @@ import org.elasticsearch.cluster.node.DiscoveryNode;
 import org.elasticsearch.common.component.AbstractComponent;
 import org.elasticsearch.common.io.stream.StreamInput;
 import org.elasticsearch.common.io.stream.StreamOutput;
+import org.elasticsearch.common.io.stream.Writeable;
 import org.elasticsearch.common.settings.Settings;
 import org.elasticsearch.common.util.concurrent.ConcurrentCollections;
 import org.elasticsearch.search.SearchPhaseResult;
@@ -60,7 +61,6 @@ import java.io.UncheckedIOException;
 import java.util.HashMap;
 import java.util.Map;
 import java.util.function.BiFunction;
-import java.util.function.Supplier;
 
 /**
  * An encapsulation of {@link org.elasticsearch.search.SearchService} operations exposed through
@@ -119,7 +119,7 @@ public class SearchTransportService extends AbstractComponent {
 
     public void sendClearAllScrollContexts(Transport.Connection connection, final ActionListener<TransportResponse> listener) {
         transportService.sendRequest(connection, CLEAR_SCROLL_CONTEXTS_ACTION_NAME, TransportRequest.Empty.INSTANCE,
-            TransportRequestOptions.EMPTY, new ActionListenerResponseHandler<>(listener, () -> TransportResponse.Empty.INSTANCE));
+            TransportRequestOptions.EMPTY, new ActionListenerResponseHandler<>(listener, (in) -> TransportResponse.Empty.INSTANCE));
     }
 
     public void sendExecuteDfs(Transport.Connection connection, final ShardSearchTransportRequest request, SearchTask task,
@@ -133,11 +133,11 @@ public class SearchTransportService extends AbstractComponent {
         // we optimize this and expect a QueryFetchSearchResult if we only have a single shard in the search request
         // this used to be the QUERY_AND_FETCH which doesn't exist anymore.
         final boolean fetchDocuments = request.numberOfShards() == 1;
-        Supplier<SearchPhaseResult> supplier = fetchDocuments ? QueryFetchSearchResult::new : QuerySearchResult::new;
+        Writeable.Reader<SearchPhaseResult> reader = fetchDocuments ? QueryFetchSearchResult::new : QuerySearchResult::new;
 
         final ActionListener handler = responseWrapper.apply(connection, listener);
         transportService.sendChildRequest(connection, QUERY_ACTION_NAME, request, task,
-                new ConnectionCountingHandler<>(handler, supplier, clientConnections, connection.getNode().getId()));
+                new ConnectionCountingHandler<>(handler, reader, clientConnections, connection.getNode().getId()));
     }
 
     public void sendExecuteQuery(Transport.Connection connection, final QuerySearchRequest request, SearchTask task,
@@ -155,8 +155,8 @@ public class SearchTransportService extends AbstractComponent {
     public void sendExecuteScrollFetch(Transport.Connection connection, final InternalScrollSearchRequest request, SearchTask task,
                                        final SearchActionListener<ScrollQueryFetchSearchResult> listener) {
         transportService.sendChildRequest(connection, QUERY_FETCH_SCROLL_ACTION_NAME, request, task,
-                new ConnectionCountingHandler<>(listener, ScrollQueryFetchSearchResult::new,
-                        clientConnections, connection.getNode().getId()));
+                new ConnectionCountingHandler<>(listener, ScrollQueryFetchSearchResult::new, clientConnections,
+                    connection.getNode().getId()));
     }
 
     public void sendExecuteFetch(Transport.Connection connection, final ShardFetchSearchRequest request, SearchTask task,
@@ -279,6 +279,10 @@ public class SearchTransportService extends AbstractComponent {
         SearchFreeContextResponse() {
         }
 
+        SearchFreeContextResponse(StreamInput in) throws IOException {
+            freed = in.readBoolean();
+        }
+
         SearchFreeContextResponse(boolean freed) {
             this.freed = freed;
         }
@@ -306,22 +310,20 @@ public class SearchTransportService extends AbstractComponent {
                 boolean freed = searchService.freeContext(request.id());
                 channel.sendResponse(new SearchFreeContextResponse(freed));
         });
-        TransportActionProxy.registerProxyAction(transportService, FREE_CONTEXT_SCROLL_ACTION_NAME,
-                (Supplier<TransportResponse>) SearchFreeContextResponse::new);
+        TransportActionProxy.registerProxyAction(transportService, FREE_CONTEXT_SCROLL_ACTION_NAME, SearchFreeContextResponse::new);
         transportService.registerRequestHandler(FREE_CONTEXT_ACTION_NAME, ThreadPool.Names.SAME, SearchFreeContextRequest::new,
             (request, channel, task) -> {
                 boolean freed = searchService.freeContext(request.id());
                 channel.sendResponse(new SearchFreeContextResponse(freed));
         });
-        TransportActionProxy.registerProxyAction(transportService, FREE_CONTEXT_ACTION_NAME,
-                (Supplier<TransportResponse>) SearchFreeContextResponse::new);
+        TransportActionProxy.registerProxyAction(transportService, FREE_CONTEXT_ACTION_NAME, SearchFreeContextResponse::new);
         transportService.registerRequestHandler(CLEAR_SCROLL_CONTEXTS_ACTION_NAME, () -> TransportRequest.Empty.INSTANCE,
             ThreadPool.Names.SAME, (request, channel, task) -> {
                 searchService.freeAllScrollContexts();
                 channel.sendResponse(TransportResponse.Empty.INSTANCE);
         });
         TransportActionProxy.registerProxyAction(transportService, CLEAR_SCROLL_CONTEXTS_ACTION_NAME,
-                () -> TransportResponse.Empty.INSTANCE);
+            (in) -> TransportResponse.Empty.INSTANCE);
 
         transportService.registerRequestHandler(DFS_ACTION_NAME, ThreadPool.Names.SAME, ShardSearchTransportRequest::new,
             (request, channel, task) -> {
@@ -352,8 +354,8 @@ public class SearchTransportService extends AbstractComponent {
                 searchService.executeQueryPhase(request, (SearchTask) task, new ChannelActionListener<>(
                     channel, QUERY_ACTION_NAME, request));
             });
-        TransportActionProxy.registerProxyAction(transportService, QUERY_ACTION_NAME,
-                (request) -> ((ShardSearchRequest)request).numberOfShards() == 1 ? QueryFetchSearchResult::new : QuerySearchResult::new);
+        TransportActionProxy.registerProxyActionWithDynamicResponseType(transportService, QUERY_ACTION_NAME,
+            (request) -> ((ShardSearchRequest)request).numberOfShards() == 1 ? QueryFetchSearchResult::new : QuerySearchResult::new);
 
         transportService.registerRequestHandler(QUERY_ID_ACTION_NAME, ThreadPool.Names.SAME, QuerySearchRequest::new,
             (request, channel, task) -> {
@@ -395,8 +397,7 @@ public class SearchTransportService extends AbstractComponent {
             (request, channel, task) -> {
                 searchService.canMatch(request, new ChannelActionListener<>(channel, QUERY_CAN_MATCH_NAME, request));
             });
-        TransportActionProxy.registerProxyAction(transportService, QUERY_CAN_MATCH_NAME,
-                (Supplier<TransportResponse>) SearchService.CanMatchResponse::new);
+        TransportActionProxy.registerProxyAction(transportService, QUERY_CAN_MATCH_NAME, SearchService.CanMatchResponse::new);
     }
 
 
@@ -419,9 +420,9 @@ public class SearchTransportService extends AbstractComponent {
         private final Map<String, Long> clientConnections;
         private final String nodeId;
 
-        ConnectionCountingHandler(final ActionListener<? super Response> listener, final Supplier<Response> responseSupplier,
+        ConnectionCountingHandler(final ActionListener<? super Response> listener, final Writeable.Reader<Response> responseReader,
                                   final Map<String, Long> clientConnections, final String nodeId) {
-            super(listener, responseSupplier);
+            super(listener, responseReader);
             this.clientConnections = clientConnections;
             this.nodeId = nodeId;
             // Increment the number of connections for this node by one

+ 5 - 2
server/src/main/java/org/elasticsearch/action/support/broadcast/TransportBroadcastAction.java

@@ -35,6 +35,7 @@ import org.elasticsearch.cluster.routing.ShardIterator;
 import org.elasticsearch.cluster.routing.ShardRouting;
 import org.elasticsearch.cluster.service.ClusterService;
 import org.elasticsearch.common.Nullable;
+import org.elasticsearch.common.io.stream.StreamInput;
 import org.elasticsearch.common.settings.Settings;
 import org.elasticsearch.common.util.concurrent.AbstractRunnable;
 import org.elasticsearch.tasks.Task;
@@ -173,8 +174,10 @@ public abstract class TransportBroadcastAction<Request extends BroadcastRequest<
                     } else {
                         transportService.sendRequest(node, transportShardAction, shardRequest, new TransportResponseHandler<ShardResponse>() {
                             @Override
-                            public ShardResponse newInstance() {
-                                return newShardResponse();
+                            public ShardResponse read(StreamInput in) throws IOException {
+                                ShardResponse response = newShardResponse();
+                                response.readFrom(in);
+                                return response;
                             }
 
                             @Override

+ 4 - 2
server/src/main/java/org/elasticsearch/action/support/broadcast/node/TransportBroadcastByNodeAction.java

@@ -313,8 +313,10 @@ public abstract class TransportBroadcastByNodeAction<Request extends BroadcastRe
                 }
                 transportService.sendRequest(node, transportNodeBroadcastAction, nodeRequest, new TransportResponseHandler<NodeResponse>() {
                     @Override
-                    public NodeResponse newInstance() {
-                        return new NodeResponse();
+                    public NodeResponse read(StreamInput in) throws IOException {
+                        NodeResponse nodeResponse = new NodeResponse();
+                        nodeResponse.readFrom(in);
+                        return nodeResponse;
                     }
 
                     @Override

+ 30 - 14
server/src/main/java/org/elasticsearch/action/support/master/TransportMasterNodeAction.java

@@ -35,6 +35,8 @@ import org.elasticsearch.cluster.metadata.IndexNameExpressionResolver;
 import org.elasticsearch.cluster.node.DiscoveryNode;
 import org.elasticsearch.cluster.node.DiscoveryNodes;
 import org.elasticsearch.cluster.service.ClusterService;
+import org.elasticsearch.common.io.stream.StreamInput;
+import org.elasticsearch.common.io.stream.Streamable;
 import org.elasticsearch.common.io.stream.Writeable;
 import org.elasticsearch.common.settings.Settings;
 import org.elasticsearch.common.unit.TimeValue;
@@ -47,6 +49,7 @@ import org.elasticsearch.transport.ConnectTransportException;
 import org.elasticsearch.transport.TransportException;
 import org.elasticsearch.transport.TransportService;
 
+import java.io.IOException;
 import java.util.function.Predicate;
 import java.util.function.Supplier;
 
@@ -101,8 +104,21 @@ public abstract class TransportMasterNodeAction<Request extends MasterNodeReques
 
     protected abstract String executor();
 
+    /**
+     * @deprecated new implementors should override {@link #read(StreamInput)} and use the
+     *             {@link Writeable.Reader} interface.
+     * @return a new response instance. Typically this is used for serialization using the
+     *         {@link Streamable#readFrom(StreamInput)} method.
+     */
+    @Deprecated
     protected abstract Response newResponse();
 
+    protected Response read(StreamInput in) throws IOException {
+        Response response = newResponse();
+        response.readFrom(in);
+        return response;
+    }
+
     protected abstract void masterOperation(Request request, ClusterState state, ActionListener<Response> listener) throws Exception;
 
     /**
@@ -201,21 +217,21 @@ public abstract class TransportMasterNodeAction<Request extends MasterNodeReques
                     } else {
                         DiscoveryNode masterNode = nodes.getMasterNode();
                         final String actionName = getMasterActionName(masterNode);
-                        transportService.sendRequest(masterNode, actionName, request, new ActionListenerResponseHandler<Response>(listener,
-                            TransportMasterNodeAction.this::newResponse) {
-                            @Override
-                            public void handleException(final TransportException exp) {
-                                Throwable cause = exp.unwrapCause();
-                                if (cause instanceof ConnectTransportException) {
-                                    // we want to retry here a bit to see if a new master is elected
-                                    logger.debug("connection exception while trying to forward request with action name [{}] to " +
-                                            "master node [{}], scheduling a retry. Error: [{}]",
-                                        actionName, nodes.getMasterNode(), exp.getDetailedMessage());
-                                    retry(cause, masterChangePredicate);
-                                } else {
-                                    listener.onFailure(exp);
+                        transportService.sendRequest(masterNode, actionName, request,
+                            new ActionListenerResponseHandler<Response>(listener, TransportMasterNodeAction.this::read) {
+                                @Override
+                                public void handleException(final TransportException exp) {
+                                    Throwable cause = exp.unwrapCause();
+                                    if (cause instanceof ConnectTransportException) {
+                                        // we want to retry here a bit to see if a new master is elected
+                                        logger.debug("connection exception while trying to forward request with action name [{}] to " +
+                                                "master node [{}], scheduling a retry. Error: [{}]",
+                                            actionName, nodes.getMasterNode(), exp.getDetailedMessage());
+                                        retry(cause, masterChangePredicate);
+                                    } else {
+                                        listener.onFailure(exp);
+                                    }
                                 }
-                            }
                         });
                     }
                 }

+ 6 - 2
server/src/main/java/org/elasticsearch/action/support/nodes/TransportNodesAction.java

@@ -27,6 +27,7 @@ import org.elasticsearch.action.support.HandledTransportAction;
 import org.elasticsearch.cluster.ClusterState;
 import org.elasticsearch.cluster.node.DiscoveryNode;
 import org.elasticsearch.cluster.service.ClusterService;
+import org.elasticsearch.common.io.stream.StreamInput;
 import org.elasticsearch.common.settings.Settings;
 import org.elasticsearch.tasks.Task;
 import org.elasticsearch.threadpool.ThreadPool;
@@ -39,6 +40,7 @@ import org.elasticsearch.transport.TransportRequestOptions;
 import org.elasticsearch.transport.TransportResponseHandler;
 import org.elasticsearch.transport.TransportService;
 
+import java.io.IOException;
 import java.util.ArrayList;
 import java.util.Arrays;
 import java.util.List;
@@ -186,8 +188,10 @@ public abstract class TransportNodesAction<NodesRequest extends BaseNodesRequest
                     transportService.sendRequest(node, transportNodeAction, nodeRequest, builder.build(),
                             new TransportResponseHandler<NodeResponse>() {
                                 @Override
-                                public NodeResponse newInstance() {
-                                    return newNodeResponse();
+                                public NodeResponse read(StreamInput in) throws IOException {
+                                    NodeResponse nodeResponse = newNodeResponse();
+                                    nodeResponse.readFrom(in);
+                                    return nodeResponse;
                                 }
 
                                 @Override

+ 17 - 5
server/src/main/java/org/elasticsearch/action/support/replication/TransportReplicationAction.java

@@ -48,6 +48,7 @@ import org.elasticsearch.cluster.service.ClusterService;
 import org.elasticsearch.common.Nullable;
 import org.elasticsearch.common.io.stream.StreamInput;
 import org.elasticsearch.common.io.stream.StreamOutput;
+import org.elasticsearch.common.io.stream.Writeable;
 import org.elasticsearch.common.lease.Releasable;
 import org.elasticsearch.common.lease.Releasables;
 import org.elasticsearch.common.settings.Settings;
@@ -317,12 +318,17 @@ public abstract class TransportReplicationAction<
                     // phase is executed on local shard and all subsequent operations are executed on relocation target as primary phase.
                     final ShardRouting primary = primaryShardReference.routingEntry();
                     assert primary.relocating() : "indexShard is marked as relocated but routing isn't" + primary;
+                    final Writeable.Reader<Response> reader = in -> {
+                        Response response = TransportReplicationAction.this.newResponseInstance();
+                        response.readFrom(in);
+                        return response;
+                    };
                     DiscoveryNode relocatingNode = clusterService.state().nodes().get(primary.relocatingNodeId());
                     transportService.sendRequest(relocatingNode, transportPrimaryAction,
                         new ConcreteShardRequest<>(request, primary.allocationId().getRelocationId(), primaryTerm),
                         transportOptions,
                         new TransportChannelResponseHandler<Response>(logger, channel, "rerouting indexing to target primary " + primary,
-                            TransportReplicationAction.this::newResponseInstance) {
+                            reader) {
 
                             @Override
                             public void handleResponse(Response response) {
@@ -577,7 +583,7 @@ public abstract class TransportReplicationAction<
                         String extraMessage = "action [" + transportReplicaAction + "], request[" + request + "]";
                         TransportChannelResponseHandler<TransportResponse.Empty> handler =
                             new TransportChannelResponseHandler<>(logger, channel, extraMessage,
-                                () -> TransportResponse.Empty.INSTANCE);
+                                (in) -> TransportResponse.Empty.INSTANCE);
                         transportService.sendRequest(clusterService.localNode(), transportReplicaAction,
                             new ConcreteReplicaRequest<>(request, targetAllocationID, primaryTerm,
                                 globalCheckpoint, maxSeqNoOfUpdatesOrDeletes),
@@ -813,8 +819,10 @@ public abstract class TransportReplicationAction<
             transportService.sendRequest(node, action, requestToPerform, transportOptions, new TransportResponseHandler<Response>() {
 
                 @Override
-                public Response newInstance() {
-                    return newResponseInstance();
+                public Response read(StreamInput in) throws IOException {
+                    Response response = newResponseInstance();
+                    response.readFrom(in);
+                    return response;
                 }
 
                 @Override
@@ -1186,7 +1194,11 @@ public abstract class TransportReplicationAction<
             final ConcreteReplicaRequest<ReplicaRequest> replicaRequest,
             final DiscoveryNode node,
             final ActionListener<ReplicationOperation.ReplicaResponse> listener) {
-        final ActionListenerResponseHandler<ReplicaResponse> handler = new ActionListenerResponseHandler<>(listener, ReplicaResponse::new);
+        final ActionListenerResponseHandler<ReplicaResponse> handler = new ActionListenerResponseHandler<>(listener, in -> {
+            ReplicaResponse replicaResponse = new ReplicaResponse();
+            replicaResponse.readFrom(in);
+            return replicaResponse;
+        });
         transportService.sendRequest(node, transportReplicaAction, replicaRequest, transportOptions, handler);
     }
 

+ 6 - 2
server/src/main/java/org/elasticsearch/action/support/single/instance/TransportInstanceSingleOperationAction.java

@@ -34,6 +34,7 @@ import org.elasticsearch.cluster.routing.ShardIterator;
 import org.elasticsearch.cluster.routing.ShardRouting;
 import org.elasticsearch.cluster.service.ClusterService;
 import org.elasticsearch.common.Nullable;
+import org.elasticsearch.common.io.stream.StreamInput;
 import org.elasticsearch.common.settings.Settings;
 import org.elasticsearch.common.unit.TimeValue;
 import org.elasticsearch.node.NodeClosedException;
@@ -47,6 +48,7 @@ import org.elasticsearch.transport.TransportRequestOptions;
 import org.elasticsearch.transport.TransportResponseHandler;
 import org.elasticsearch.transport.TransportService;
 
+import java.io.IOException;
 import java.util.function.Supplier;
 
 public abstract class TransportInstanceSingleOperationAction<Request extends InstanceShardOperationRequest<Request>, Response extends ActionResponse>
@@ -178,8 +180,10 @@ public abstract class TransportInstanceSingleOperationAction<Request extends Ins
             transportService.sendRequest(node, shardActionName, request, transportOptions(), new TransportResponseHandler<Response>() {
 
                 @Override
-                public Response newInstance() {
-                    return newResponse();
+                public Response read(StreamInput in) throws IOException {
+                    Response response = newResponse();
+                    response.readFrom(in);
+                    return response;
                 }
 
                 @Override

+ 9 - 4
server/src/main/java/org/elasticsearch/action/support/single/shard/TransportSingleShardAction.java

@@ -37,6 +37,7 @@ import org.elasticsearch.cluster.routing.ShardRouting;
 import org.elasticsearch.cluster.routing.ShardsIterator;
 import org.elasticsearch.cluster.service.ClusterService;
 import org.elasticsearch.common.Nullable;
+import org.elasticsearch.common.io.stream.StreamInput;
 import org.elasticsearch.common.logging.LoggerMessageFormat;
 import org.elasticsearch.common.settings.Settings;
 import org.elasticsearch.common.util.concurrent.AbstractRunnable;
@@ -182,8 +183,10 @@ public abstract class TransportSingleShardAction<Request extends SingleShardRequ
                 // just execute it on the local node
                 transportService.sendRequest(clusterService.localNode(), transportShardAction, internalRequest.request(), new TransportResponseHandler<Response>() {
                     @Override
-                    public Response newInstance() {
-                        return newResponse();
+                    public Response read(StreamInput in) throws IOException {
+                        Response response = newResponse();
+                        response.readFrom(in);
+                        return response;
                     }
 
                     @Override
@@ -246,8 +249,10 @@ public abstract class TransportSingleShardAction<Request extends SingleShardRequ
                 transportService.sendRequest(node, transportShardAction, internalRequest.request(), new TransportResponseHandler<Response>() {
 
                     @Override
-                    public Response newInstance() {
-                        return newResponse();
+                    public Response read(StreamInput in) throws IOException {
+                        Response response = newResponse();
+                        response.readFrom(in);
+                        return response;
                     }
 
                     @Override

+ 4 - 2
server/src/main/java/org/elasticsearch/action/support/tasks/TransportTasksAction.java

@@ -270,8 +270,10 @@ public abstract class TransportTasksAction<
                             transportService.sendRequest(node, transportNodeAction, nodeRequest, builder.build(),
                                 new TransportResponseHandler<NodeTasksResponse>() {
                                     @Override
-                                    public NodeTasksResponse newInstance() {
-                                        return new NodeTasksResponse();
+                                    public NodeTasksResponse read(StreamInput in) throws IOException {
+                                        NodeTasksResponse response = new NodeTasksResponse();
+                                        response.readFrom(in);
+                                        return response;
                                     }
 
                                     @Override

+ 4 - 2
server/src/main/java/org/elasticsearch/client/transport/TransportClientNodesService.java

@@ -511,8 +511,10 @@ final class TransportClientNodesService extends AbstractComponent implements Clo
                                 new TransportResponseHandler<ClusterStateResponse>() {
 
                                     @Override
-                                    public ClusterStateResponse newInstance() {
-                                        return new ClusterStateResponse();
+                                    public ClusterStateResponse read(StreamInput in) throws IOException {
+                                        final ClusterStateResponse clusterStateResponse = new ClusterStateResponse();
+                                        clusterStateResponse.readFrom(in);
+                                        return clusterStateResponse;
                                     }
 
                                     @Override

+ 4 - 10
server/src/main/java/org/elasticsearch/discovery/zen/MasterFaultDetection.java

@@ -225,8 +225,8 @@ public class MasterFaultDetection extends FaultDetection {
             transportService.sendRequest(masterToPing, MASTER_PING_ACTION_NAME, request, options,
                 new TransportResponseHandler<MasterPingResponseResponse>() {
                         @Override
-                        public MasterPingResponseResponse newInstance() {
-                            return new MasterPingResponseResponse();
+                        public MasterPingResponseResponse read(StreamInput in) throws IOException {
+                            return new MasterPingResponseResponse(in);
                         }
 
                         @Override
@@ -433,14 +433,8 @@ public class MasterFaultDetection extends FaultDetection {
         private MasterPingResponseResponse() {
         }
 
-        @Override
-        public void readFrom(StreamInput in) throws IOException {
-            super.readFrom(in);
-        }
-
-        @Override
-        public void writeTo(StreamOutput out) throws IOException {
-            super.writeTo(out);
+        private MasterPingResponseResponse(StreamInput in) throws IOException {
+            super(in);
         }
     }
 }

+ 4 - 10
server/src/main/java/org/elasticsearch/discovery/zen/NodesFaultDetection.java

@@ -226,8 +226,8 @@ public class NodesFaultDetection extends FaultDetection {
                 .withTimeout(pingRetryTimeout).build();
             transportService.sendRequest(node, PING_ACTION_NAME, newPingRequest(), options, new TransportResponseHandler<PingResponse>() {
                         @Override
-                        public PingResponse newInstance() {
-                            return new PingResponse();
+                        public PingResponse read(StreamInput in) throws IOException {
+                            return new PingResponse(in);
                         }
 
                         @Override
@@ -359,14 +359,8 @@ public class NodesFaultDetection extends FaultDetection {
         private PingResponse() {
         }
 
-        @Override
-        public void readFrom(StreamInput in) throws IOException {
-            super.readFrom(in);
-        }
-
-        @Override
-        public void writeTo(StreamOutput out) throws IOException {
-            super.writeTo(out);
+        private PingResponse(StreamInput in) throws IOException {
+            super(in);
         }
     }
 }

+ 4 - 2
server/src/main/java/org/elasticsearch/gateway/LocalAllocateDangledIndices.java

@@ -84,8 +84,10 @@ public class LocalAllocateDangledIndices extends AbstractComponent {
         AllocateDangledRequest request = new AllocateDangledRequest(clusterService.localNode(), indices.toArray(new IndexMetaData[indices.size()]));
         transportService.sendRequest(masterNode, ACTION_NAME, request, new TransportResponseHandler<AllocateDangledResponse>() {
             @Override
-            public AllocateDangledResponse newInstance() {
-                return new AllocateDangledResponse();
+            public AllocateDangledResponse read(StreamInput in) throws IOException {
+                final AllocateDangledResponse response = new AllocateDangledResponse();
+                response.readFrom(in);
+                return response;
             }
 
             @Override

+ 12 - 6
server/src/main/java/org/elasticsearch/indices/flush/SyncedFlushService.java

@@ -313,8 +313,10 @@ public class SyncedFlushService extends AbstractComponent implements IndexEventL
             transportService.sendRequest(primaryNode, IN_FLIGHT_OPS_ACTION_NAME, new InFlightOpsRequest(shardId),
                     new TransportResponseHandler<InFlightOpsResponse>() {
                         @Override
-                        public InFlightOpsResponse newInstance() {
-                            return new InFlightOpsResponse();
+                        public InFlightOpsResponse read(StreamInput in) throws IOException {
+                            InFlightOpsResponse response = new InFlightOpsResponse();
+                            response.readFrom(in);
+                            return response;
                         }
 
                         @Override
@@ -383,8 +385,10 @@ public class SyncedFlushService extends AbstractComponent implements IndexEventL
             transportService.sendRequest(node, SYNCED_FLUSH_ACTION_NAME, new ShardSyncedFlushRequest(shard.shardId(), syncId, preSyncedResponse.commitId),
                     new TransportResponseHandler<ShardSyncedFlushResponse>() {
                         @Override
-                        public ShardSyncedFlushResponse newInstance() {
-                            return new ShardSyncedFlushResponse();
+                        public ShardSyncedFlushResponse read(StreamInput in) throws IOException {
+                            ShardSyncedFlushResponse response = new ShardSyncedFlushResponse();
+                            response.readFrom(in);
+                            return response;
                         }
 
                         @Override
@@ -437,8 +441,10 @@ public class SyncedFlushService extends AbstractComponent implements IndexEventL
             }
             transportService.sendRequest(node, PRE_SYNCED_FLUSH_ACTION_NAME, new PreShardSyncedFlushRequest(shard.shardId()), new TransportResponseHandler<PreSyncedFlushResponse>() {
                 @Override
-                public PreSyncedFlushResponse newInstance() {
-                    return new PreSyncedFlushResponse();
+                public PreSyncedFlushResponse read(StreamInput in) throws IOException {
+                    PreSyncedFlushResponse response = new PreSyncedFlushResponse();
+                    response.readFrom(in);
+                    return response;
                 }
 
                 @Override

+ 5 - 2
server/src/main/java/org/elasticsearch/indices/recovery/PeerRecoveryTargetService.java

@@ -35,6 +35,7 @@ import org.elasticsearch.cluster.node.DiscoveryNode;
 import org.elasticsearch.cluster.service.ClusterService;
 import org.elasticsearch.common.Nullable;
 import org.elasticsearch.common.component.AbstractComponent;
+import org.elasticsearch.common.io.stream.StreamInput;
 import org.elasticsearch.common.settings.Settings;
 import org.elasticsearch.common.unit.ByteSizeValue;
 import org.elasticsearch.common.unit.TimeValue;
@@ -195,8 +196,10 @@ public class PeerRecoveryTargetService extends AbstractComponent implements Inde
                     transportService.submitRequest(request.sourceNode(), PeerRecoverySourceService.Actions.START_RECOVERY, request,
                             new FutureTransportResponseHandler<RecoveryResponse>() {
                                 @Override
-                                public RecoveryResponse newInstance() {
-                                    return new RecoveryResponse();
+                                public RecoveryResponse read(StreamInput in) throws IOException {
+                                    RecoveryResponse recoveryResponse = new RecoveryResponse();
+                                    recoveryResponse.readFrom(in);
+                                    return recoveryResponse;
                                 }
                             }).txGet()));
             final RecoveryResponse recoveryResponse = responseHolder.get();

+ 4 - 2
server/src/main/java/org/elasticsearch/indices/recovery/RecoveryTranslogOperationsResponse.java

@@ -63,8 +63,10 @@ public class RecoveryTranslogOperationsResponse extends TransportResponse {
     static TransportResponseHandler<RecoveryTranslogOperationsResponse> HANDLER =
             new FutureTransportResponseHandler<RecoveryTranslogOperationsResponse>() {
                 @Override
-                public RecoveryTranslogOperationsResponse newInstance() {
-                    return new RecoveryTranslogOperationsResponse();
+                public RecoveryTranslogOperationsResponse read(StreamInput in) throws IOException {
+                    RecoveryTranslogOperationsResponse response = new RecoveryTranslogOperationsResponse();
+                    response.readFrom(in);
+                    return response;
                 }
             };
 

+ 4 - 0
server/src/main/java/org/elasticsearch/search/SearchService.java

@@ -1101,6 +1101,10 @@ public class SearchService extends AbstractLifecycleComponent implements IndexEv
         public CanMatchResponse() {
         }
 
+        public CanMatchResponse(StreamInput in) throws IOException {
+            this.canMatch = in.readBoolean();
+        }
+
         public CanMatchResponse(boolean canMatch) {
             this.canMatch = canMatch;
         }

+ 4 - 0
server/src/main/java/org/elasticsearch/search/dfs/DfsSearchResult.java

@@ -46,6 +46,10 @@ public class DfsSearchResult extends SearchPhaseResult {
     public DfsSearchResult() {
     }
 
+    public DfsSearchResult(StreamInput in) throws IOException {
+        readFrom(in);
+    }
+
     public DfsSearchResult(long id, SearchShardTarget shardTarget) {
         this.setSearchShardTarget(shardTarget);
         this.requestId = id;

+ 4 - 0
server/src/main/java/org/elasticsearch/search/fetch/FetchSearchResult.java

@@ -38,6 +38,10 @@ public final class FetchSearchResult extends SearchPhaseResult {
     public FetchSearchResult() {
     }
 
+    public FetchSearchResult(StreamInput in) throws IOException {
+        readFrom(in);
+    }
+
     public FetchSearchResult(long id, SearchShardTarget shardTarget) {
         this.requestId = id;
         setSearchShardTarget(shardTarget);

+ 4 - 0
server/src/main/java/org/elasticsearch/search/fetch/QueryFetchSearchResult.java

@@ -38,6 +38,10 @@ public final class QueryFetchSearchResult extends SearchPhaseResult {
     public QueryFetchSearchResult() {
     }
 
+    public QueryFetchSearchResult(StreamInput in) throws IOException {
+        readFrom(in);
+    }
+
     public QueryFetchSearchResult(QuerySearchResult queryResult, FetchSearchResult fetchResult) {
         this.queryResult = queryResult;
         this.fetchResult = fetchResult;

+ 4 - 0
server/src/main/java/org/elasticsearch/search/fetch/ScrollQueryFetchSearchResult.java

@@ -36,6 +36,10 @@ public final class ScrollQueryFetchSearchResult extends SearchPhaseResult {
     public ScrollQueryFetchSearchResult() {
     }
 
+    public ScrollQueryFetchSearchResult(StreamInput in) throws IOException {
+        readFrom(in);
+    }
+
     public ScrollQueryFetchSearchResult(QueryFetchSearchResult result, SearchShardTarget shardTarget) {
         this.result = result;
         setSearchShardTarget(shardTarget);

+ 4 - 0
server/src/main/java/org/elasticsearch/search/query/QuerySearchResult.java

@@ -66,6 +66,10 @@ public final class QuerySearchResult extends SearchPhaseResult {
     public QuerySearchResult() {
     }
 
+    public QuerySearchResult(StreamInput in) throws IOException {
+        readFrom(in);
+    }
+
     public QuerySearchResult(long id, SearchShardTarget shardTarget) {
         this.requestId = id;
         setSearchShardTarget(shardTarget);

+ 4 - 0
server/src/main/java/org/elasticsearch/search/query/ScrollQuerySearchResult.java

@@ -35,6 +35,10 @@ public final class ScrollQuerySearchResult extends SearchPhaseResult {
     public ScrollQuerySearchResult() {
     }
 
+    public ScrollQuerySearchResult(StreamInput in) throws IOException {
+        readFrom(in);
+    }
+
     public ScrollQuerySearchResult(QuerySearchResult result, SearchShardTarget shardTarget) {
         this.result = result;
         setSearchShardTarget(shardTarget);

+ 2 - 1
server/src/main/java/org/elasticsearch/transport/EmptyTransportResponseHandler.java

@@ -19,6 +19,7 @@
 
 package org.elasticsearch.transport;
 
+import org.elasticsearch.common.io.stream.StreamInput;
 import org.elasticsearch.threadpool.ThreadPool;
 
 public class EmptyTransportResponseHandler implements TransportResponseHandler<TransportResponse.Empty> {
@@ -32,7 +33,7 @@ public class EmptyTransportResponseHandler implements TransportResponseHandler<T
     }
 
     @Override
-    public TransportResponse.Empty newInstance() {
+    public TransportResponse.Empty read(StreamInput in) {
         return TransportResponse.Empty.INSTANCE;
     }
 

+ 1 - 1
server/src/main/java/org/elasticsearch/transport/RemoteClusterAwareClient.java

@@ -47,7 +47,7 @@ final class RemoteClusterAwareClient extends AbstractClient {
         remoteClusterService.ensureConnected(clusterAlias, ActionListener.wrap(res -> {
             Transport.Connection connection = remoteClusterService.getConnection(clusterAlias);
             service.sendRequest(connection, action.name(), request, TransportRequestOptions.EMPTY,
-                new ActionListenerResponseHandler<>(listener, action::newResponse));
+                new ActionListenerResponseHandler<>(listener, action.getResponseReader()));
         },
         listener::onFailure));
     }

+ 6 - 4
server/src/main/java/org/elasticsearch/transport/RemoteClusterConnection.java

@@ -218,8 +218,8 @@ final class RemoteClusterConnection extends AbstractComponent implements Transpo
             new TransportResponseHandler<ClusterSearchShardsResponse>() {
 
                 @Override
-                public ClusterSearchShardsResponse newInstance() {
-                    return new ClusterSearchShardsResponse();
+                public ClusterSearchShardsResponse read(StreamInput in) throws IOException {
+                    return new ClusterSearchShardsResponse(in);
                 }
 
                 @Override
@@ -591,8 +591,10 @@ final class RemoteClusterConnection extends AbstractComponent implements Transpo
             }
 
             @Override
-            public ClusterStateResponse newInstance() {
-                return new ClusterStateResponse();
+            public ClusterStateResponse read(StreamInput in) throws IOException {
+                ClusterStateResponse response = new ClusterStateResponse();
+                response.readFrom(in);
+                return response;
             }
 
             @Override

+ 11 - 13
server/src/main/java/org/elasticsearch/transport/TcpTransport.java

@@ -205,7 +205,7 @@ public abstract class TcpTransport extends AbstractLifecycleComponent implements
 
     private final MeanMetric readBytesMetric = new MeanMetric();
     private final MeanMetric transmittedBytesMetric = new MeanMetric();
-    private volatile Map<String, RequestHandlerRegistry> requestHandlers = Collections.emptyMap();
+    private volatile Map<String, RequestHandlerRegistry<? extends TransportRequest>> requestHandlers = Collections.emptyMap();
     private final ResponseHandlers responseHandlers = new ResponseHandlers();
     private final TransportLogger transportLogger;
     private final BytesReference pingMessage;
@@ -284,8 +284,8 @@ public abstract class TcpTransport extends AbstractLifecycleComponent implements
         }
 
         @Override
-        public VersionHandshakeResponse newInstance() {
-            return new VersionHandshakeResponse();
+        public VersionHandshakeResponse read(StreamInput in) throws IOException {
+            return new VersionHandshakeResponse(in);
         }
 
         @Override
@@ -1273,7 +1273,8 @@ public abstract class TcpTransport extends AbstractLifecycleComponent implements
                 if (isHandshake) {
                     handler = pendingHandshakes.remove(requestId);
                 } else {
-                    TransportResponseHandler theHandler = responseHandlers.onResponseReceived(requestId, messageListener);
+                    TransportResponseHandler<? extends TransportResponse> theHandler =
+                        responseHandlers.onResponseReceived(requestId, messageListener);
                     if (theHandler == null && TransportStatus.isError(status)) {
                         handler = pendingHandshakes.remove(requestId);
                     } else {
@@ -1319,8 +1320,9 @@ public abstract class TcpTransport extends AbstractLifecycleComponent implements
         }
     }
 
-    private void handleResponse(InetSocketAddress remoteAddress, final StreamInput stream, final TransportResponseHandler handler) {
-        final TransportResponse response;
+    private <T extends TransportResponse> void handleResponse(InetSocketAddress remoteAddress, final StreamInput stream,
+                                final TransportResponseHandler<T> handler) {
+        final T response;
         try {
             response = handler.read(stream);
             response.remoteAddress(new TransportAddress(remoteAddress));
@@ -1469,17 +1471,13 @@ public abstract class TcpTransport extends AbstractLifecycleComponent implements
     }
 
     private static final class VersionHandshakeResponse extends TransportResponse {
-        private Version version;
+        private final Version version;
 
         private VersionHandshakeResponse(Version version) {
             this.version = version;
         }
 
-        private VersionHandshakeResponse() {
-        }
-
-        @Override
-        public void readFrom(StreamInput in) throws IOException {
+        private VersionHandshakeResponse(StreamInput in) throws IOException {
             super.readFrom(in);
             version = Version.readVersion(in);
         }
@@ -1736,7 +1734,7 @@ public abstract class TcpTransport extends AbstractLifecycleComponent implements
     }
 
     @Override
-    public final RequestHandlerRegistry getRequestHandler(String action) {
+    public final RequestHandlerRegistry<? extends TransportRequest> getRequestHandler(String action) {
         return requestHandlers.get(action);
     }
 }

+ 10 - 9
server/src/main/java/org/elasticsearch/transport/Transport.java

@@ -54,7 +54,7 @@ public interface Transport extends LifecycleComponent {
      * Returns the registered request handler registry for the given action or <code>null</code> if it's not registered
      * @param action the action to look up
      */
-    RequestHandlerRegistry getRequestHandler(String action);
+    RequestHandlerRegistry<? extends TransportRequest> getRequestHandler(String action);
 
     void addMessageListener(TransportMessageListener listener);
 
@@ -184,7 +184,7 @@ public interface Transport extends LifecycleComponent {
      * This class is a registry that allows
      */
     final class ResponseHandlers {
-        private final ConcurrentMapLong<ResponseContext> handlers = ConcurrentCollections
+        private final ConcurrentMapLong<ResponseContext<? extends TransportResponse>> handlers = ConcurrentCollections
             .newConcurrentMapLongWithAggressiveConcurrency();
         private final AtomicLong requestIdGenerator = new AtomicLong();
 
@@ -208,7 +208,7 @@ public interface Transport extends LifecycleComponent {
          * @return the new request ID
          * @see Connection#sendRequest(long, String, TransportRequest, TransportRequestOptions)
          */
-        public long add(ResponseContext holder) {
+        public long add(ResponseContext<? extends TransportResponse> holder) {
             long requestId = newRequestId();
             ResponseContext existing = handlers.put(requestId, holder);
             assert existing == null : "request ID already in use: " + requestId;
@@ -226,10 +226,10 @@ public interface Transport extends LifecycleComponent {
         /**
          * Removes and returns all {@link ResponseContext} instances that match the predicate
          */
-        public List<ResponseContext> prune(Predicate<ResponseContext> predicate) {
-            final List<ResponseContext> holders = new ArrayList<>();
-            for (Map.Entry<Long, ResponseContext> entry : handlers.entrySet()) {
-                ResponseContext holder = entry.getValue();
+        public List<ResponseContext<? extends TransportResponse>> prune(Predicate<ResponseContext> predicate) {
+            final List<ResponseContext<? extends TransportResponse>> holders = new ArrayList<>();
+            for (Map.Entry<Long, ResponseContext<? extends TransportResponse>> entry : handlers.entrySet()) {
+                ResponseContext<? extends TransportResponse> holder = entry.getValue();
                 if (predicate.test(holder)) {
                     ResponseContext remove = handlers.remove(entry.getKey());
                     if (remove != null) {
@@ -245,8 +245,9 @@ public interface Transport extends LifecycleComponent {
          * sent request (before any processing or deserialization was done). Returns the appropriate response handler or null if not
          * found.
          */
-        public TransportResponseHandler onResponseReceived(final long requestId, TransportMessageListener listener) {
-            ResponseContext context = handlers.remove(requestId);
+        public TransportResponseHandler<? extends TransportResponse> onResponseReceived(final long requestId,
+                                                                                        final TransportMessageListener listener) {
+            ResponseContext<? extends TransportResponse> context = handlers.remove(requestId);
             listener.onResponseReceived(requestId, context);
             if (context == null) {
                 return null;

+ 28 - 28
server/src/main/java/org/elasticsearch/transport/TransportActionProxy.java

@@ -28,7 +28,6 @@ import org.elasticsearch.threadpool.ThreadPool;
 import java.io.IOException;
 import java.io.UncheckedIOException;
 import java.util.function.Function;
-import java.util.function.Supplier;
 
 /**
  * TransportActionProxy allows an arbitrary action to be executed on a defined target node while the initial request is sent to a second
@@ -43,10 +42,10 @@ public final class TransportActionProxy {
 
         private final TransportService service;
         private final String action;
-        private final Function<TransportRequest, Supplier<TransportResponse>> responseFunction;
+        private final Function<TransportRequest, Writeable.Reader<? extends TransportResponse>> responseFunction;
 
         ProxyRequestHandler(TransportService service, String action, Function<TransportRequest,
-                Supplier<TransportResponse>> responseFunction) {
+                Writeable.Reader<? extends TransportResponse>> responseFunction) {
             this.service = service;
             this.action = action;
             this.responseFunction = responseFunction;
@@ -63,17 +62,17 @@ public final class TransportActionProxy {
 
     private static class ProxyResponseHandler<T extends TransportResponse> implements TransportResponseHandler<T> {
 
-        private final Supplier<T> responseFactory;
+        private final Writeable.Reader<T> reader;
         private final TransportChannel channel;
 
-        ProxyResponseHandler(TransportChannel channel, Supplier<T> responseFactory) {
-            this.responseFactory = responseFactory;
+        ProxyResponseHandler(TransportChannel channel, Writeable.Reader<T> reader) {
+            this.reader = reader;
             this.channel = channel;
-
         }
+
         @Override
-        public T newInstance() {
-            return responseFactory.get();
+        public T read(StreamInput in) throws IOException {
+            return reader.read(in);
         }
 
         @Override
@@ -101,26 +100,25 @@ public final class TransportActionProxy {
     }
 
     static class ProxyRequest<T extends TransportRequest> extends TransportRequest {
-        T wrapped;
-        Writeable.Reader<T> reader;
-        DiscoveryNode targetNode;
-
-        ProxyRequest(Writeable.Reader<T> reader) {
-            this.reader = reader;
-        }
+        final T wrapped;
+        final DiscoveryNode targetNode;
 
         ProxyRequest(T wrapped, DiscoveryNode targetNode) {
             this.wrapped = wrapped;
             this.targetNode = targetNode;
         }
 
-        @Override
-        public void readFrom(StreamInput in) throws IOException {
-            super.readFrom(in);
+        ProxyRequest(StreamInput in, Writeable.Reader<T> reader) throws IOException {
+            super(in);
             targetNode = new DiscoveryNode(in);
             wrapped = reader.read(in);
         }
 
+        @Override
+        public void readFrom(StreamInput in) throws IOException {
+            throw new UnsupportedOperationException("usage of Streamable is to be replaced by Writeable");
+        }
+
         @Override
         public void writeTo(StreamOutput out) throws IOException {
             super.writeTo(out);
@@ -133,21 +131,23 @@ public final class TransportActionProxy {
      * Registers a proxy request handler that allows to forward requests for the given action to another node. To be used when the
      * response type changes based on the upcoming request (quite rare)
      */
-    public static void registerProxyAction(TransportService service, String action,
-                                           Function<TransportRequest, Supplier<TransportResponse>> responseFunction) {
-        RequestHandlerRegistry requestHandler = service.getRequestHandler(action);
-        service.registerRequestHandler(getProxyAction(action), () -> new ProxyRequest(requestHandler::newRequest), ThreadPool.Names.SAME,
-            true, false, new ProxyRequestHandler<>(service, action, responseFunction));
+    public static void registerProxyActionWithDynamicResponseType(TransportService service, String action,
+                                                                  Function<TransportRequest,
+                                                                      Writeable.Reader<? extends TransportResponse>> responseFunction) {
+        RequestHandlerRegistry<? extends TransportRequest> requestHandler = service.getRequestHandler(action);
+        service.registerRequestHandler(getProxyAction(action), ThreadPool.Names.SAME, true, false,
+            in -> new ProxyRequest<>(in, requestHandler::newRequest), new ProxyRequestHandler<>(service, action, responseFunction));
     }
 
     /**
      * Registers a proxy request handler that allows to forward requests for the given action to another node. To be used when the
      * response type is always the same (most of the cases).
      */
-    public static void registerProxyAction(TransportService service, String action, Supplier<TransportResponse> responseSupplier) {
-        RequestHandlerRegistry requestHandler = service.getRequestHandler(action);
-        service.registerRequestHandler(getProxyAction(action), () -> new ProxyRequest(requestHandler::newRequest), ThreadPool.Names.SAME,
-                true, false, new ProxyRequestHandler<>(service, action, request -> responseSupplier));
+    public static void registerProxyAction(TransportService service, String action,
+                                           Writeable.Reader<? extends TransportResponse> reader) {
+        RequestHandlerRegistry<? extends TransportRequest> requestHandler = service.getRequestHandler(action);
+        service.registerRequestHandler(getProxyAction(action), ThreadPool.Names.SAME, true, false,
+            in -> new ProxyRequest<>(in, requestHandler::newRequest), new ProxyRequestHandler<>(service, action, request -> reader));
     }
 
     private static final String PROXY_ACTION_PREFIX = "internal:transport/proxy/";

+ 7 - 6
server/src/main/java/org/elasticsearch/transport/TransportChannelResponseHandler.java

@@ -21,10 +21,11 @@ package org.elasticsearch.transport;
 
 import org.apache.logging.log4j.Logger;
 import org.apache.logging.log4j.message.ParameterizedMessage;
+import org.elasticsearch.common.io.stream.StreamInput;
+import org.elasticsearch.common.io.stream.Writeable;
 import org.elasticsearch.threadpool.ThreadPool;
 
 import java.io.IOException;
-import java.util.function.Supplier;
 
 /**
  * Base class for delegating transport response to a transport channel
@@ -34,19 +35,19 @@ public class TransportChannelResponseHandler<T extends TransportResponse> implem
     private final Logger logger;
     private final TransportChannel channel;
     private final String extraInfoOnError;
-    private final Supplier<T> responseSupplier;
+    private final Writeable.Reader<T> reader;
 
     public TransportChannelResponseHandler(Logger logger, TransportChannel channel, String extraInfoOnError,
-                                           Supplier<T> responseSupplier) {
+                                           Writeable.Reader<T> reader) {
         this.logger = logger;
         this.channel = channel;
         this.extraInfoOnError = extraInfoOnError;
-        this.responseSupplier = responseSupplier;
+        this.reader = reader;
     }
 
     @Override
-    public T newInstance() {
-        return responseSupplier.get();
+    public T read(StreamInput in) throws IOException {
+        return reader.read(in);
     }
 
     @Override

+ 13 - 0
server/src/main/java/org/elasticsearch/transport/TransportMessage.java

@@ -39,6 +39,19 @@ public abstract class TransportMessage implements Streamable, Writeable {
         return remoteAddress;
     }
 
+    /**
+     * Constructs a new empty transport message
+     */
+    public TransportMessage() {
+    }
+
+    /**
+     * Constructs a new transport message with the data from the {@link StreamInput}. This is
+     * currently a no-op
+     */
+    public TransportMessage(StreamInput in) throws IOException {
+    }
+
     @Override
     public void readFrom(StreamInput in) throws IOException {
 

+ 19 - 0
server/src/main/java/org/elasticsearch/transport/TransportResponse.java

@@ -19,8 +19,27 @@
 
 package org.elasticsearch.transport;
 
+import org.elasticsearch.common.io.stream.StreamInput;
+
+import java.io.IOException;
+
 public abstract class TransportResponse extends TransportMessage {
 
+    /**
+     * Constructs a new empty transport response
+     */
+    public TransportResponse() {
+    }
+
+    /**
+     * Constructs a new transport response with the data from the {@link StreamInput}. This is
+     * currently a no-op. However, this exists to allow extenders to call <code>super(in)</code>
+     * so that reading can mirror writing where we often call <code>super.writeTo(out)</code>.
+     */
+    public TransportResponse(StreamInput in) throws IOException {
+        super(in);
+    }
+
     public static class Empty extends TransportResponse {
         public static final Empty INSTANCE = new Empty();
     }

+ 0 - 24
server/src/main/java/org/elasticsearch/transport/TransportResponseHandler.java

@@ -19,34 +19,10 @@
 
 package org.elasticsearch.transport;
 
-import org.elasticsearch.common.io.stream.StreamInput;
 import org.elasticsearch.common.io.stream.Writeable;
 
-import java.io.IOException;
-
 public interface TransportResponseHandler<T extends TransportResponse> extends Writeable.Reader<T> {
 
-    /**
-     * @deprecated Implement {@link #read(StreamInput)} instead.
-     */
-    @Deprecated
-    default T newInstance() {
-        throw new UnsupportedOperationException();
-    }
-
-    /**
-     * deserializes a new instance of the return type from the stream.
-     * called by the infra when de-serializing the response.
-     *
-     * @return the deserialized response.
-     */
-    @Override
-    default T read(StreamInput in) throws IOException {
-        T instance = newInstance();
-        instance.readFrom(in);
-        return instance;
-    }
-
     void handleResponse(T response);
 
     void handleException(TransportException exp);

+ 10 - 14
server/src/main/java/org/elasticsearch/transport/TransportService.java

@@ -434,8 +434,8 @@ public class TransportService extends AbstractLifecycleComponent implements Tran
             PlainTransportFuture<HandshakeResponse> futureHandler = new PlainTransportFuture<>(
                 new FutureTransportResponseHandler<HandshakeResponse>() {
                 @Override
-                public HandshakeResponse newInstance() {
-                    return new HandshakeResponse();
+                public HandshakeResponse read(StreamInput in) throws IOException {
+                    return new HandshakeResponse(in);
                 }
             });
             sendRequest(connection, HANDSHAKE_ACTION_NAME, HandshakeRequest.INSTANCE,
@@ -468,12 +468,9 @@ public class TransportService extends AbstractLifecycleComponent implements Tran
     }
 
     public static class HandshakeResponse extends TransportResponse {
-        private DiscoveryNode discoveryNode;
-        private ClusterName clusterName;
-        private Version version;
-
-        HandshakeResponse() {
-        }
+        private final DiscoveryNode discoveryNode;
+        private final ClusterName clusterName;
+        private final Version version;
 
         public HandshakeResponse(DiscoveryNode discoveryNode, ClusterName clusterName, Version version) {
             this.discoveryNode = discoveryNode;
@@ -481,9 +478,8 @@ public class TransportService extends AbstractLifecycleComponent implements Tran
             this.clusterName = clusterName;
         }
 
-        @Override
-        public void readFrom(StreamInput in) throws IOException {
-            super.readFrom(in);
+        public HandshakeResponse(StreamInput in) throws IOException {
+            super(in);
             discoveryNode = in.readOptionalWriteable(DiscoveryNode::new);
             clusterName = new ClusterName(in);
             version = Version.readVersion(in);
@@ -930,7 +926,7 @@ public class TransportService extends AbstractLifecycleComponent implements Tran
         }
     }
 
-    public RequestHandlerRegistry getRequestHandler(String action) {
+    public RequestHandlerRegistry<? extends TransportRequest> getRequestHandler(String action) {
         return transport.getRequestHandler(action);
     }
 
@@ -977,8 +973,8 @@ public class TransportService extends AbstractLifecycleComponent implements Tran
     @Override
     public void onConnectionClosed(Transport.Connection connection) {
         try {
-            List<Transport.ResponseContext> pruned = responseHandlers.prune(h -> h.connection().getCacheKey().equals(connection
-                .getCacheKey()));
+            List<Transport.ResponseContext<? extends TransportResponse>> pruned =
+                responseHandlers.prune(h -> h.connection().getCacheKey().equals(connection.getCacheKey()));
             // callback that an exception happened, but on a different thread since we don't
             // want handlers to worry about stack overflows
             getExecutorService().execute(() -> {

+ 1 - 2
server/src/test/java/org/elasticsearch/action/admin/cluster/shards/ClusterSearchShardsResponseTests.java

@@ -83,8 +83,7 @@ public class ClusterSearchShardsResponseTests extends ESTestCase {
             clusterSearchShardsResponse.writeTo(out);
             try(StreamInput in = new NamedWriteableAwareStreamInput(out.bytes().streamInput(), namedWriteableRegistry)) {
                 in.setVersion(version);
-                ClusterSearchShardsResponse deserialized = new ClusterSearchShardsResponse();
-                deserialized.readFrom(in);
+                ClusterSearchShardsResponse deserialized = new ClusterSearchShardsResponse(in);
                 assertArrayEquals(clusterSearchShardsResponse.getNodes(), deserialized.getNodes());
                 assertEquals(clusterSearchShardsResponse.getGroups().length, deserialized.getGroups().length);
                 for (int i = 0; i < clusterSearchShardsResponse.getGroups().length; i++) {

+ 1 - 1
server/src/test/java/org/elasticsearch/action/search/TransportSearchActionTests.java

@@ -254,7 +254,7 @@ public class TransportSearchActionTests extends ESTestCase {
             remoteIndices.put(cluster, randomOriginalIndices());
             if (onlySuccessful || randomBoolean()) {
                 //whatever response counts as successful as long as it's not the empty placeholder
-                searchShardsResponses.put(cluster, new ClusterSearchShardsResponse());
+                searchShardsResponses.put(cluster, new ClusterSearchShardsResponse(null, null, null));
                 successful++;
             } else {
                 searchShardsResponses.put(cluster, ClusterSearchShardsResponse.EMPTY);

+ 4 - 2
server/src/test/java/org/elasticsearch/client/transport/TransportClientNodesServiceTests.java

@@ -253,8 +253,8 @@ public class TransportClientNodesServiceTests extends ESTestCase {
                     iteration.transportService.sendRequest(node, "action", new TestRequest(),
                             TransportRequestOptions.EMPTY, new TransportResponseHandler<TestResponse>() {
                         @Override
-                        public TestResponse newInstance() {
-                            return new TestResponse();
+                        public TestResponse read(StreamInput in) {
+                            return new TestResponse(in);
                         }
 
                         @Override
@@ -435,5 +435,7 @@ public class TransportClientNodesServiceTests extends ESTestCase {
 
     private static class TestResponse extends TransportResponse {
 
+        private TestResponse() {}
+        private TestResponse(StreamInput in) {}
     }
 }

+ 3 - 9
server/src/test/java/org/elasticsearch/transport/RemoteClusterConnectionTests.java

@@ -172,9 +172,7 @@ public class RemoteClusterConnectionTests extends ESTestCase {
                         new FutureTransportResponseHandler<ClusterSearchShardsResponse>() {
                             @Override
                             public ClusterSearchShardsResponse read(StreamInput in) throws IOException {
-                                ClusterSearchShardsResponse inst = new ClusterSearchShardsResponse();
-                                inst.readFrom(in);
-                                return inst;
+                                return new ClusterSearchShardsResponse(in);
                             }
                         });
                     TransportRequestOptions options = TransportRequestOptions.builder().withType(TransportRequestOptions.Type.BULK)
@@ -215,9 +213,7 @@ public class RemoteClusterConnectionTests extends ESTestCase {
                         new FutureTransportResponseHandler<ClusterSearchShardsResponse>() {
                             @Override
                             public ClusterSearchShardsResponse read(StreamInput in) throws IOException {
-                                ClusterSearchShardsResponse inst = new ClusterSearchShardsResponse();
-                                inst.readFrom(in);
-                                return inst;
+                                return new ClusterSearchShardsResponse(in);
                             }
                         });
                     TransportRequestOptions options = TransportRequestOptions.builder().withType(TransportRequestOptions.Type.BULK)
@@ -233,9 +229,7 @@ public class RemoteClusterConnectionTests extends ESTestCase {
                         new FutureTransportResponseHandler<ClusterSearchShardsResponse>() {
                             @Override
                             public ClusterSearchShardsResponse read(StreamInput in) throws IOException {
-                                ClusterSearchShardsResponse inst = new ClusterSearchShardsResponse();
-                                inst.readFrom(in);
-                                return inst;
+                                return new ClusterSearchShardsResponse(in);
                             }
                         });
                     TransportRequestOptions ops = TransportRequestOptions.builder().withType(TransportRequestOptions.Type.REG)

+ 23 - 19
server/src/test/java/org/elasticsearch/transport/TransportActionProxyTests.java

@@ -86,8 +86,7 @@ public class TransportActionProxyTests extends ESTestCase {
         serviceA.registerRequestHandler("internal:test", SimpleTestRequest::new, ThreadPool.Names.SAME,
             (request, channel, task) -> {
                 assertEquals(request.sourceNode, "TS_A");
-                SimpleTestResponse response = new SimpleTestResponse();
-                response.targetNode = "TS_A";
+                SimpleTestResponse response = new SimpleTestResponse("TS_A");
                 channel.sendResponse(response);
             });
         TransportActionProxy.registerProxyAction(serviceA, "internal:test", SimpleTestResponse::new);
@@ -96,8 +95,7 @@ public class TransportActionProxyTests extends ESTestCase {
         serviceB.registerRequestHandler("internal:test", SimpleTestRequest::new, ThreadPool.Names.SAME,
             (request, channel, task) -> {
                 assertEquals(request.sourceNode, "TS_A");
-                SimpleTestResponse response = new SimpleTestResponse();
-                response.targetNode = "TS_B";
+                SimpleTestResponse response = new SimpleTestResponse("TS_B");
                 channel.sendResponse(response);
             });
         TransportActionProxy.registerProxyAction(serviceB, "internal:test", SimpleTestResponse::new);
@@ -105,8 +103,7 @@ public class TransportActionProxyTests extends ESTestCase {
         serviceC.registerRequestHandler("internal:test", SimpleTestRequest::new, ThreadPool.Names.SAME,
             (request, channel, task) -> {
                 assertEquals(request.sourceNode, "TS_A");
-                SimpleTestResponse response = new SimpleTestResponse();
-                response.targetNode = "TS_C";
+                SimpleTestResponse response = new SimpleTestResponse("TS_C");
                 channel.sendResponse(response);
             });
         TransportActionProxy.registerProxyAction(serviceC, "internal:test", SimpleTestResponse::new);
@@ -115,8 +112,8 @@ public class TransportActionProxyTests extends ESTestCase {
         serviceA.sendRequest(nodeB, TransportActionProxy.getProxyAction("internal:test"), TransportActionProxy.wrapRequest(nodeC,
             new SimpleTestRequest("TS_A")), new TransportResponseHandler<SimpleTestResponse>() {
                 @Override
-                public SimpleTestResponse newInstance() {
-                    return new SimpleTestResponse();
+                public SimpleTestResponse read(StreamInput in) throws IOException {
+                    return new SimpleTestResponse(in);
                 }
 
                 @Override
@@ -131,7 +128,7 @@ public class TransportActionProxyTests extends ESTestCase {
                 @Override
                 public void handleException(TransportException exp) {
                     try {
-                    throw new AssertionError(exp);
+                        throw new AssertionError(exp);
                     } finally {
                         latch.countDown();
                     }
@@ -149,8 +146,7 @@ public class TransportActionProxyTests extends ESTestCase {
         serviceA.registerRequestHandler("internal:test", SimpleTestRequest::new, ThreadPool.Names.SAME,
             (request, channel, task) -> {
                 assertEquals(request.sourceNode, "TS_A");
-                SimpleTestResponse response = new SimpleTestResponse();
-                response.targetNode = "TS_A";
+                SimpleTestResponse response = new SimpleTestResponse("TS_A");
                 channel.sendResponse(response);
             });
         TransportActionProxy.registerProxyAction(serviceA, "internal:test", SimpleTestResponse::new);
@@ -159,8 +155,7 @@ public class TransportActionProxyTests extends ESTestCase {
         serviceB.registerRequestHandler("internal:test", SimpleTestRequest::new, ThreadPool.Names.SAME,
             (request, channel, task) -> {
                 assertEquals(request.sourceNode, "TS_A");
-                SimpleTestResponse response = new SimpleTestResponse();
-                response.targetNode = "TS_B";
+                SimpleTestResponse response = new SimpleTestResponse("TS_B");
                 channel.sendResponse(response);
             });
         TransportActionProxy.registerProxyAction(serviceB, "internal:test", SimpleTestResponse::new);
@@ -175,8 +170,8 @@ public class TransportActionProxyTests extends ESTestCase {
         serviceA.sendRequest(nodeB, TransportActionProxy.getProxyAction("internal:test"), TransportActionProxy.wrapRequest(nodeC,
             new SimpleTestRequest("TS_A")), new TransportResponseHandler<SimpleTestResponse>() {
                 @Override
-                public SimpleTestResponse newInstance() {
-                    return new SimpleTestResponse();
+                public SimpleTestResponse read(StreamInput in) throws IOException {
+                    return new SimpleTestResponse(in);
                 }
 
                 @Override
@@ -228,11 +223,20 @@ public class TransportActionProxyTests extends ESTestCase {
     }
 
     public static class SimpleTestResponse extends TransportResponse {
-        String targetNode;
+        final String targetNode;
+
+        SimpleTestResponse(String targetNode) {
+            this.targetNode = targetNode;
+        }
+
+        SimpleTestResponse(StreamInput in) throws IOException {
+            super(in);
+            this.targetNode = in.readString();
+        }
+
         @Override
         public void readFrom(StreamInput in) throws IOException {
-            super.readFrom(in);
-            targetNode = in.readString();
+            throw new UnsupportedOperationException("usage of Streamable is to be replaced by Writeable");
         }
 
         @Override
@@ -263,7 +267,7 @@ public class TransportActionProxyTests extends ESTestCase {
     }
 
     public void testIsProxyRequest() {
-        assertTrue(TransportActionProxy.isProxyRequest(new TransportActionProxy.ProxyRequest<>((in) -> null)));
+        assertTrue(TransportActionProxy.isProxyRequest(new TransportActionProxy.ProxyRequest<>(TransportRequest.Empty.INSTANCE, null)));
         assertFalse(TransportActionProxy.isProxyRequest(TransportRequest.Empty.INSTANCE));
     }
 }

+ 5 - 2
test/framework/src/main/java/org/elasticsearch/test/transport/CapturingTransport.java

@@ -47,6 +47,7 @@ import org.elasticsearch.transport.TransportMessageListener;
 import org.elasticsearch.transport.TransportRequest;
 import org.elasticsearch.transport.TransportRequestOptions;
 import org.elasticsearch.transport.TransportResponse;
+import org.elasticsearch.transport.TransportResponseHandler;
 import org.elasticsearch.transport.TransportService;
 import org.elasticsearch.transport.TransportStats;
 
@@ -163,8 +164,10 @@ public class CapturingTransport implements Transport {
     /**
      * simulate a response for the given requestId
      */
-    public void handleResponse(final long requestId, final TransportResponse response) {
-        responseHandlers.onResponseReceived(requestId, listener).handleResponse(response);
+    public <Response extends TransportResponse> void handleResponse(final long requestId, final Response response) {
+        TransportResponseHandler<Response> handler =
+            (TransportResponseHandler<Response>) responseHandlers.onResponseReceived(requestId, listener);
+        handler.handleResponse(response);
     }
 
     /**

+ 86 - 74
test/framework/src/main/java/org/elasticsearch/transport/AbstractSimpleTransportTestCase.java

@@ -233,8 +233,8 @@ public abstract class AbstractSimpleTransportTestCase extends ESTestCase {
         TransportFuture<StringMessageResponse> res = serviceB.submitRequest(nodeA, "internal:sayHello",
             new StringMessageRequest("moshe"), new TransportResponseHandler<StringMessageResponse>() {
                 @Override
-                public StringMessageResponse newInstance() {
-                    return new StringMessageResponse();
+                public StringMessageResponse read(StreamInput in) throws IOException {
+                    return new StringMessageResponse(in);
                 }
 
                 @Override
@@ -264,8 +264,8 @@ public abstract class AbstractSimpleTransportTestCase extends ESTestCase {
         res = serviceB.submitRequest(nodeA, "internal:sayHello", new StringMessageRequest("moshe"),
             TransportRequestOptions.builder().withCompress(true).build(), new TransportResponseHandler<StringMessageResponse>() {
                 @Override
-                public StringMessageResponse newInstance() {
-                    return new StringMessageResponse();
+                public StringMessageResponse read(StreamInput in) throws IOException {
+                    return new StringMessageResponse(in);
                 }
 
                 @Override
@@ -312,8 +312,8 @@ public abstract class AbstractSimpleTransportTestCase extends ESTestCase {
         final String executor = randomFrom(ThreadPool.THREAD_POOL_TYPES.keySet().toArray(new String[0]));
         TransportResponseHandler<StringMessageResponse> responseHandler = new TransportResponseHandler<StringMessageResponse>() {
             @Override
-            public StringMessageResponse newInstance() {
-                return new StringMessageResponse();
+            public StringMessageResponse read(StreamInput in) throws IOException {
+                return new StringMessageResponse(in);
             }
 
             @Override
@@ -367,8 +367,8 @@ public abstract class AbstractSimpleTransportTestCase extends ESTestCase {
         serviceA.sendRequest(nodeA, "internal:localNode", new StringMessageRequest("test"),
             new TransportResponseHandler<StringMessageResponse>() {
             @Override
-            public StringMessageResponse newInstance() {
-                return new StringMessageResponse();
+            public StringMessageResponse read(StreamInput in) throws IOException {
+                return new StringMessageResponse(in);
             }
 
             @Override
@@ -516,7 +516,7 @@ public abstract class AbstractSimpleTransportTestCase extends ESTestCase {
             TransportRequest.Empty.INSTANCE, TransportRequestOptions.builder().withCompress(true).build(),
             new TransportResponseHandler<TransportResponse.Empty>() {
                 @Override
-                public TransportResponse.Empty newInstance() {
+                public TransportResponse.Empty read(StreamInput in) {
                     return TransportResponse.Empty.INSTANCE;
                 }
 
@@ -564,8 +564,8 @@ public abstract class AbstractSimpleTransportTestCase extends ESTestCase {
             new StringMessageRequest("moshe"), TransportRequestOptions.builder().withCompress(true).build(),
             new TransportResponseHandler<StringMessageResponse>() {
                 @Override
-                public StringMessageResponse newInstance() {
-                    return new StringMessageResponse();
+                public StringMessageResponse read(StreamInput in) throws IOException {
+                    return new StringMessageResponse(in);
                 }
 
                 @Override
@@ -606,8 +606,8 @@ public abstract class AbstractSimpleTransportTestCase extends ESTestCase {
         TransportFuture<StringMessageResponse> res = serviceB.submitRequest(nodeA, "internal:sayHelloException",
             new StringMessageRequest("moshe"), new TransportResponseHandler<StringMessageResponse>() {
                 @Override
-                public StringMessageResponse newInstance() {
-                    return new StringMessageResponse();
+                public StringMessageResponse read(StreamInput in) throws IOException {
+                    return new StringMessageResponse(in);
                 }
 
                 @Override
@@ -658,7 +658,7 @@ public abstract class AbstractSimpleTransportTestCase extends ESTestCase {
         serviceA.registerRequestHandler("internal:test", TestRequest::new,
             randomBoolean() ? ThreadPool.Names.SAME : ThreadPool.Names.GENERIC, (request, channel, task) -> {
                 try {
-                    channel.sendResponse(new TestResponse());
+                    channel.sendResponse(new TestResponse((String) null));
                 } catch (Exception e) {
                     logger.info("caught exception while responding", e);
                     responseErrors.add(e);
@@ -666,7 +666,7 @@ public abstract class AbstractSimpleTransportTestCase extends ESTestCase {
             });
         final TransportRequestHandler<TestRequest> ignoringRequestHandler = (request, channel, task) -> {
             try {
-                channel.sendResponse(new TestResponse());
+                channel.sendResponse(new TestResponse((String) null));
             } catch (Exception e) {
                 // we don't really care what's going on B, we're testing through A
                 logger.trace("caught exception while responding from node B", e);
@@ -822,8 +822,8 @@ public abstract class AbstractSimpleTransportTestCase extends ESTestCase {
             new StringMessageRequest("moshe"), TransportRequestOptions.builder().withTimeout(100).build(),
             new TransportResponseHandler<StringMessageResponse>() {
                 @Override
-                public StringMessageResponse newInstance() {
-                    return new StringMessageResponse();
+                public StringMessageResponse read(StreamInput in) throws IOException {
+                    return new StringMessageResponse(in);
                 }
 
                 @Override
@@ -886,8 +886,8 @@ public abstract class AbstractSimpleTransportTestCase extends ESTestCase {
             new StringMessageRequest("forever"), TransportRequestOptions.builder().withTimeout(100).build(),
             new TransportResponseHandler<StringMessageResponse>() {
                 @Override
-                public StringMessageResponse newInstance() {
-                    return new StringMessageResponse();
+                public StringMessageResponse read(StreamInput in) throws IOException {
+                    return new StringMessageResponse(in);
                 }
 
                 @Override
@@ -924,8 +924,8 @@ public abstract class AbstractSimpleTransportTestCase extends ESTestCase {
                 new StringMessageRequest(counter + "ms"), TransportRequestOptions.builder().withTimeout(3000).build(),
                 new TransportResponseHandler<StringMessageResponse>() {
                     @Override
-                    public StringMessageResponse newInstance() {
-                        return new StringMessageResponse();
+                    public StringMessageResponse read(StreamInput in) throws IOException {
+                        return new StringMessageResponse(in);
                     }
 
                     @Override
@@ -975,8 +975,8 @@ public abstract class AbstractSimpleTransportTestCase extends ESTestCase {
         TransportResponseHandler<StringMessageResponse> noopResponseHandler = new TransportResponseHandler<StringMessageResponse>() {
 
             @Override
-            public StringMessageResponse newInstance() {
-                return new StringMessageResponse();
+            public StringMessageResponse read(StreamInput in) throws IOException {
+                return new StringMessageResponse(in);
             }
 
             @Override
@@ -1174,19 +1174,19 @@ public abstract class AbstractSimpleTransportTestCase extends ESTestCase {
 
     static class StringMessageResponse extends TransportResponse {
 
-        private String message;
+        private final String message;
 
         StringMessageResponse(String message) {
             this.message = message;
         }
 
-        StringMessageResponse() {
+        StringMessageResponse(StreamInput in) throws IOException {
+            this.message = in.readString();
         }
 
         @Override
         public void readFrom(StreamInput in) throws IOException {
-            super.readFrom(in);
-            message = in.readString();
+            throw new UnsupportedOperationException("usage of Streamable is to be replaced by Writeable");
         }
 
         @Override
@@ -1238,12 +1238,19 @@ public abstract class AbstractSimpleTransportTestCase extends ESTestCase {
 
     static class Version0Response extends TransportResponse {
 
-        int value1;
+        final int value1;
+
+        Version0Response(int value1) {
+            this.value1 = value1;
+        }
+
+        Version0Response(StreamInput in) throws IOException {
+            this.value1 = in.readInt();
+        }
 
         @Override
         public void readFrom(StreamInput in) throws IOException {
-            super.readFrom(in);
-            value1 = in.readInt();
+            throw new UnsupportedOperationException("usage of Streamable is to be replaced by Writeable");
         }
 
         @Override
@@ -1255,16 +1262,27 @@ public abstract class AbstractSimpleTransportTestCase extends ESTestCase {
 
     static class Version1Response extends Version0Response {
 
-        int value2;
+        final int value2;
 
-        @Override
-        public void readFrom(StreamInput in) throws IOException {
-            super.readFrom(in);
+        Version1Response(int value1, int value2) {
+            super(value1);
+            this.value2 = value2;
+        }
+
+        Version1Response(StreamInput in) throws IOException {
+            super(in);
             if (in.getVersion().onOrAfter(version1)) {
                 value2 = in.readInt();
+            } else {
+                value2 = 0;
             }
         }
 
+        @Override
+        public void readFrom(StreamInput in) throws IOException {
+            throw new UnsupportedOperationException("usage of Streamable is to be replaced by Writeable");
+        }
+
         @Override
         public void writeTo(StreamOutput out) throws IOException {
             super.writeTo(out);
@@ -1281,9 +1299,7 @@ public abstract class AbstractSimpleTransportTestCase extends ESTestCase {
                 public void messageReceived(Version1Request request, TransportChannel channel, Task task) throws Exception {
                     assertThat(request.value1, equalTo(1));
                     assertThat(request.value2, equalTo(0)); // not set, coming from service A
-                    Version1Response response = new Version1Response();
-                    response.value1 = 1;
-                    response.value2 = 2;
+                    Version1Response response = new Version1Response(1, 2);
                     channel.sendResponse(response);
                     assertEquals(version0, channel.getVersion());
                 }
@@ -1294,8 +1310,8 @@ public abstract class AbstractSimpleTransportTestCase extends ESTestCase {
         Version0Response version0Response = serviceA.submitRequest(nodeB, "internal:version", version0Request,
             new TransportResponseHandler<Version0Response>() {
                 @Override
-                public Version0Response newInstance() {
-                    return new Version0Response();
+                public Version0Response read(StreamInput in) throws IOException {
+                    return new Version0Response(in);
                 }
 
                 @Override
@@ -1324,8 +1340,7 @@ public abstract class AbstractSimpleTransportTestCase extends ESTestCase {
                 @Override
                 public void messageReceived(Version0Request request, TransportChannel channel, Task task) throws Exception {
                     assertThat(request.value1, equalTo(1));
-                    Version0Response response = new Version0Response();
-                    response.value1 = 1;
+                    Version0Response response = new Version0Response(1);
                     channel.sendResponse(response);
                     assertEquals(version0, channel.getVersion());
                 }
@@ -1337,8 +1352,8 @@ public abstract class AbstractSimpleTransportTestCase extends ESTestCase {
         Version1Response version1Response = serviceB.submitRequest(nodeA, "internal:version", version1Request,
             new TransportResponseHandler<Version1Response>() {
                 @Override
-                public Version1Response newInstance() {
-                    return new Version1Response();
+                public Version1Response read(StreamInput in) throws IOException {
+                    return new Version1Response(in);
                 }
 
                 @Override
@@ -1368,9 +1383,7 @@ public abstract class AbstractSimpleTransportTestCase extends ESTestCase {
             (request, channel, task) -> {
                 assertThat(request.value1, equalTo(1));
                 assertThat(request.value2, equalTo(2));
-                Version1Response response = new Version1Response();
-                response.value1 = 1;
-                response.value2 = 2;
+                Version1Response response = new Version1Response(1, 2);
                 channel.sendResponse(response);
                 assertEquals(version1, channel.getVersion());
             });
@@ -1381,8 +1394,8 @@ public abstract class AbstractSimpleTransportTestCase extends ESTestCase {
         Version1Response version1Response = serviceB.submitRequest(nodeB, "internal:version", version1Request,
             new TransportResponseHandler<Version1Response>() {
                 @Override
-                public Version1Response newInstance() {
-                    return new Version1Response();
+                public Version1Response read(StreamInput in) throws IOException {
+                    return new Version1Response(in);
                 }
 
                 @Override
@@ -1411,8 +1424,7 @@ public abstract class AbstractSimpleTransportTestCase extends ESTestCase {
         serviceA.registerRequestHandler("internal:version", Version0Request::new, ThreadPool.Names.SAME,
             (request, channel, task) -> {
                 assertThat(request.value1, equalTo(1));
-                Version0Response response = new Version0Response();
-                response.value1 = 1;
+                Version0Response response = new Version0Response(1);
                 channel.sendResponse(response);
                 assertEquals(version0, channel.getVersion());
             });
@@ -1422,8 +1434,8 @@ public abstract class AbstractSimpleTransportTestCase extends ESTestCase {
         Version0Response version0Response = serviceA.submitRequest(nodeA, "internal:version", version0Request,
             new TransportResponseHandler<Version0Response>() {
                 @Override
-                public Version0Response newInstance() {
-                    return new Version0Response();
+                public Version0Response read(StreamInput in) throws IOException {
+                    return new Version0Response(in);
                 }
 
                 @Override
@@ -1458,8 +1470,8 @@ public abstract class AbstractSimpleTransportTestCase extends ESTestCase {
         TransportFuture<StringMessageResponse> res = serviceB.submitRequest(nodeA, "internal:sayHello",
             new StringMessageRequest("moshe"), new TransportResponseHandler<StringMessageResponse>() {
                 @Override
-                public StringMessageResponse newInstance() {
-                    return new StringMessageResponse();
+                public StringMessageResponse read(StreamInput in) throws IOException {
+                    return new StringMessageResponse(in);
                 }
 
                 @Override
@@ -1516,8 +1528,8 @@ public abstract class AbstractSimpleTransportTestCase extends ESTestCase {
             new StringMessageRequest("moshe"), TransportRequestOptions.builder().withTimeout(100).build(),
             new TransportResponseHandler<StringMessageResponse>() {
                 @Override
-                public StringMessageResponse newInstance() {
-                    return new StringMessageResponse();
+                public StringMessageResponse read(StreamInput in) throws IOException {
+                    return new StringMessageResponse(in);
                 }
 
                 @Override
@@ -1561,13 +1573,13 @@ public abstract class AbstractSimpleTransportTestCase extends ESTestCase {
         final AtomicReference<TransportAddress> addressB = new AtomicReference<>();
         serviceB.registerRequestHandler("internal:action1", TestRequest::new, ThreadPool.Names.SAME, (request, channel, task) -> {
             addressA.set(request.remoteAddress());
-            channel.sendResponse(new TestResponse());
+            channel.sendResponse(new TestResponse((String) null));
             latch.countDown();
         });
         serviceA.sendRequest(nodeB, "internal:action1", new TestRequest(), new TransportResponseHandler<TestResponse>() {
             @Override
-            public TestResponse newInstance() {
-                return new TestResponse();
+            public TestResponse read(StreamInput in) throws IOException {
+                return new TestResponse(in);
             }
 
             @Override
@@ -1614,8 +1626,8 @@ public abstract class AbstractSimpleTransportTestCase extends ESTestCase {
                 serviceA.sendRequest(connection, "internal:action", new TestRequest(), TransportRequestOptions.EMPTY,
                     new TransportResponseHandler<TestResponse>() {
                         @Override
-                        public TestResponse newInstance() {
-                            return new TestResponse();
+                        public TestResponse read(StreamInput in) throws IOException {
+                            return new TestResponse(in);
                         }
 
                         @Override
@@ -1680,9 +1692,10 @@ public abstract class AbstractSimpleTransportTestCase extends ESTestCase {
 
     private static class TestResponse extends TransportResponse {
 
-        String info;
+        final String info;
 
-        TestResponse() {
+        TestResponse(StreamInput in) throws IOException {
+            this.info = in.readOptionalString();
         }
 
         TestResponse(String info) {
@@ -1691,8 +1704,7 @@ public abstract class AbstractSimpleTransportTestCase extends ESTestCase {
 
         @Override
         public void readFrom(StreamInput in) throws IOException {
-            super.readFrom(in);
-            info = in.readOptionalString();
+            throw new UnsupportedOperationException("usage of Streamable is to be replaced by Writeable");
         }
 
         @Override
@@ -1777,8 +1789,8 @@ public abstract class AbstractSimpleTransportTestCase extends ESTestCase {
                         TransportRequestOptions.builder().withCompress(randomBoolean()).build(),
                         new TransportResponseHandler<TestResponse>() {
                             @Override
-                            public TestResponse newInstance() {
-                                return new TestResponse();
+                            public TestResponse read(StreamInput in) throws IOException {
+                                return new TestResponse(in);
                             }
 
                             @Override
@@ -1834,8 +1846,8 @@ public abstract class AbstractSimpleTransportTestCase extends ESTestCase {
             }
 
             @Override
-            public TestResponse newInstance() {
-                return new TestResponse();
+            public TestResponse read(StreamInput in) throws IOException {
+                return new TestResponse(in);
             }
 
             @Override
@@ -2100,7 +2112,7 @@ public abstract class AbstractSimpleTransportTestCase extends ESTestCase {
 
         TransportResponseHandler<TransportResponse> transportResponseHandler = new TransportResponseHandler<TransportResponse>() {
             @Override
-            public TransportResponse newInstance() {
+            public TransportResponse read(StreamInput in) {
                 return TransportResponse.Empty.INSTANCE;
             }
 
@@ -2154,7 +2166,7 @@ public abstract class AbstractSimpleTransportTestCase extends ESTestCase {
         CountDownLatch latch = new CountDownLatch(1);
         TransportResponseHandler<TransportResponse> transportResponseHandler = new TransportResponseHandler<TransportResponse>() {
             @Override
-            public TransportResponse newInstance() {
+            public TransportResponse read(StreamInput in) {
                 return TransportResponse.Empty.INSTANCE;
             }
 
@@ -2231,7 +2243,7 @@ public abstract class AbstractSimpleTransportTestCase extends ESTestCase {
         CountDownLatch responseLatch = new CountDownLatch(1);
         TransportResponseHandler<TransportResponse> transportResponseHandler = new TransportResponseHandler<TransportResponse>() {
             @Override
-            public TransportResponse newInstance() {
+            public TransportResponse read(StreamInput in) {
                 return TransportResponse.Empty.INSTANCE;
             }
 
@@ -2299,7 +2311,7 @@ public abstract class AbstractSimpleTransportTestCase extends ESTestCase {
         CountDownLatch responseLatch = new CountDownLatch(1);
         TransportResponseHandler<TransportResponse> transportResponseHandler = new TransportResponseHandler<TransportResponse>() {
             @Override
-            public TransportResponse newInstance() {
+            public TransportResponse read(StreamInput in) {
                 return TransportResponse.Empty.INSTANCE;
             }
 
@@ -2413,7 +2425,7 @@ public abstract class AbstractSimpleTransportTestCase extends ESTestCase {
         AtomicReference<TransportException> receivedException = new AtomicReference<>(null);
         TransportResponseHandler<TransportResponse> transportResponseHandler = new TransportResponseHandler<TransportResponse>() {
             @Override
-            public TransportResponse newInstance() {
+            public TransportResponse read(StreamInput in) {
                 return TransportResponse.Empty.INSTANCE;
             }
 

+ 3 - 2
x-pack/plugin/security/src/test/java/org/elasticsearch/xpack/security/transport/SecurityServerTransportInterceptorTests.java

@@ -10,6 +10,7 @@ import org.elasticsearch.action.support.DestructiveOperations;
 import org.elasticsearch.cluster.ClusterState;
 import org.elasticsearch.cluster.block.ClusterBlocks;
 import org.elasticsearch.cluster.service.ClusterService;
+import org.elasticsearch.common.io.stream.StreamInput;
 import org.elasticsearch.common.settings.ClusterSettings;
 import org.elasticsearch.common.settings.Settings;
 import org.elasticsearch.common.util.concurrent.ThreadContext;
@@ -335,7 +336,7 @@ public class SecurityServerTransportInterceptorTests extends ESTestCase {
                     threadContext.wrapRestorable(storedContext), new TransportResponseHandler<Empty>() {
 
                 @Override
-                public Empty newInstance() {
+                public Empty read(StreamInput in) {
                     return Empty.INSTANCE;
                 }
 
@@ -374,7 +375,7 @@ public class SecurityServerTransportInterceptorTests extends ESTestCase {
                         new TransportResponseHandler<Empty>() {
 
                             @Override
-                            public Empty newInstance() {
+                            public Empty read(StreamInput in) {
                                 return Empty.INSTANCE;
                             }
 

+ 7 - 2
x-pack/plugin/security/src/test/java/org/elasticsearch/xpack/security/transport/ServerTransportFilterIntegrationTests.java

@@ -9,6 +9,7 @@ import org.elasticsearch.ElasticsearchSecurityException;
 import org.elasticsearch.Version;
 import org.elasticsearch.cluster.action.index.NodeMappingRefreshAction;
 import org.elasticsearch.cluster.node.DiscoveryNode;
+import org.elasticsearch.common.io.stream.StreamInput;
 import org.elasticsearch.common.network.NetworkAddress;
 import org.elasticsearch.common.settings.Settings;
 import org.elasticsearch.common.transport.TransportAddress;
@@ -176,8 +177,12 @@ public class ServerTransportFilterIntegrationTests extends SecurityIntegTestCase
                         TransportRequestOptions.EMPTY,
                         new TransportResponseHandler<TransportResponse>() {
                     @Override
-                    public TransportResponse newInstance() {
-                        fail("never get that far");
+                    public TransportResponse read(StreamInput in) {
+                        try {
+                            fail("never get that far");
+                        } finally {
+                            latch.countDown();
+                        }
                         return null;
                     }