Browse Source

Move compression config to ConnectionProfile (#35357)

This is related to #34483. It introduces a namespaced setting for
compression that allows users to configure compression on a per remote
cluster basis. The transport.tcp.compress remains as a fallback
setting. If transport.tcp.compress is set to true, then all requests
and responses are compressed. If it is set to false, only requests to
clusters based on the cluster.remote.cluster_name.transport.compress
setting are compressed. However, after this change regardless of any
local settings, responses will be compressed if the request that is
received was compressed.
Tim Brooks 7 years ago
parent
commit
93c2c604e5
29 changed files with 372 additions and 303 deletions
  1. 2 3
      modules/transport-netty4/src/main/java/org/elasticsearch/transport/netty4/Netty4Transport.java
  2. 1 2
      modules/transport-netty4/src/test/java/org/elasticsearch/transport/netty4/Netty4ScheduledPingTests.java
  3. 4 3
      modules/transport-netty4/src/test/java/org/elasticsearch/transport/netty4/SimpleNetty4TransportTests.java
  4. 4 3
      plugins/transport-nio/src/test/java/org/elasticsearch/transport/nio/SimpleNioTransportTests.java
  5. 92 40
      server/src/main/java/org/elasticsearch/transport/ConnectionProfile.java
  6. 12 6
      server/src/main/java/org/elasticsearch/transport/RemoteClusterConnection.java
  7. 6 0
      server/src/main/java/org/elasticsearch/transport/RemoteClusterService.java
  8. 0 6
      server/src/main/java/org/elasticsearch/transport/TaskTransportChannel.java
  9. 24 20
      server/src/main/java/org/elasticsearch/transport/TcpTransport.java
  10. 11 7
      server/src/main/java/org/elasticsearch/transport/TcpTransportChannel.java
  11. 0 2
      server/src/main/java/org/elasticsearch/transport/TransportChannel.java
  12. 1 2
      server/src/main/java/org/elasticsearch/transport/TransportResponseOptions.java
  13. 1 6
      server/src/main/java/org/elasticsearch/transport/TransportService.java
  14. 0 5
      server/src/test/java/org/elasticsearch/action/support/broadcast/node/TransportBroadcastByNodeActionTests.java
  15. 0 6
      server/src/test/java/org/elasticsearch/action/support/replication/TransportReplicationActionTests.java
  16. 0 7
      server/src/test/java/org/elasticsearch/discovery/zen/PublishClusterStateActionTests.java
  17. 0 6
      server/src/test/java/org/elasticsearch/discovery/zen/ZenDiscoveryUnitTests.java
  18. 1 69
      server/src/test/java/org/elasticsearch/transport/ConnectionManagerTests.java
  19. 94 3
      server/src/test/java/org/elasticsearch/transport/ConnectionProfileTests.java
  20. 11 5
      server/src/test/java/org/elasticsearch/transport/TcpTransportTests.java
  21. 88 80
      test/framework/src/main/java/org/elasticsearch/transport/AbstractSimpleTransportTestCase.java
  22. 1 0
      test/framework/src/main/java/org/elasticsearch/transport/MockTcpTransport.java
  23. 1 0
      test/framework/src/main/java/org/elasticsearch/transport/nio/MockNioTransport.java
  24. 3 3
      test/framework/src/test/java/org/elasticsearch/transport/MockTcpTransportTests.java
  25. 4 3
      test/framework/src/test/java/org/elasticsearch/transport/nio/SimpleMockNioTransportTests.java
  26. 3 9
      x-pack/plugin/security/src/test/java/org/elasticsearch/xpack/security/transport/AbstractSimpleSecurityTransportTestCase.java
  27. 1 1
      x-pack/plugin/security/src/test/java/org/elasticsearch/xpack/security/transport/ServerTransportFilterIntegrationTests.java
  28. 3 3
      x-pack/plugin/security/src/test/java/org/elasticsearch/xpack/security/transport/netty4/SimpleSecurityNetty4ServerTransportTests.java
  29. 4 3
      x-pack/plugin/security/src/test/java/org/elasticsearch/xpack/security/transport/nio/SimpleSecurityNioTransportTests.java

+ 2 - 3
modules/transport-netty4/src/main/java/org/elasticsearch/transport/netty4/Netty4Transport.java

@@ -169,9 +169,8 @@ public class Netty4Transport extends TcpTransport {
     private void createServerBootstrap(ProfileSettings profileSettings, NioEventLoopGroup eventLoopGroup) {
         String name = profileSettings.profileName;
         if (logger.isDebugEnabled()) {
-            logger.debug("using profile[{}], worker_count[{}], port[{}], bind_host[{}], publish_host[{}], compress[{}], "
-                    + "receive_predictor[{}->{}]",
-                name, workerCount, profileSettings.portOrRange, profileSettings.bindHosts, profileSettings.publishHosts, compress,
+            logger.debug("using profile[{}], worker_count[{}], port[{}], bind_host[{}], publish_host[{}], receive_predictor[{}->{}]",
+                name, workerCount, profileSettings.portOrRange, profileSettings.bindHosts, profileSettings.publishHosts,
                 receivePredictorMin, receivePredictorMax);
         }
 

+ 1 - 2
modules/transport-netty4/src/test/java/org/elasticsearch/transport/netty4/Netty4ScheduledPingTests.java

@@ -38,7 +38,6 @@ import org.elasticsearch.transport.TransportRequest;
 import org.elasticsearch.transport.TransportRequestOptions;
 import org.elasticsearch.transport.TransportResponse;
 import org.elasticsearch.transport.TransportResponseHandler;
-import org.elasticsearch.transport.TransportResponseOptions;
 import org.elasticsearch.transport.TransportService;
 
 import java.io.IOException;
@@ -91,7 +90,7 @@ public class Netty4ScheduledPingTests extends ESTestCase {
         serviceA.registerRequestHandler("internal:sayHello", TransportRequest.Empty::new, ThreadPool.Names.GENERIC,
             (request, channel, task) -> {
                 try {
-                    channel.sendResponse(TransportResponse.Empty.INSTANCE, TransportResponseOptions.EMPTY);
+                    channel.sendResponse(TransportResponse.Empty.INSTANCE);
                 } catch (IOException e) {
                     logger.error("Unexpected failure", e);
                     fail(e.getMessage());

+ 4 - 3
modules/transport-netty4/src/test/java/org/elasticsearch/transport/netty4/SimpleNetty4TransportTests.java

@@ -27,7 +27,6 @@ import org.elasticsearch.common.network.NetworkService;
 import org.elasticsearch.common.settings.ClusterSettings;
 import org.elasticsearch.common.settings.Settings;
 import org.elasticsearch.common.transport.TransportAddress;
-import org.elasticsearch.common.unit.TimeValue;
 import org.elasticsearch.common.util.BigArrays;
 import org.elasticsearch.indices.breaker.NoneCircuitBreakerService;
 import org.elasticsearch.node.Node;
@@ -36,6 +35,7 @@ import org.elasticsearch.threadpool.ThreadPool;
 import org.elasticsearch.transport.AbstractSimpleTransportTestCase;
 import org.elasticsearch.transport.BindTransportException;
 import org.elasticsearch.transport.ConnectTransportException;
+import org.elasticsearch.transport.ConnectionProfile;
 import org.elasticsearch.transport.TcpChannel;
 import org.elasticsearch.transport.TcpTransport;
 import org.elasticsearch.transport.Transport;
@@ -58,9 +58,10 @@ public class SimpleNetty4TransportTests extends AbstractSimpleTransportTestCase
             BigArrays.NON_RECYCLING_INSTANCE, namedWriteableRegistry, new NoneCircuitBreakerService()) {
 
             @Override
-            public void executeHandshake(DiscoveryNode node, TcpChannel channel, TimeValue timeout, ActionListener<Version> listener) {
+            public void executeHandshake(DiscoveryNode node, TcpChannel channel, ConnectionProfile profile,
+                                         ActionListener<Version> listener) {
                 if (doHandshake) {
-                    super.executeHandshake(node, channel, timeout, listener);
+                    super.executeHandshake(node, channel, profile, listener);
                 } else {
                     listener.onResponse(version.minimumCompatibilityVersion());
                 }

+ 4 - 3
plugins/transport-nio/src/test/java/org/elasticsearch/transport/nio/SimpleNioTransportTests.java

@@ -27,7 +27,6 @@ import org.elasticsearch.common.network.NetworkService;
 import org.elasticsearch.common.settings.ClusterSettings;
 import org.elasticsearch.common.settings.Settings;
 import org.elasticsearch.common.transport.TransportAddress;
-import org.elasticsearch.common.unit.TimeValue;
 import org.elasticsearch.common.util.BigArrays;
 import org.elasticsearch.common.util.MockPageCacheRecycler;
 import org.elasticsearch.indices.breaker.NoneCircuitBreakerService;
@@ -37,6 +36,7 @@ import org.elasticsearch.threadpool.ThreadPool;
 import org.elasticsearch.transport.AbstractSimpleTransportTestCase;
 import org.elasticsearch.transport.BindTransportException;
 import org.elasticsearch.transport.ConnectTransportException;
+import org.elasticsearch.transport.ConnectionProfile;
 import org.elasticsearch.transport.TcpChannel;
 import org.elasticsearch.transport.TcpTransport;
 import org.elasticsearch.transport.Transport;
@@ -62,9 +62,10 @@ public class SimpleNioTransportTests extends AbstractSimpleTransportTestCase {
             new MockPageCacheRecycler(settings), namedWriteableRegistry, new NoneCircuitBreakerService()) {
 
             @Override
-            public void executeHandshake(DiscoveryNode node, TcpChannel channel, TimeValue timeout, ActionListener<Version> listener) {
+            public void executeHandshake(DiscoveryNode node, TcpChannel channel, ConnectionProfile profile,
+                                         ActionListener<Version> listener) {
                 if (doHandshake) {
-                    super.executeHandshake(node, channel, timeout, listener);
+                    super.executeHandshake(node, channel, profile, listener);
                 } else {
                     listener.onResponse(version.minimumCompatibilityVersion());
                 }

+ 92 - 40
server/src/main/java/org/elasticsearch/transport/ConnectionProfile.java

@@ -38,40 +38,6 @@ import java.util.concurrent.atomic.AtomicInteger;
  */
 public final class ConnectionProfile {
 
-    /**
-     * Builds a connection profile that is dedicated to a single channel type. Use this
-     * when opening single use connections
-     */
-    public static ConnectionProfile buildSingleChannelProfile(TransportRequestOptions.Type channelType,
-                                                              @Nullable TimeValue connectTimeout,
-                                                              @Nullable TimeValue handshakeTimeout) {
-        Builder builder = new Builder();
-        builder.addConnections(1, channelType);
-        final EnumSet<TransportRequestOptions.Type> otherTypes = EnumSet.allOf(TransportRequestOptions.Type.class);
-        otherTypes.remove(channelType);
-        builder.addConnections(0, otherTypes.stream().toArray(TransportRequestOptions.Type[]::new));
-        if (connectTimeout != null) {
-            builder.setConnectTimeout(connectTimeout);
-        }
-        if (handshakeTimeout != null) {
-            builder.setHandshakeTimeout(handshakeTimeout);
-        }
-        return builder.build();
-    }
-
-    private final List<ConnectionTypeHandle> handles;
-    private final int numConnections;
-    private final TimeValue connectTimeout;
-    private final TimeValue handshakeTimeout;
-
-    private ConnectionProfile(List<ConnectionTypeHandle> handles, int numConnections, TimeValue connectTimeout,
-                              TimeValue handshakeTimeout) {
-        this.handles = handles;
-        this.numConnections = numConnections;
-        this.connectTimeout = connectTimeout;
-        this.handshakeTimeout = handshakeTimeout;
-    }
-
     /**
      * takes a {@link ConnectionProfile} resolves it to a fully specified (i.e., no nulls) profile
      */
@@ -79,7 +45,8 @@ public final class ConnectionProfile {
         Objects.requireNonNull(fallbackProfile);
         if (profile == null) {
             return fallbackProfile;
-        } else if (profile.getConnectTimeout() != null && profile.getHandshakeTimeout() != null) {
+        } else if (profile.getConnectTimeout() != null && profile.getHandshakeTimeout() != null
+            && profile.getCompressionEnabled() != null) {
             return profile;
         } else {
             ConnectionProfile.Builder builder = new ConnectionProfile.Builder(profile);
@@ -89,6 +56,9 @@ public final class ConnectionProfile {
             if (profile.getHandshakeTimeout() == null) {
                 builder.setHandshakeTimeout(fallbackProfile.getHandshakeTimeout());
             }
+            if (profile.getCompressionEnabled() == null) {
+                builder.setCompressionEnabled(fallbackProfile.getCompressionEnabled());
+            }
             return builder.build();
         }
     }
@@ -108,6 +78,7 @@ public final class ConnectionProfile {
         Builder builder = new Builder();
         builder.setConnectTimeout(TransportService.TCP_CONNECT_TIMEOUT.get(settings));
         builder.setHandshakeTimeout(TransportService.TCP_CONNECT_TIMEOUT.get(settings));
+        builder.setCompressionEnabled(Transport.TRANSPORT_TCP_COMPRESS.get(settings));
         builder.addConnections(connectionsPerNodeBulk, TransportRequestOptions.Type.BULK);
         builder.addConnections(connectionsPerNodePing, TransportRequestOptions.Type.PING);
         // if we are not master eligible we don't need a dedicated channel to publish the state
@@ -118,13 +89,77 @@ public final class ConnectionProfile {
         return builder.build();
     }
 
+    /**
+     * Builds a connection profile that is dedicated to a single channel type. Use this
+     * when opening single use connections
+     */
+    public static ConnectionProfile buildSingleChannelProfile(TransportRequestOptions.Type channelType) {
+        return buildSingleChannelProfile(channelType, null, null, null);
+    }
+
+    /**
+     * Builds a connection profile that is dedicated to a single channel type. Allows passing compression
+     * settings.
+     */
+    public static ConnectionProfile buildSingleChannelProfile(TransportRequestOptions.Type channelType, boolean compressionEnabled) {
+        return buildSingleChannelProfile(channelType, null, null, compressionEnabled);
+    }
+
+    /**
+     * Builds a connection profile that is dedicated to a single channel type. Allows passing connection and
+     * handshake timeouts.
+     */
+    public static ConnectionProfile buildSingleChannelProfile(TransportRequestOptions.Type channelType, @Nullable TimeValue connectTimeout,
+                                                              @Nullable TimeValue handshakeTimeout) {
+        return buildSingleChannelProfile(channelType, connectTimeout, handshakeTimeout, null);
+    }
+
+    /**
+     * Builds a connection profile that is dedicated to a single channel type. Allows passing connection and
+     * handshake timeouts and compression settings.
+     */
+    public static ConnectionProfile buildSingleChannelProfile(TransportRequestOptions.Type channelType, @Nullable TimeValue connectTimeout,
+                                                              @Nullable TimeValue handshakeTimeout, @Nullable Boolean compressionEnabled) {
+        Builder builder = new Builder();
+        builder.addConnections(1, channelType);
+        final EnumSet<TransportRequestOptions.Type> otherTypes = EnumSet.allOf(TransportRequestOptions.Type.class);
+        otherTypes.remove(channelType);
+        builder.addConnections(0, otherTypes.toArray(new TransportRequestOptions.Type[0]));
+        if (connectTimeout != null) {
+            builder.setConnectTimeout(connectTimeout);
+        }
+        if (handshakeTimeout != null) {
+            builder.setHandshakeTimeout(handshakeTimeout);
+        }
+        if (compressionEnabled != null) {
+            builder.setCompressionEnabled(compressionEnabled);
+        }
+        return builder.build();
+    }
+
+    private final List<ConnectionTypeHandle> handles;
+    private final int numConnections;
+    private final TimeValue connectTimeout;
+    private final TimeValue handshakeTimeout;
+    private final Boolean compressionEnabled;
+
+    private ConnectionProfile(List<ConnectionTypeHandle> handles, int numConnections, TimeValue connectTimeout,
+                              TimeValue handshakeTimeout, Boolean compressionEnabled) {
+        this.handles = handles;
+        this.numConnections = numConnections;
+        this.connectTimeout = connectTimeout;
+        this.handshakeTimeout = handshakeTimeout;
+        this.compressionEnabled = compressionEnabled;
+    }
+
     /**
      * A builder to build a new {@link ConnectionProfile}
      */
     public static class Builder {
         private final List<ConnectionTypeHandle> handles = new ArrayList<>();
         private final Set<TransportRequestOptions.Type> addedTypes = EnumSet.noneOf(TransportRequestOptions.Type.class);
-        private int offset = 0;
+        private int numConnections = 0;
+        private Boolean compressionEnabled;
         private TimeValue connectTimeout;
         private TimeValue handshakeTimeout;
 
@@ -135,10 +170,11 @@ public final class ConnectionProfile {
         /** copy constructor, using another profile as a base */
         public Builder(ConnectionProfile source) {
             handles.addAll(source.getHandles());
-            offset = source.getNumConnections();
+            numConnections = source.getNumConnections();
             handles.forEach(th -> addedTypes.addAll(th.types));
             connectTimeout = source.getConnectTimeout();
             handshakeTimeout = source.getHandshakeTimeout();
+            compressionEnabled = source.getCompressionEnabled();
         }
         /**
          * Sets a connect timeout for this connection profile
@@ -160,6 +196,13 @@ public final class ConnectionProfile {
             this.handshakeTimeout = handshakeTimeout;
         }
 
+        /**
+         * Sets compression enabled for this connection profile
+         */
+        public void setCompressionEnabled(boolean compressionEnabled) {
+            this.compressionEnabled = compressionEnabled;
+        }
+
         /**
          * Adds a number of connections for one or more types. Each type can only be added once.
          * @param numConnections the number of connections to use in the pool for the given connection types
@@ -175,8 +218,8 @@ public final class ConnectionProfile {
                 }
             }
             addedTypes.addAll(Arrays.asList(types));
-            handles.add(new ConnectionTypeHandle(offset, numConnections, EnumSet.copyOf(Arrays.asList(types))));
-            offset += numConnections;
+            handles.add(new ConnectionTypeHandle(this.numConnections, numConnections, EnumSet.copyOf(Arrays.asList(types))));
+            this.numConnections += numConnections;
         }
 
         /**
@@ -189,7 +232,8 @@ public final class ConnectionProfile {
             if (types.isEmpty() == false) {
                 throw new IllegalStateException("not all types are added for this connection profile - missing types: " + types);
             }
-            return new ConnectionProfile(Collections.unmodifiableList(handles), offset, connectTimeout, handshakeTimeout);
+            return new ConnectionProfile(Collections.unmodifiableList(handles), numConnections, connectTimeout, handshakeTimeout,
+                compressionEnabled);
         }
 
     }
@@ -208,6 +252,14 @@ public final class ConnectionProfile {
         return handshakeTimeout;
     }
 
+    /**
+     * Returns boolean indicating if compression is enabled or <code>null</code> if no explicit compression
+     * is set on this profile.
+     */
+    public Boolean getCompressionEnabled() {
+        return compressionEnabled;
+    }
+
     /**
      * Returns the total number of connections for this profile
      */

+ 12 - 6
server/src/main/java/org/elasticsearch/transport/RemoteClusterConnection.java

@@ -66,6 +66,8 @@ import java.util.function.Predicate;
 import java.util.function.Supplier;
 import java.util.stream.Collectors;
 
+import static org.elasticsearch.transport.RemoteClusterService.REMOTE_CLUSTER_COMPRESS;
+
 /**
  * Represents a connection to a single remote cluster. In contrast to a local cluster a remote cluster is not joined such that the
  * current node is part of the cluster and it won't receive cluster state updates from the remote cluster. Remote clusters are also not
@@ -86,6 +88,7 @@ final class RemoteClusterConnection extends AbstractComponent implements Transpo
     private final ConnectionProfile remoteProfile;
     private final ConnectedNodes connectedNodes;
     private final String clusterAlias;
+    private final boolean compress;
     private final int maxNumRemoteConnections;
     private final Predicate<DiscoveryNode> nodePredicate;
     private final ThreadPool threadPool;
@@ -108,12 +111,13 @@ final class RemoteClusterConnection extends AbstractComponent implements Transpo
      * @param proxyAddress the proxy address
      */
     RemoteClusterConnection(Settings settings, String clusterAlias, List<Supplier<DiscoveryNode>> seedNodes,
-            TransportService transportService, ConnectionManager connectionManager, int maxNumRemoteConnections,
-            Predicate<DiscoveryNode> nodePredicate, String proxyAddress) {
+                            TransportService transportService, ConnectionManager connectionManager, int maxNumRemoteConnections,
+                            Predicate<DiscoveryNode> nodePredicate, String proxyAddress) {
         this.transportService = transportService;
         this.maxNumRemoteConnections = maxNumRemoteConnections;
         this.nodePredicate = nodePredicate;
         this.clusterAlias = clusterAlias;
+        this.compress = REMOTE_CLUSTER_COMPRESS.getConcreteSettingForNamespace(clusterAlias).get(settings);
         ConnectionProfile.Builder builder = new ConnectionProfile.Builder();
         builder.setConnectTimeout(TransportService.TCP_CONNECT_TIMEOUT.get(settings));
         builder.setHandshakeTimeout(TransportService.TCP_CONNECT_TIMEOUT.get(settings));
@@ -122,6 +126,7 @@ final class RemoteClusterConnection extends AbstractComponent implements Transpo
             TransportRequestOptions.Type.BULK,
             TransportRequestOptions.Type.STATE,
             TransportRequestOptions.Type.RECOVERY);
+        builder.setCompressionEnabled(compress);
         remoteProfile = builder.build();
         connectedNodes = new ConnectedNodes(clusterAlias);
         this.seedNodes = Collections.unmodifiableList(seedNodes);
@@ -471,8 +476,8 @@ final class RemoteClusterConnection extends AbstractComponent implements Transpo
             });
         }
 
-        private void collectRemoteNodes(Iterator<Supplier<DiscoveryNode>> seedNodes,
-                                final TransportService transportService, final ConnectionManager manager, ActionListener<Void> listener) {
+        private void collectRemoteNodes(Iterator<Supplier<DiscoveryNode>> seedNodes, final TransportService transportService,
+                                        final ConnectionManager manager, ActionListener<Void> listener) {
             if (Thread.currentThread().isInterrupted()) {
                 listener.onFailure(new InterruptedException("remote connect thread got interrupted"));
             }
@@ -483,8 +488,9 @@ final class RemoteClusterConnection extends AbstractComponent implements Transpo
                         logger.debug("[{}] opening connection to seed node: [{}] proxy address: [{}]", clusterAlias, seedNode,
                             proxyAddress);
                         final TransportService.HandshakeResponse handshakeResponse;
-                        Transport.Connection connection = manager.openConnection(seedNode,
-                            ConnectionProfile.buildSingleChannelProfile(TransportRequestOptions.Type.REG, null, null));
+                        ConnectionProfile profile = ConnectionProfile.buildSingleChannelProfile(TransportRequestOptions.Type.REG,
+                            compress);
+                        Transport.Connection connection = manager.openConnection(seedNode, profile);
                         boolean success = false;
                         try {
                             try {

+ 6 - 0
server/src/main/java/org/elasticsearch/transport/RemoteClusterService.java

@@ -173,6 +173,12 @@ public final class RemoteClusterService extends RemoteClusterAware implements Cl
             key -> timeSetting(key, TcpTransport.PING_SCHEDULE, Setting.Property.NodeScope),
             REMOTE_CLUSTERS_SEEDS);
 
+    public static final Setting.AffixSetting<Boolean> REMOTE_CLUSTER_COMPRESS = Setting.affixKeySetting(
+        "cluster.remote.",
+        "transport.compress",
+        key -> boolSetting(key, Transport.TRANSPORT_TCP_COMPRESS, Setting.Property.NodeScope),
+        REMOTE_CLUSTERS_SEEDS);
+
     private static final Predicate<DiscoveryNode> DEFAULT_NODE_PREDICATE = (node) -> Version.CURRENT.isCompatible(node.getVersion())
             && (node.isMasterNode() == false  || node.isDataNode() || node.isIngestNode());
 

+ 0 - 6
server/src/main/java/org/elasticsearch/transport/TaskTransportChannel.java

@@ -54,12 +54,6 @@ public class TaskTransportChannel implements TransportChannel {
         channel.sendResponse(response);
     }
 
-    @Override
-    public void sendResponse(TransportResponse response, TransportResponseOptions options) throws IOException {
-        endTask();
-        channel.sendResponse(response, options);
-    }
-
     @Override
     public void sendResponse(Exception exception) throws IOException {
         endTask();

+ 24 - 20
server/src/main/java/org/elasticsearch/transport/TcpTransport.java

@@ -164,8 +164,8 @@ public abstract class TcpTransport extends AbstractLifecycleComponent implements
         key -> intSetting(key, -1, -1, Setting.Property.NodeScope));
 
     // This is the number of bytes necessary to read the message size
-    public static final int BYTES_NEEDED_FOR_MESSAGE_SIZE = TcpHeader.MARKER_BYTES_SIZE + TcpHeader.MESSAGE_LENGTH_SIZE;
-    public static final int PING_DATA_SIZE = -1;
+    private static final int BYTES_NEEDED_FOR_MESSAGE_SIZE = TcpHeader.MARKER_BYTES_SIZE + TcpHeader.MESSAGE_LENGTH_SIZE;
+    private static final int PING_DATA_SIZE = -1;
     protected final CounterMetric successfulPings = new CounterMetric();
     protected final CounterMetric failedPings = new CounterMetric();
     private static final long NINETY_PER_HEAP_SIZE = (long) (JvmInfo.jvmInfo().getMem().getHeapMax().getBytes() * 0.9);
@@ -194,7 +194,7 @@ public abstract class TcpTransport extends AbstractLifecycleComponent implements
     // this lock is here to make sure we close this transport and disconnect all the client nodes
     // connections while no connect operations is going on
     private final ReadWriteLock closeLock = new ReentrantReadWriteLock();
-    protected final boolean compress;
+    protected final boolean compressResponses;
     private volatile BoundTransportAddress boundAddress;
     private final String transportName;
 
@@ -218,7 +218,7 @@ public abstract class TcpTransport extends AbstractLifecycleComponent implements
         this.bigArrays = bigArrays;
         this.circuitBreakerService = circuitBreakerService;
         this.namedWriteableRegistry = namedWriteableRegistry;
-        this.compress = Transport.TRANSPORT_TCP_COMPRESS.get(settings);
+        this.compressResponses = Transport.TRANSPORT_TCP_COMPRESS.get(settings);
         this.networkService = networkService;
         this.transportName = transportName;
         this.transportLogger = new TransportLogger();
@@ -284,6 +284,7 @@ public abstract class TcpTransport extends AbstractLifecycleComponent implements
         private final List<TcpChannel> channels;
         private final DiscoveryNode node;
         private final Version version;
+        private final boolean compress;
         private final AtomicBoolean isClosing = new AtomicBoolean(false);
 
         NodeChannels(DiscoveryNode node, List<TcpChannel> channels, ConnectionProfile connectionProfile, Version handshakeVersion) {
@@ -297,6 +298,7 @@ public abstract class TcpTransport extends AbstractLifecycleComponent implements
                     typeMapping.put(type, handle);
             }
             version = handshakeVersion;
+            compress = connectionProfile.getCompressionEnabled();
         }
 
         @Override
@@ -384,6 +386,10 @@ public abstract class TcpTransport extends AbstractLifecycleComponent implements
                 throw new NodeNotConnectedException(node, "connection already closed");
             }
             TcpChannel channel = channel(options.type());
+
+            if (compress) {
+                options = TransportRequestOptions.builder(options).withCompress(true).build();
+            }
             sendRequestToChannel(this.node, channel, requestId, action, request, options, getVersion(), (byte) 0);
         }
     }
@@ -573,8 +579,8 @@ public abstract class TcpTransport extends AbstractLifecycleComponent implements
     }
 
     // package private for tests
-    public static int resolvePublishPort(ProfileSettings profileSettings, List<InetSocketAddress> boundAddresses,
-                                         InetAddress publishInetAddress) {
+    static int resolvePublishPort(ProfileSettings profileSettings, List<InetSocketAddress> boundAddresses,
+                                  InetAddress publishInetAddress) {
         int publishPort = profileSettings.publishPort;
 
         // if port not explicitly provided, search for port of address in boundAddresses that matches publishInetAddress
@@ -811,16 +817,13 @@ public abstract class TcpTransport extends AbstractLifecycleComponent implements
      */
     protected abstract void stopInternal();
 
-    public boolean canCompress(TransportRequest request) {
-        return compress && (!(request instanceof BytesTransportRequest));
+    private boolean canCompress(TransportRequest request) {
+        return request instanceof BytesTransportRequest == false;
     }
 
     private void sendRequestToChannel(final DiscoveryNode node, final TcpChannel channel, final long requestId, final String action,
                                       final TransportRequest request, TransportRequestOptions options, Version channelVersion,
                                       byte status) throws IOException, TransportException {
-        if (compress) {
-            options = TransportRequestOptions.builder(options).withCompress(true).build();
-        }
 
         // only compress if asked and the request is not bytes. Otherwise only
         // the header part is compressed, and the "body" can't be extracted as compressed
@@ -935,10 +938,11 @@ public abstract class TcpTransport extends AbstractLifecycleComponent implements
         final String action,
         TransportResponseOptions options,
         byte status) throws IOException {
-        if (compress) {
+        if (compressResponses && options.compress() == false) {
             options = TransportResponseOptions.builder(options).withCompress(true).build();
         }
-        status = TransportStatus.setResponse(status); // TODO share some code with sendRequest
+
+        status = TransportStatus.setResponse(status);
         ReleasableBytesStreamOutput bStream = new ReleasableBytesStreamOutput(bigArrays);
         CompressibleBytesOutputStream stream = new CompressibleBytesOutputStream(bStream, options.compress());
         boolean addedReleaseListener = false;
@@ -1159,7 +1163,7 @@ public abstract class TcpTransport extends AbstractLifecycleComponent implements
      */
     public static class HttpOnTransportException extends ElasticsearchException {
 
-        public HttpOnTransportException(String msg) {
+        private HttpOnTransportException(String msg) {
             super(msg);
         }
 
@@ -1346,7 +1350,7 @@ public abstract class TcpTransport extends AbstractLifecycleComponent implements
                     getInFlightRequestBreaker().addWithoutBreaking(messageLengthBytes);
                 }
                 transportChannel = new TcpTransportChannel(this, channel, transportName, action, requestId, version, features, profileName,
-                    messageLengthBytes);
+                    messageLengthBytes, TransportStatus.isCompress(status));
                 final TransportRequest request = reg.newRequest(stream);
                 request.remoteAddress(new TransportAddress(remoteAddress));
                 // in case we throw an exception, i.e. when the limit is hit, we don't want to verify
@@ -1356,8 +1360,8 @@ public abstract class TcpTransport extends AbstractLifecycleComponent implements
         } catch (Exception e) {
             // the circuit breaker tripped
             if (transportChannel == null) {
-                transportChannel =
-                    new TcpTransportChannel(this, channel, transportName, action, requestId, version, features, profileName, 0);
+                transportChannel = new TcpTransportChannel(this, channel, transportName, action, requestId, version, features,
+                    profileName, 0, TransportStatus.isCompress(status));
             }
             try {
                 transportChannel.sendResponse(e);
@@ -1416,8 +1420,8 @@ public abstract class TcpTransport extends AbstractLifecycleComponent implements
         }
     }
 
-    public void executeHandshake(DiscoveryNode node, TcpChannel channel, TimeValue timeout, ActionListener<Version> listener) {
-        handshaker.sendHandshake(responseHandlers.newRequestId(), node, channel, timeout, listener);
+    public void executeHandshake(DiscoveryNode node, TcpChannel channel, ConnectionProfile profile, ActionListener<Version> listener) {
+        handshaker.sendHandshake(responseHandlers.newRequestId(), node, channel, profile.getHandshakeTimeout(), listener);
     }
 
     final int getNumPendingHandshakes() {
@@ -1629,7 +1633,7 @@ public abstract class TcpTransport extends AbstractLifecycleComponent implements
             if (countDown.countDown()) {
                 final TcpChannel handshakeChannel = channels.get(0);
                 try {
-                    executeHandshake(node, handshakeChannel, connectionProfile.getHandshakeTimeout(), new ActionListener<Version>() {
+                    executeHandshake(node, handshakeChannel, connectionProfile, new ActionListener<Version>() {
                         @Override
                         public void onResponse(Version version) {
                             NodeChannels nodeChannels = new NodeChannels(node, channels, connectionProfile, version);

+ 11 - 7
server/src/main/java/org/elasticsearch/transport/TcpTransportChannel.java

@@ -26,6 +26,8 @@ import java.util.Set;
 import java.util.concurrent.atomic.AtomicBoolean;
 
 public final class TcpTransportChannel implements TransportChannel {
+
+    private final AtomicBoolean released = new AtomicBoolean();
     private final TcpTransport transport;
     private final Version version;
     private final Set<String> features;
@@ -33,12 +35,12 @@ public final class TcpTransportChannel implements TransportChannel {
     private final long requestId;
     private final String profileName;
     private final long reservedBytes;
-    private final AtomicBoolean released = new AtomicBoolean();
     private final String channelType;
     private final TcpChannel channel;
+    private final boolean compressResponse;
 
     TcpTransportChannel(TcpTransport transport, TcpChannel channel, String channelType, String action, long requestId, Version version,
-                        Set<String> features, String profileName, long reservedBytes) {
+                        Set<String> features, String profileName, long reservedBytes, boolean compressResponse) {
         this.version = version;
         this.features = features;
         this.channel = channel;
@@ -48,6 +50,7 @@ public final class TcpTransportChannel implements TransportChannel {
         this.profileName = profileName;
         this.reservedBytes = reservedBytes;
         this.channelType = channelType;
+        this.compressResponse = compressResponse;
     }
 
     @Override
@@ -57,12 +60,13 @@ public final class TcpTransportChannel implements TransportChannel {
 
     @Override
     public void sendResponse(TransportResponse response) throws IOException {
-        sendResponse(response, TransportResponseOptions.EMPTY);
-    }
-
-    @Override
-    public void sendResponse(TransportResponse response, TransportResponseOptions options) throws IOException {
         try {
+            TransportResponseOptions options;
+            if (compressResponse) {
+                options = TransportResponseOptions.builder().withCompress(true).build();
+            } else {
+                options = TransportResponseOptions.EMPTY;
+            }
             transport.sendResponse(version, features, channel, response, requestId, action, options);
         } finally {
             release(false);

+ 0 - 2
server/src/main/java/org/elasticsearch/transport/TransportChannel.java

@@ -34,8 +34,6 @@ public interface TransportChannel {
 
     void sendResponse(TransportResponse response) throws IOException;
 
-    void sendResponse(TransportResponse response, TransportResponseOptions options) throws IOException;
-
     void sendResponse(Exception exception) throws IOException;
 
     /**

+ 1 - 2
server/src/main/java/org/elasticsearch/transport/TransportResponseOptions.java

@@ -38,8 +38,7 @@ public class TransportResponseOptions {
     }
 
     public static Builder builder(TransportResponseOptions options) {
-        return new Builder()
-                .withCompress(options.compress);
+        return new Builder().withCompress(options.compress);
     }
 
     public static class Builder {

+ 1 - 6
server/src/main/java/org/elasticsearch/transport/TransportService.java

@@ -1169,12 +1169,7 @@ public class TransportService extends AbstractLifecycleComponent implements Tran
 
         @Override
         public void sendResponse(TransportResponse response) throws IOException {
-            sendResponse(response, TransportResponseOptions.EMPTY);
-        }
-
-        @Override
-        public void sendResponse(final TransportResponse response, TransportResponseOptions options) throws IOException {
-            service.onResponseSent(requestId, action, response, options);
+            service.onResponseSent(requestId, action, response, TransportResponseOptions.EMPTY);
             final TransportResponseHandler handler = service.responseHandlers.onResponseReceived(requestId, service);
             // ignore if its null, the service logs it
             if (handler != null) {

+ 0 - 5
server/src/test/java/org/elasticsearch/action/support/broadcast/node/TransportBroadcastByNodeActionTests.java

@@ -58,7 +58,6 @@ import org.elasticsearch.threadpool.TestThreadPool;
 import org.elasticsearch.threadpool.ThreadPool;
 import org.elasticsearch.transport.TransportChannel;
 import org.elasticsearch.transport.TransportResponse;
-import org.elasticsearch.transport.TransportResponseOptions;
 import org.elasticsearch.transport.TransportService;
 import org.junit.After;
 import org.junit.AfterClass;
@@ -487,10 +486,6 @@ public class TransportBroadcastByNodeActionTests extends ESTestCase {
             capturedResponse = response;
         }
 
-        @Override
-        public void sendResponse(TransportResponse response, TransportResponseOptions options) throws IOException {
-        }
-
         @Override
         public void sendResponse(Exception exception) throws IOException {
         }

+ 0 - 6
server/src/test/java/org/elasticsearch/action/support/replication/TransportReplicationActionTests.java

@@ -80,7 +80,6 @@ import org.elasticsearch.transport.TransportChannel;
 import org.elasticsearch.transport.TransportException;
 import org.elasticsearch.transport.TransportRequest;
 import org.elasticsearch.transport.TransportResponse;
-import org.elasticsearch.transport.TransportResponseOptions;
 import org.elasticsearch.transport.TransportService;
 import org.hamcrest.Matcher;
 import org.junit.After;
@@ -1248,11 +1247,6 @@ public class TransportReplicationActionTests extends ESTestCase {
                 listener.onResponse(((TestResponse) response));
             }
 
-            @Override
-            public void sendResponse(TransportResponse response, TransportResponseOptions options) throws IOException {
-                listener.onResponse(((TestResponse) response));
-            }
-
             @Override
             public void sendResponse(Exception exception) throws IOException {
                 listener.onFailure(exception);

+ 0 - 7
server/src/test/java/org/elasticsearch/discovery/zen/PublishClusterStateActionTests.java

@@ -55,7 +55,6 @@ import org.elasticsearch.transport.BytesTransportRequest;
 import org.elasticsearch.transport.TransportChannel;
 import org.elasticsearch.transport.TransportConnectionListener;
 import org.elasticsearch.transport.TransportResponse;
-import org.elasticsearch.transport.TransportResponseOptions;
 import org.elasticsearch.transport.TransportService;
 import org.junit.After;
 import org.junit.Before;
@@ -921,12 +920,6 @@ public class PublishClusterStateActionTests extends ESTestCase {
             assertThat(error.get(), nullValue());
         }
 
-        @Override
-        public void sendResponse(TransportResponse response, TransportResponseOptions options) throws IOException {
-            this.response.set(response);
-            assertThat(error.get(), nullValue());
-        }
-
         @Override
         public void sendResponse(Exception exception) throws IOException {
             this.error.set(exception);

+ 0 - 6
server/src/test/java/org/elasticsearch/discovery/zen/ZenDiscoveryUnitTests.java

@@ -54,7 +54,6 @@ import org.elasticsearch.threadpool.TestThreadPool;
 import org.elasticsearch.threadpool.ThreadPool;
 import org.elasticsearch.transport.TransportChannel;
 import org.elasticsearch.transport.TransportResponse;
-import org.elasticsearch.transport.TransportResponseOptions;
 import org.elasticsearch.transport.TransportService;
 
 import java.io.Closeable;
@@ -393,11 +392,6 @@ public class ZenDiscoveryUnitTests extends ESTestCase {
                         sendResponse.set(true);
                     }
 
-                    @Override
-                    public void sendResponse(TransportResponse response, TransportResponseOptions options) throws IOException {
-
-                    }
-
                     @Override
                     public void sendResponse(Exception exception) throws IOException {
 

+ 1 - 69
server/src/test/java/org/elasticsearch/transport/ConnectionManagerTests.java

@@ -35,7 +35,6 @@ import java.net.InetAddress;
 import java.util.concurrent.atomic.AtomicInteger;
 import java.util.concurrent.atomic.AtomicReference;
 
-import static org.hamcrest.Matchers.equalTo;
 import static org.mockito.Mockito.mock;
 import static org.mockito.Mockito.when;
 
@@ -55,7 +54,7 @@ public class ConnectionManagerTests extends ESTestCase {
         transport = mock(Transport.class);
         connectionManager = new ConnectionManager(settings, transport, threadPool);
         TimeValue oneSecond = new TimeValue(1000);
-        connectionProfile = ConnectionProfile.buildSingleChannelProfile(TransportRequestOptions.Type.REG, oneSecond, oneSecond);
+        connectionProfile = ConnectionProfile.buildSingleChannelProfile(TransportRequestOptions.Type.REG, oneSecond, oneSecond, false);
     }
 
     @After
@@ -63,73 +62,6 @@ public class ConnectionManagerTests extends ESTestCase {
         threadPool.shutdown();
     }
 
-    public void testConnectionProfileResolve() {
-        final ConnectionProfile defaultProfile = ConnectionProfile.buildDefaultConnectionProfile(Settings.EMPTY);
-        assertEquals(defaultProfile, ConnectionProfile.resolveConnectionProfile(null, defaultProfile));
-
-        final ConnectionProfile.Builder builder = new ConnectionProfile.Builder();
-        builder.addConnections(randomIntBetween(0, 5), TransportRequestOptions.Type.BULK);
-        builder.addConnections(randomIntBetween(0, 5), TransportRequestOptions.Type.RECOVERY);
-        builder.addConnections(randomIntBetween(0, 5), TransportRequestOptions.Type.REG);
-        builder.addConnections(randomIntBetween(0, 5), TransportRequestOptions.Type.STATE);
-        builder.addConnections(randomIntBetween(0, 5), TransportRequestOptions.Type.PING);
-
-        final boolean connectionTimeoutSet = randomBoolean();
-        if (connectionTimeoutSet) {
-            builder.setConnectTimeout(TimeValue.timeValueMillis(randomNonNegativeLong()));
-        }
-        final boolean connectionHandshakeSet = randomBoolean();
-        if (connectionHandshakeSet) {
-            builder.setHandshakeTimeout(TimeValue.timeValueMillis(randomNonNegativeLong()));
-        }
-
-        final ConnectionProfile profile = builder.build();
-        final ConnectionProfile resolved = ConnectionProfile.resolveConnectionProfile(profile, defaultProfile);
-        assertNotEquals(resolved, defaultProfile);
-        assertThat(resolved.getNumConnections(), equalTo(profile.getNumConnections()));
-        assertThat(resolved.getHandles(), equalTo(profile.getHandles()));
-
-        assertThat(resolved.getConnectTimeout(),
-            equalTo(connectionTimeoutSet ? profile.getConnectTimeout() : defaultProfile.getConnectTimeout()));
-        assertThat(resolved.getHandshakeTimeout(),
-            equalTo(connectionHandshakeSet ? profile.getHandshakeTimeout() : defaultProfile.getHandshakeTimeout()));
-    }
-
-    public void testDefaultConnectionProfile() {
-        ConnectionProfile profile = ConnectionProfile.buildDefaultConnectionProfile(Settings.EMPTY);
-        assertEquals(13, profile.getNumConnections());
-        assertEquals(1, profile.getNumConnectionsPerType(TransportRequestOptions.Type.PING));
-        assertEquals(6, profile.getNumConnectionsPerType(TransportRequestOptions.Type.REG));
-        assertEquals(1, profile.getNumConnectionsPerType(TransportRequestOptions.Type.STATE));
-        assertEquals(2, profile.getNumConnectionsPerType(TransportRequestOptions.Type.RECOVERY));
-        assertEquals(3, profile.getNumConnectionsPerType(TransportRequestOptions.Type.BULK));
-
-        profile = ConnectionProfile.buildDefaultConnectionProfile(Settings.builder().put("node.master", false).build());
-        assertEquals(12, profile.getNumConnections());
-        assertEquals(1, profile.getNumConnectionsPerType(TransportRequestOptions.Type.PING));
-        assertEquals(6, profile.getNumConnectionsPerType(TransportRequestOptions.Type.REG));
-        assertEquals(0, profile.getNumConnectionsPerType(TransportRequestOptions.Type.STATE));
-        assertEquals(2, profile.getNumConnectionsPerType(TransportRequestOptions.Type.RECOVERY));
-        assertEquals(3, profile.getNumConnectionsPerType(TransportRequestOptions.Type.BULK));
-
-        profile = ConnectionProfile.buildDefaultConnectionProfile(Settings.builder().put("node.data", false).build());
-        assertEquals(11, profile.getNumConnections());
-        assertEquals(1, profile.getNumConnectionsPerType(TransportRequestOptions.Type.PING));
-        assertEquals(6, profile.getNumConnectionsPerType(TransportRequestOptions.Type.REG));
-        assertEquals(1, profile.getNumConnectionsPerType(TransportRequestOptions.Type.STATE));
-        assertEquals(0, profile.getNumConnectionsPerType(TransportRequestOptions.Type.RECOVERY));
-        assertEquals(3, profile.getNumConnectionsPerType(TransportRequestOptions.Type.BULK));
-
-        profile = ConnectionProfile.buildDefaultConnectionProfile(Settings.builder().put("node.data", false)
-            .put("node.master", false).build());
-        assertEquals(10, profile.getNumConnections());
-        assertEquals(1, profile.getNumConnectionsPerType(TransportRequestOptions.Type.PING));
-        assertEquals(6, profile.getNumConnectionsPerType(TransportRequestOptions.Type.REG));
-        assertEquals(0, profile.getNumConnectionsPerType(TransportRequestOptions.Type.STATE));
-        assertEquals(0, profile.getNumConnectionsPerType(TransportRequestOptions.Type.RECOVERY));
-        assertEquals(3, profile.getNumConnectionsPerType(TransportRequestOptions.Type.BULK));
-    }
-
     public void testConnectAndDisconnect() {
         AtomicInteger nodeConnectedCount = new AtomicInteger();
         AtomicInteger nodeDisconnectedCount = new AtomicInteger();

+ 94 - 3
server/src/test/java/org/elasticsearch/transport/ConnectionProfileTests.java

@@ -18,6 +18,7 @@
  */
 package org.elasticsearch.transport;
 
+import org.elasticsearch.common.settings.Settings;
 import org.elasticsearch.common.unit.TimeValue;
 import org.elasticsearch.test.ESTestCase;
 import org.hamcrest.Matchers;
@@ -27,19 +28,26 @@ import java.util.Collections;
 import java.util.EnumSet;
 import java.util.List;
 
+import static org.hamcrest.Matchers.equalTo;
+
 public class ConnectionProfileTests extends ESTestCase {
 
     public void testBuildConnectionProfile() {
         ConnectionProfile.Builder builder = new ConnectionProfile.Builder();
         TimeValue connectTimeout = TimeValue.timeValueMillis(randomIntBetween(1, 10));
-        TimeValue handshaketTimeout = TimeValue.timeValueMillis(randomIntBetween(1, 10));
+        TimeValue handshakeTimeout = TimeValue.timeValueMillis(randomIntBetween(1, 10));
+        boolean compressionEnabled = randomBoolean();
         final boolean setConnectTimeout = randomBoolean();
         if (setConnectTimeout) {
             builder.setConnectTimeout(connectTimeout);
         }
         final boolean setHandshakeTimeout = randomBoolean();
         if (setHandshakeTimeout) {
-            builder.setHandshakeTimeout(handshaketTimeout);
+            builder.setHandshakeTimeout(handshakeTimeout);
+        }
+        final boolean setCompress = randomBoolean();
+        if (setCompress) {
+            builder.setCompressionEnabled(compressionEnabled);
         }
         builder.addConnections(1, TransportRequestOptions.Type.BULK);
         builder.addConnections(2, TransportRequestOptions.Type.STATE, TransportRequestOptions.Type.RECOVERY);
@@ -63,11 +71,17 @@ public class ConnectionProfileTests extends ESTestCase {
         }
 
         if (setHandshakeTimeout) {
-            assertEquals(handshaketTimeout, build.getHandshakeTimeout());
+            assertEquals(handshakeTimeout, build.getHandshakeTimeout());
         } else {
             assertNull(build.getHandshakeTimeout());
         }
 
+        if (setCompress) {
+            assertEquals(compressionEnabled, build.getCompressionEnabled());
+        } else {
+            assertNull(build.getCompressionEnabled());
+        }
+
         List<Integer> list = new ArrayList<>(10);
         for (int i = 0; i < 10; i++) {
             list.add(i);
@@ -126,4 +140,81 @@ public class ConnectionProfileTests extends ESTestCase {
         assertEquals(Integer.valueOf(0), build.getHandles().get(0).getChannel(array));
         expectThrows(IllegalStateException.class, () -> build.getHandles().get(1).getChannel(array));
     }
+
+    public void testConnectionProfileResolve() {
+        final ConnectionProfile defaultProfile = ConnectionProfile.buildDefaultConnectionProfile(Settings.EMPTY);
+        assertEquals(defaultProfile, ConnectionProfile.resolveConnectionProfile(null, defaultProfile));
+
+        final ConnectionProfile.Builder builder = new ConnectionProfile.Builder();
+        builder.addConnections(randomIntBetween(0, 5), TransportRequestOptions.Type.BULK);
+        builder.addConnections(randomIntBetween(0, 5), TransportRequestOptions.Type.RECOVERY);
+        builder.addConnections(randomIntBetween(0, 5), TransportRequestOptions.Type.REG);
+        builder.addConnections(randomIntBetween(0, 5), TransportRequestOptions.Type.STATE);
+        builder.addConnections(randomIntBetween(0, 5), TransportRequestOptions.Type.PING);
+
+        final boolean connectionTimeoutSet = randomBoolean();
+        if (connectionTimeoutSet) {
+            builder.setConnectTimeout(TimeValue.timeValueMillis(randomNonNegativeLong()));
+        }
+        final boolean connectionHandshakeSet = randomBoolean();
+        if (connectionHandshakeSet) {
+            builder.setHandshakeTimeout(TimeValue.timeValueMillis(randomNonNegativeLong()));
+        }
+
+        final boolean connectionCompressSet = randomBoolean();
+        if (connectionCompressSet) {
+            builder.setCompressionEnabled(randomBoolean());
+        }
+
+        final ConnectionProfile profile = builder.build();
+        final ConnectionProfile resolved = ConnectionProfile.resolveConnectionProfile(profile, defaultProfile);
+        assertNotEquals(resolved, defaultProfile);
+        assertThat(resolved.getNumConnections(), equalTo(profile.getNumConnections()));
+        assertThat(resolved.getHandles(), equalTo(profile.getHandles()));
+
+        assertThat(resolved.getConnectTimeout(),
+            equalTo(connectionTimeoutSet ? profile.getConnectTimeout() : defaultProfile.getConnectTimeout()));
+        assertThat(resolved.getHandshakeTimeout(),
+            equalTo(connectionHandshakeSet ? profile.getHandshakeTimeout() : defaultProfile.getHandshakeTimeout()));
+        assertThat(resolved.getCompressionEnabled(),
+            equalTo(connectionCompressSet ? profile.getCompressionEnabled() : defaultProfile.getCompressionEnabled()));
+    }
+
+    public void testDefaultConnectionProfile() {
+        ConnectionProfile profile = ConnectionProfile.buildDefaultConnectionProfile(Settings.EMPTY);
+        assertEquals(13, profile.getNumConnections());
+        assertEquals(1, profile.getNumConnectionsPerType(TransportRequestOptions.Type.PING));
+        assertEquals(6, profile.getNumConnectionsPerType(TransportRequestOptions.Type.REG));
+        assertEquals(1, profile.getNumConnectionsPerType(TransportRequestOptions.Type.STATE));
+        assertEquals(2, profile.getNumConnectionsPerType(TransportRequestOptions.Type.RECOVERY));
+        assertEquals(3, profile.getNumConnectionsPerType(TransportRequestOptions.Type.BULK));
+        assertEquals(TransportService.TCP_CONNECT_TIMEOUT.get(Settings.EMPTY), profile.getConnectTimeout());
+        assertEquals(TransportService.TCP_CONNECT_TIMEOUT.get(Settings.EMPTY), profile.getHandshakeTimeout());
+        assertEquals(Transport.TRANSPORT_TCP_COMPRESS.get(Settings.EMPTY), profile.getCompressionEnabled());
+
+        profile = ConnectionProfile.buildDefaultConnectionProfile(Settings.builder().put("node.master", false).build());
+        assertEquals(12, profile.getNumConnections());
+        assertEquals(1, profile.getNumConnectionsPerType(TransportRequestOptions.Type.PING));
+        assertEquals(6, profile.getNumConnectionsPerType(TransportRequestOptions.Type.REG));
+        assertEquals(0, profile.getNumConnectionsPerType(TransportRequestOptions.Type.STATE));
+        assertEquals(2, profile.getNumConnectionsPerType(TransportRequestOptions.Type.RECOVERY));
+        assertEquals(3, profile.getNumConnectionsPerType(TransportRequestOptions.Type.BULK));
+
+        profile = ConnectionProfile.buildDefaultConnectionProfile(Settings.builder().put("node.data", false).build());
+        assertEquals(11, profile.getNumConnections());
+        assertEquals(1, profile.getNumConnectionsPerType(TransportRequestOptions.Type.PING));
+        assertEquals(6, profile.getNumConnectionsPerType(TransportRequestOptions.Type.REG));
+        assertEquals(1, profile.getNumConnectionsPerType(TransportRequestOptions.Type.STATE));
+        assertEquals(0, profile.getNumConnectionsPerType(TransportRequestOptions.Type.RECOVERY));
+        assertEquals(3, profile.getNumConnectionsPerType(TransportRequestOptions.Type.BULK));
+
+        profile = ConnectionProfile.buildDefaultConnectionProfile(Settings.builder().put("node.data", false)
+            .put("node.master", false).build());
+        assertEquals(10, profile.getNumConnections());
+        assertEquals(1, profile.getNumConnectionsPerType(TransportRequestOptions.Type.PING));
+        assertEquals(6, profile.getNumConnectionsPerType(TransportRequestOptions.Type.REG));
+        assertEquals(0, profile.getNumConnectionsPerType(TransportRequestOptions.Type.STATE));
+        assertEquals(0, profile.getNumConnectionsPerType(TransportRequestOptions.Type.RECOVERY));
+        assertEquals(3, profile.getNumConnectionsPerType(TransportRequestOptions.Type.BULK));
+    }
 }

+ 11 - 5
server/src/test/java/org/elasticsearch/transport/TcpTransportTests.java

@@ -187,8 +187,7 @@ public class TcpTransportTests extends ESTestCase {
         ThreadPool threadPool = new TestThreadPool(TcpTransportTests.class.getName());
         AtomicReference<BytesReference> messageCaptor = new AtomicReference<>();
         try {
-            TcpTransport transport = new TcpTransport(
-                "test", Settings.builder().put("transport.tcp.compress", compressed).build(), Version.CURRENT, threadPool,
+            TcpTransport transport = new TcpTransport("test", Settings.EMPTY, Version.CURRENT, threadPool,
                 new BigArrays(new PageCacheRecycler(Settings.EMPTY), null), null, null, null) {
 
                 @Override
@@ -207,17 +206,24 @@ public class TcpTransportTests extends ESTestCase {
 
                 @Override
                 public NodeChannels openConnection(DiscoveryNode node, ConnectionProfile connectionProfile) {
-                    int numConnections = MockTcpTransport.LIGHT_PROFILE.getNumConnections();
+                    assertTrue(connectionProfile.getCompressionEnabled());
+                    int numConnections = connectionProfile.getNumConnections();
                     ArrayList<TcpChannel> fakeChannels = new ArrayList<>(numConnections);
                     for (int i = 0; i < numConnections; ++i) {
                         fakeChannels.add(new FakeChannel(messageCaptor));
                     }
-                    return new NodeChannels(node, fakeChannels, MockTcpTransport.LIGHT_PROFILE, Version.CURRENT);
+                    return new NodeChannels(node, fakeChannels, connectionProfile, Version.CURRENT);
                 }
             };
 
             DiscoveryNode node = new DiscoveryNode("foo", buildNewFakeTransportAddress(), Version.CURRENT);
-            Transport.Connection connection = transport.openConnection(node, null);
+            ConnectionProfile.Builder profileBuilder = new ConnectionProfile.Builder(MockTcpTransport.LIGHT_PROFILE);
+            if (compressed) {
+                profileBuilder.setCompressionEnabled(true);
+            } else {
+                profileBuilder.setCompressionEnabled(false);
+            }
+            Transport.Connection connection = transport.openConnection(node, profileBuilder.build());
             connection.sendRequest(42, "foobar", request, TransportRequestOptions.EMPTY);
 
             BytesReference reference = messageCaptor.get();

+ 88 - 80
test/framework/src/main/java/org/elasticsearch/transport/AbstractSimpleTransportTestCase.java

@@ -499,106 +499,117 @@ public abstract class AbstractSimpleTransportTestCase extends ESTestCase {
     }
 
     public void testVoidMessageCompressed() {
-        serviceA.registerRequestHandler("internal:sayHello", TransportRequest.Empty::new, ThreadPool.Names.GENERIC,
-            (request, channel, task) -> {
-                try {
-                    TransportResponseOptions responseOptions = TransportResponseOptions.builder().withCompress(true).build();
-                    channel.sendResponse(TransportResponse.Empty.INSTANCE, responseOptions);
-                } catch (IOException e) {
-                    logger.error("Unexpected failure", e);
-                    fail(e.getMessage());
-                }
-            });
+        try (MockTransportService serviceC = build(Settings.EMPTY, CURRENT_VERSION, null, true)) {
+            serviceC.start();
+            serviceC.acceptIncomingRequests();
 
-        TransportFuture<TransportResponse.Empty> res = serviceB.submitRequest(nodeA, "internal:sayHello",
-            TransportRequest.Empty.INSTANCE, TransportRequestOptions.builder().withCompress(true).build(),
-            new TransportResponseHandler<TransportResponse.Empty>() {
-                @Override
-                public TransportResponse.Empty read(StreamInput in) {
-                    return TransportResponse.Empty.INSTANCE;
-                }
+            serviceA.registerRequestHandler("internal:sayHello", TransportRequest.Empty::new, ThreadPool.Names.GENERIC,
+                (request, channel, task) -> {
+                    try {
+                        channel.sendResponse(TransportResponse.Empty.INSTANCE);
+                    } catch (IOException e) {
+                        logger.error("Unexpected failure", e);
+                        fail(e.getMessage());
+                    }
+                });
 
-                @Override
-                public String executor() {
-                    return ThreadPool.Names.GENERIC;
-                }
+            Settings settingsWithCompress = Settings.builder().put(Transport.TRANSPORT_TCP_COMPRESS.getKey(), true).build();
+            ConnectionProfile connectionProfile = ConnectionProfile.buildDefaultConnectionProfile(settingsWithCompress);
+            serviceC.connectToNode(serviceA.getLocalDiscoNode(), connectionProfile);
 
-                @Override
-                public void handleResponse(TransportResponse.Empty response) {
-                }
 
-                @Override
-                public void handleException(TransportException exp) {
-                    logger.error("Unexpected failure", exp);
-                    fail("got exception instead of a response: " + exp.getMessage());
-                }
-            });
+            TransportFuture<TransportResponse.Empty> res = serviceC.submitRequest(nodeA, "internal:sayHello",
+                TransportRequest.Empty.INSTANCE, TransportRequestOptions.builder().withCompress(true).build(),
+                new TransportResponseHandler<TransportResponse.Empty>() {
+                    @Override
+                    public TransportResponse.Empty read(StreamInput in) {
+                        return TransportResponse.Empty.INSTANCE;
+                    }
 
-        try {
-            TransportResponse.Empty message = res.get();
-            assertThat(message, notNullValue());
-        } catch (Exception e) {
-            assertThat(e.getMessage(), false, equalTo(true));
+                    @Override
+                    public String executor() {
+                        return ThreadPool.Names.GENERIC;
+                    }
+
+                    @Override
+                    public void handleResponse(TransportResponse.Empty response) {
+                    }
+
+                    @Override
+                    public void handleException(TransportException exp) {
+                        logger.error("Unexpected failure", exp);
+                        fail("got exception instead of a response: " + exp.getMessage());
+                    }
+                });
+
+            try {
+                TransportResponse.Empty message = res.get();
+                assertThat(message, notNullValue());
+            } catch (Exception e) {
+                assertThat(e.getMessage(), false, equalTo(true));
+            }
         }
     }
 
-    public void testHelloWorldCompressed() {
-        serviceA.registerRequestHandler("internal:sayHello", StringMessageRequest::new, ThreadPool.Names.GENERIC,
-            new TransportRequestHandler<StringMessageRequest>() {
-                @Override
-                public void messageReceived(StringMessageRequest request, TransportChannel channel, Task task) {
+    public void testHelloWorldCompressed() throws IOException {
+        try (MockTransportService serviceC = build(Settings.EMPTY, CURRENT_VERSION, null, true)) {
+            serviceC.start();
+            serviceC.acceptIncomingRequests();
+
+            serviceA.registerRequestHandler("internal:sayHello", StringMessageRequest::new, ThreadPool.Names.GENERIC,
+                (request, channel, task) -> {
                     assertThat("moshe", equalTo(request.message));
                     try {
-                        TransportResponseOptions responseOptions = TransportResponseOptions.builder().withCompress(true).build();
-                        channel.sendResponse(new StringMessageResponse("hello " + request.message), responseOptions);
+                        channel.sendResponse(new StringMessageResponse("hello " + request.message));
                     } catch (IOException e) {
                         logger.error("Unexpected failure", e);
                         fail(e.getMessage());
                     }
-                }
-            });
+                });
 
-        TransportFuture<StringMessageResponse> res = serviceB.submitRequest(nodeA, "internal:sayHello",
-            new StringMessageRequest("moshe"), TransportRequestOptions.builder().withCompress(true).build(),
-            new TransportResponseHandler<StringMessageResponse>() {
-                @Override
-                public StringMessageResponse read(StreamInput in) throws IOException {
-                    return new StringMessageResponse(in);
-                }
+            Settings settingsWithCompress = Settings.builder().put(Transport.TRANSPORT_TCP_COMPRESS.getKey(), true).build();
+            ConnectionProfile connectionProfile = ConnectionProfile.buildDefaultConnectionProfile(settingsWithCompress);
+            serviceC.connectToNode(serviceA.getLocalDiscoNode(), connectionProfile);
 
-                @Override
-                public String executor() {
-                    return ThreadPool.Names.GENERIC;
-                }
+            TransportFuture<StringMessageResponse> res = serviceC.submitRequest(nodeA, "internal:sayHello",
+                new StringMessageRequest("moshe"), TransportRequestOptions.builder().withCompress(true).build(),
+                new TransportResponseHandler<StringMessageResponse>() {
+                    @Override
+                    public StringMessageResponse read(StreamInput in) throws IOException {
+                        return new StringMessageResponse(in);
+                    }
 
-                @Override
-                public void handleResponse(StringMessageResponse response) {
-                    assertThat("hello moshe", equalTo(response.message));
-                }
+                    @Override
+                    public String executor() {
+                        return ThreadPool.Names.GENERIC;
+                    }
 
-                @Override
-                public void handleException(TransportException exp) {
-                    logger.error("Unexpected failure", exp);
-                    fail("got exception instead of a response: " + exp.getMessage());
-                }
-            });
+                    @Override
+                    public void handleResponse(StringMessageResponse response) {
+                        assertThat("hello moshe", equalTo(response.message));
+                    }
 
-        try {
-            StringMessageResponse message = res.get();
-            assertThat("hello moshe", equalTo(message.message));
-        } catch (Exception e) {
-            assertThat(e.getMessage(), false, equalTo(true));
+                    @Override
+                    public void handleException(TransportException exp) {
+                        logger.error("Unexpected failure", exp);
+                        fail("got exception instead of a response: " + exp.getMessage());
+                    }
+                });
+
+            try {
+                StringMessageResponse message = res.get();
+                assertThat("hello moshe", equalTo(message.message));
+            } catch (Exception e) {
+                assertThat(e.getMessage(), false, equalTo(true));
+            }
         }
     }
 
     public void testErrorMessage() {
         serviceA.registerRequestHandler("internal:sayHelloException", StringMessageRequest::new, ThreadPool.Names.GENERIC,
-            new TransportRequestHandler<StringMessageRequest>() {
-                @Override
-                public void messageReceived(StringMessageRequest request, TransportChannel channel, Task task) throws Exception {
-                    assertThat("moshe", equalTo(request.message));
-                    throw new RuntimeException("bad message !!!");
-                }
+            (request, channel, task) -> {
+                assertThat("moshe", equalTo(request.message));
+                throw new RuntimeException("bad message !!!");
             });
 
         TransportFuture<StringMessageResponse> res = serviceB.submitRequest(nodeA, "internal:sayHelloException",
@@ -2028,10 +2039,7 @@ public abstract class AbstractSimpleTransportTestCase extends ESTestCase {
              TcpTransport.NodeChannels connection = originalTransport.openConnection(
                  new DiscoveryNode("TS_TPC", "TS_TPC", service.boundAddress().publishAddress(), emptyMap(), emptySet(), version0),
                  connectionProfile)) {
-            PlainActionFuture<Version> listener = PlainActionFuture.newFuture();
-            originalTransport.executeHandshake(connection.getNode(), connection.channel(TransportRequestOptions.Type.PING),
-                TimeValue.timeValueSeconds(10), listener);
-            assertEquals(listener.actionGet(), Version.CURRENT);
+            assertEquals(connection.getVersion(), Version.CURRENT);
         }
     }
 

+ 1 - 0
test/framework/src/main/java/org/elasticsearch/transport/MockTcpTransport.java

@@ -211,6 +211,7 @@ public class MockTcpTransport extends TcpTransport {
         }
         builder.setHandshakeTimeout(connectionProfile.getHandshakeTimeout());
         builder.setConnectTimeout(connectionProfile.getConnectTimeout());
+        builder.setCompressionEnabled(connectionProfile.getCompressionEnabled());
         return builder.build();
     }
 

+ 1 - 0
test/framework/src/main/java/org/elasticsearch/transport/nio/MockNioTransport.java

@@ -154,6 +154,7 @@ public class MockNioTransport extends TcpTransport {
         }
         builder.setHandshakeTimeout(connectionProfile.getHandshakeTimeout());
         builder.setConnectTimeout(connectionProfile.getConnectTimeout());
+        builder.setCompressionEnabled(connectionProfile.getCompressionEnabled());
         return builder.build();
     }
 

+ 3 - 3
test/framework/src/test/java/org/elasticsearch/transport/MockTcpTransportTests.java

@@ -25,7 +25,6 @@ import org.elasticsearch.common.io.stream.NamedWriteableRegistry;
 import org.elasticsearch.common.network.NetworkService;
 import org.elasticsearch.common.settings.ClusterSettings;
 import org.elasticsearch.common.settings.Settings;
-import org.elasticsearch.common.unit.TimeValue;
 import org.elasticsearch.common.util.BigArrays;
 import org.elasticsearch.indices.breaker.NoneCircuitBreakerService;
 import org.elasticsearch.test.transport.MockTransportService;
@@ -41,9 +40,10 @@ public class MockTcpTransportTests extends AbstractSimpleTransportTestCase {
             new NoneCircuitBreakerService(), namedWriteableRegistry, new NetworkService(Collections.emptyList()), version) {
 
             @Override
-            public void executeHandshake(DiscoveryNode node, TcpChannel channel, TimeValue timeout, ActionListener<Version> listener) {
+            public void executeHandshake(DiscoveryNode node, TcpChannel channel, ConnectionProfile profile,
+                                         ActionListener<Version> listener) {
                 if (doHandshake) {
-                    super.executeHandshake(node, channel, timeout, listener);
+                    super.executeHandshake(node, channel, profile, listener);
                 } else {
                     listener.onResponse(version.minimumCompatibilityVersion());
                 }

+ 4 - 3
test/framework/src/test/java/org/elasticsearch/transport/nio/SimpleMockNioTransportTests.java

@@ -27,7 +27,6 @@ import org.elasticsearch.common.network.NetworkService;
 import org.elasticsearch.common.settings.ClusterSettings;
 import org.elasticsearch.common.settings.Settings;
 import org.elasticsearch.common.transport.TransportAddress;
-import org.elasticsearch.common.unit.TimeValue;
 import org.elasticsearch.common.util.BigArrays;
 import org.elasticsearch.common.util.MockPageCacheRecycler;
 import org.elasticsearch.indices.breaker.NoneCircuitBreakerService;
@@ -37,6 +36,7 @@ import org.elasticsearch.threadpool.ThreadPool;
 import org.elasticsearch.transport.AbstractSimpleTransportTestCase;
 import org.elasticsearch.transport.BindTransportException;
 import org.elasticsearch.transport.ConnectTransportException;
+import org.elasticsearch.transport.ConnectionProfile;
 import org.elasticsearch.transport.TcpChannel;
 import org.elasticsearch.transport.TcpTransport;
 import org.elasticsearch.transport.Transport;
@@ -62,9 +62,10 @@ public class SimpleMockNioTransportTests extends AbstractSimpleTransportTestCase
             new MockPageCacheRecycler(settings), namedWriteableRegistry, new NoneCircuitBreakerService()) {
 
             @Override
-            public void executeHandshake(DiscoveryNode node, TcpChannel channel, TimeValue timeout, ActionListener<Version> listener) {
+            public void executeHandshake(DiscoveryNode node, TcpChannel channel, ConnectionProfile profile,
+                                         ActionListener<Version> listener) {
                 if (doHandshake) {
-                    super.executeHandshake(node, channel, timeout, listener);
+                    super.executeHandshake(node, channel, profile, listener);
                 } else {
                     listener.onResponse(version.minimumCompatibilityVersion());
                 }

+ 3 - 9
x-pack/plugin/security/src/test/java/org/elasticsearch/xpack/security/transport/AbstractSimpleSecurityTransportTestCase.java

@@ -5,9 +5,7 @@
  */
 package org.elasticsearch.xpack.security.transport;
 
-import java.util.concurrent.atomic.AtomicBoolean;
 import org.elasticsearch.Version;
-import org.elasticsearch.action.support.PlainActionFuture;
 import org.elasticsearch.cluster.node.DiscoveryNode;
 import org.elasticsearch.common.SuppressForbidden;
 import org.elasticsearch.common.io.stream.OutputStreamStreamOutput;
@@ -15,7 +13,6 @@ import org.elasticsearch.common.settings.ClusterSettings;
 import org.elasticsearch.common.settings.MockSecureSettings;
 import org.elasticsearch.common.settings.Settings;
 import org.elasticsearch.common.transport.TransportAddress;
-import org.elasticsearch.common.unit.TimeValue;
 import org.elasticsearch.env.TestEnvironment;
 import org.elasticsearch.node.Node;
 import org.elasticsearch.test.transport.MockTransportService;
@@ -24,7 +21,6 @@ import org.elasticsearch.transport.BindTransportException;
 import org.elasticsearch.transport.ConnectTransportException;
 import org.elasticsearch.transport.ConnectionProfile;
 import org.elasticsearch.transport.TcpTransport;
-import org.elasticsearch.transport.TransportRequestOptions;
 import org.elasticsearch.transport.TransportService;
 import org.elasticsearch.xpack.core.common.socket.SocketAccess;
 import org.elasticsearch.xpack.core.ssl.SSLConfiguration;
@@ -39,6 +35,7 @@ import java.net.SocketTimeoutException;
 import java.net.UnknownHostException;
 import java.nio.file.Path;
 import java.util.concurrent.CountDownLatch;
+import java.util.concurrent.atomic.AtomicBoolean;
 
 import static java.util.Collections.emptyMap;
 import static java.util.Collections.emptySet;
@@ -107,7 +104,7 @@ public abstract class AbstractSimpleSecurityTransportTestCase extends AbstractSi
     }
 
     @Override
-    public void testTcpHandshake() throws InterruptedException {
+    public void testTcpHandshake() {
         assumeTrue("only tcp transport has a handshake method", serviceA.getOriginalTransport() instanceof TcpTransport);
         TcpTransport originalTransport = (TcpTransport) serviceA.getOriginalTransport();
 
@@ -116,10 +113,7 @@ public abstract class AbstractSimpleSecurityTransportTestCase extends AbstractSi
              TcpTransport.NodeChannels connection = originalTransport.openConnection(
                  new DiscoveryNode("TS_TPC", "TS_TPC", service.boundAddress().publishAddress(), emptyMap(), emptySet(), version0),
                  connectionProfile)) {
-            PlainActionFuture<Version> listener = PlainActionFuture.newFuture();
-            originalTransport.executeHandshake(connection.getNode(), connection.channel(TransportRequestOptions.Type.PING),
-                TimeValue.timeValueSeconds(10), listener);
-            assertEquals(listener.actionGet(), Version.CURRENT);
+            assertEquals(connection.getVersion(), Version.CURRENT);
         }
     }
 

+ 1 - 1
x-pack/plugin/security/src/test/java/org/elasticsearch/xpack/security/transport/ServerTransportFilterIntegrationTests.java

@@ -165,7 +165,7 @@ public class ServerTransportFilterIntegrationTests extends SecurityIntegTestCase
             node.start();
             TransportService instance = node.injector().getInstance(TransportService.class);
             try (Transport.Connection connection = instance.openConnection(new DiscoveryNode("theNode", transportAddress, Version.CURRENT),
-                    ConnectionProfile.buildSingleChannelProfile(TransportRequestOptions.Type.REG, null, null))) {
+                    ConnectionProfile.buildSingleChannelProfile(TransportRequestOptions.Type.REG))) {
                 // handshake should be ok
                 final DiscoveryNode handshake = instance.handshake(connection, 10000);
                 assertEquals(transport.boundAddress().publishAddress(), handshake.getAddress());

+ 3 - 3
x-pack/plugin/security/src/test/java/org/elasticsearch/xpack/security/transport/netty4/SimpleSecurityNetty4ServerTransportTests.java

@@ -20,7 +20,6 @@ import org.elasticsearch.common.network.NetworkService;
 import org.elasticsearch.common.settings.ClusterSettings;
 import org.elasticsearch.common.settings.Settings;
 import org.elasticsearch.common.transport.TransportAddress;
-import org.elasticsearch.common.unit.TimeValue;
 import org.elasticsearch.common.util.BigArrays;
 import org.elasticsearch.indices.breaker.NoneCircuitBreakerService;
 import org.elasticsearch.test.transport.MockTransportService;
@@ -77,9 +76,10 @@ public class SimpleSecurityNetty4ServerTransportTests extends AbstractSimpleSecu
             new NoneCircuitBreakerService(), null, createSSLService(settings1)) {
 
             @Override
-            public void executeHandshake(DiscoveryNode node, TcpChannel channel, TimeValue timeout, ActionListener<Version> listener) {
+            public void executeHandshake(DiscoveryNode node, TcpChannel channel, ConnectionProfile profile,
+                                         ActionListener<Version> listener) {
                 if (doHandshake) {
-                    super.executeHandshake(node, channel, timeout, listener);
+                    super.executeHandshake(node, channel, profile, listener);
                 } else {
                     listener.onResponse(version.minimumCompatibilityVersion());
                 }

+ 4 - 3
x-pack/plugin/security/src/test/java/org/elasticsearch/xpack/security/transport/nio/SimpleSecurityNioTransportTests.java

@@ -12,12 +12,12 @@ import org.elasticsearch.common.io.stream.NamedWriteableRegistry;
 import org.elasticsearch.common.network.NetworkService;
 import org.elasticsearch.common.settings.ClusterSettings;
 import org.elasticsearch.common.settings.Settings;
-import org.elasticsearch.common.unit.TimeValue;
 import org.elasticsearch.common.util.BigArrays;
 import org.elasticsearch.common.util.MockPageCacheRecycler;
 import org.elasticsearch.indices.breaker.NoneCircuitBreakerService;
 import org.elasticsearch.test.transport.MockTransportService;
 import org.elasticsearch.threadpool.ThreadPool;
+import org.elasticsearch.transport.ConnectionProfile;
 import org.elasticsearch.transport.TcpChannel;
 import org.elasticsearch.transport.TcpTransport;
 import org.elasticsearch.transport.Transport;
@@ -39,9 +39,10 @@ public class SimpleSecurityNioTransportTests extends AbstractSimpleSecurityTrans
             createSSLService(settings1)) {
 
             @Override
-            public void executeHandshake(DiscoveryNode node, TcpChannel channel, TimeValue timeout, ActionListener<Version> listener) {
+            public void executeHandshake(DiscoveryNode node, TcpChannel channel, ConnectionProfile profile,
+                                         ActionListener<Version> listener) {
                 if (doHandshake) {
-                    super.executeHandshake(node, channel, timeout, listener);
+                    super.executeHandshake(node, channel, profile, listener);
                 } else {
                     listener.onResponse(version.minimumCompatibilityVersion());
                 }