Browse Source

Remove connectToNodeLight and replace it with a connection profile (#21799)

The Transport#connectToNodeLight concepts is confusing and not very flexible.
neither really testable on a unittest level. This commit cleans up the code used
to connect to nodes and simplifies transport implementations to share more code.
This also allows to connect to nodes with custom profiles if needed, for instance
future improvements can be added to connect to/from nodes that are non-data nodes without
dedicated bulks and recovery connections.
Simon Willnauer 9 years ago
parent
commit
f5ff69fabe
17 changed files with 345 additions and 263 deletions
  1. 3 2
      core/src/main/java/org/elasticsearch/client/transport/TransportClientNodesService.java
  2. 1 1
      core/src/main/java/org/elasticsearch/discovery/zen/UnicastZenPing.java
  3. 142 0
      core/src/main/java/org/elasticsearch/transport/ConnectionProfile.java
  4. 55 81
      core/src/main/java/org/elasticsearch/transport/TcpTransport.java
  5. 3 9
      core/src/main/java/org/elasticsearch/transport/Transport.java
  6. 9 11
      core/src/main/java/org/elasticsearch/transport/TransportService.java
  7. 2 2
      core/src/main/java/org/elasticsearch/transport/TransportStatus.java
  8. 2 7
      core/src/test/java/org/elasticsearch/client/transport/FailAndRetryMockTransport.java
  9. 7 9
      core/src/test/java/org/elasticsearch/cluster/NodeConnectionsServiceTests.java
  10. 79 0
      core/src/test/java/org/elasticsearch/transport/ConnectionProfileTests.java
  11. 4 9
      core/src/test/java/org/elasticsearch/transport/TCPTransportTests.java
  12. 4 4
      core/src/test/java/org/elasticsearch/transport/TransportServiceHandshakeTests.java
  13. 14 49
      modules/transport-netty4/src/main/java/org/elasticsearch/transport/netty4/Netty4Transport.java
  14. 2 6
      test/framework/src/main/java/org/elasticsearch/test/transport/CapturingTransport.java
  15. 12 55
      test/framework/src/main/java/org/elasticsearch/test/transport/MockTransportService.java
  16. 2 2
      test/framework/src/main/java/org/elasticsearch/transport/AbstractSimpleTransportTestCase.java
  17. 4 16
      test/framework/src/main/java/org/elasticsearch/transport/MockTcpTransport.java

+ 3 - 2
core/src/main/java/org/elasticsearch/client/transport/TransportClientNodesService.java

@@ -42,6 +42,7 @@ import org.elasticsearch.common.util.concurrent.ConcurrentCollections;
 import org.elasticsearch.common.util.concurrent.FutureUtils;
 import org.elasticsearch.threadpool.ThreadPool;
 import org.elasticsearch.transport.ConnectTransportException;
+import org.elasticsearch.transport.ConnectionProfile;
 import org.elasticsearch.transport.FutureTransportResponseHandler;
 import org.elasticsearch.transport.NodeDisconnectedException;
 import org.elasticsearch.transport.NodeNotConnectedException;
@@ -389,7 +390,7 @@ final class TransportClientNodesService extends AbstractComponent implements Clo
                     try {
                         // its a listed node, light connect to it...
                         logger.trace("connecting to listed node (light) [{}]", listedNode);
-                        transportService.connectToNodeLight(listedNode);
+                        transportService.connectToNode(listedNode, ConnectionProfile.LIGHT_PROFILE);
                     } catch (Exception e) {
                         logger.info(
                             (Supplier<?>)
@@ -469,7 +470,7 @@ final class TransportClientNodesService extends AbstractComponent implements Clo
                                     } else {
                                         // its a listed node, light connect to it...
                                         logger.trace("connecting to listed node (light) [{}]", listedNode);
-                                        transportService.connectToNodeLight(listedNode);
+                                        transportService.connectToNode(listedNode, ConnectionProfile.LIGHT_PROFILE);
                                     }
                                 } catch (Exception e) {
                                     logger.debug(

+ 1 - 1
core/src/main/java/org/elasticsearch/discovery/zen/UnicastZenPing.java

@@ -468,7 +468,7 @@ public class UnicastZenPing extends AbstractComponent implements ZenPing {
                             // connect to the node, see if we manage to do it, if not, bail
                             if (!nodeFoundByAddress) {
                                 logger.trace("[{}] connecting (light) to {}", sendPingsHandler.id(), finalNodeToSend);
-                                transportService.connectToNodeLightAndHandshake(finalNodeToSend, timeout.getMillis());
+                                transportService.connectToNodeAndHandshake(finalNodeToSend, timeout.getMillis());
                             } else {
                                 logger.trace("[{}] connecting to {}", sendPingsHandler.id(), finalNodeToSend);
                                 transportService.connectToNode(finalNodeToSend);

+ 142 - 0
core/src/main/java/org/elasticsearch/transport/ConnectionProfile.java

@@ -0,0 +1,142 @@
+/*
+ * Licensed to Elasticsearch under one or more contributor
+ * license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright
+ * ownership. Elasticsearch licenses this file to you under
+ * the Apache License, Version 2.0 (the "License"); you may
+ * not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.elasticsearch.transport;
+
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.Collections;
+import java.util.EnumSet;
+import java.util.List;
+import java.util.Set;
+import java.util.concurrent.atomic.AtomicInteger;
+
+/**
+ * A connection profile describes how many connection are established to specific node for each of the available request types.
+ * ({@link org.elasticsearch.transport.TransportRequestOptions.Type}). This allows to tailor a connection towards a specific usage.
+ */
+public final class ConnectionProfile {
+
+    /**
+     * A pre-built light connection profile that shares a single connection across all
+     * types.
+     */
+    public static final ConnectionProfile LIGHT_PROFILE = new ConnectionProfile(
+        Collections.singletonList(new ConnectionTypeHandle(0, 1,
+            TransportRequestOptions.Type.BULK,
+            TransportRequestOptions.Type.PING,
+            TransportRequestOptions.Type.RECOVERY,
+            TransportRequestOptions.Type.REG,
+            TransportRequestOptions.Type.STATE)), 1);
+
+    private final List<ConnectionTypeHandle> handles;
+    private final int numConnections;
+
+    private ConnectionProfile(List<ConnectionTypeHandle> handles, int numConnections) {
+        this.handles = handles;
+        this.numConnections = numConnections;
+    }
+
+    /**
+     * A builder to build a new {@link ConnectionProfile}
+     */
+    public static class Builder {
+        private final List<ConnectionTypeHandle> handles = new ArrayList<>();
+        private final Set<TransportRequestOptions.Type> addedTypes = EnumSet.noneOf(TransportRequestOptions.Type.class);
+        private int offset = 0;
+
+        /**
+         * Adds a number of connections for one or more types. Each type can only be added once.
+         * @param numConnections the number of connections to use in the pool for the given connection types
+         * @param types a set of types that should share the given number of connections
+         */
+        public void addConnections(int numConnections, TransportRequestOptions.Type... types) {
+            if (types == null || types.length == 0) {
+                throw new IllegalArgumentException("types must not be null");
+            }
+            for (TransportRequestOptions.Type type : types) {
+                if (addedTypes.contains(type)) {
+                    throw new IllegalArgumentException("type [" + type + "] is already registered");
+                }
+            }
+            addedTypes.addAll(Arrays.asList(types));
+            handles.add(new ConnectionTypeHandle(offset, numConnections, types));
+            offset += numConnections;
+        }
+
+        /**
+         * Creates a new {@link ConnectionProfile} based on the added connections.
+         * @throws IllegalStateException if any of the {@link org.elasticsearch.transport.TransportRequestOptions.Type} enum is missing
+         */
+        public ConnectionProfile build() {
+            EnumSet<TransportRequestOptions.Type> types = EnumSet.allOf(TransportRequestOptions.Type.class);
+            types.removeAll(addedTypes);
+            if (types.isEmpty() == false) {
+                throw new IllegalStateException("not all types are added for this connection profile - missing types: " + types);
+            }
+            return new ConnectionProfile(Collections.unmodifiableList(handles), offset);
+        }
+    }
+
+    /**
+     * Returns the total number of connections for this profile
+     */
+    public int getNumConnections() {
+        return numConnections;
+    }
+
+    /**
+     * Returns the type handles for this connection profile
+     */
+    List<ConnectionTypeHandle> getHandles() {
+        return Collections.unmodifiableList(handles);
+    }
+
+    /**
+     * Connection type handle encapsulates the logic which connection
+     */
+    static final class ConnectionTypeHandle {
+        public final int length;
+        public final int offset;
+        private final TransportRequestOptions.Type[] types;
+        private final AtomicInteger counter = new AtomicInteger();
+
+        private ConnectionTypeHandle(int offset, int length, TransportRequestOptions.Type... types) {
+            this.length = length;
+            this.offset = offset;
+            this.types = types;
+        }
+
+        /**
+         * Returns one of the channels out configured for this handle. The channel is selected in a round-robin
+         * fashion.
+         */
+        <T> T getChannel(T[] channels) {
+            assert channels.length >= offset + length : "illegal size: " + channels.length + " expected >= " + (offset + length);
+            return channels[offset + Math.floorMod(counter.incrementAndGet(), length)];
+        }
+
+        /**
+         * Returns all types for this handle
+         */
+        TransportRequestOptions.Type[] getTypes() {
+            return types;
+        }
+    }
+
+}

+ 55 - 81
core/src/main/java/org/elasticsearch/transport/TcpTransport.java

@@ -67,7 +67,6 @@ import org.elasticsearch.indices.breaker.CircuitBreakerService;
 import org.elasticsearch.monitor.jvm.JvmInfo;
 import org.elasticsearch.rest.RestStatus;
 import org.elasticsearch.threadpool.ThreadPool;
-import org.elasticsearch.transport.support.TransportStatus;
 
 import java.io.Closeable;
 import java.io.IOException;
@@ -81,6 +80,7 @@ import java.nio.charset.StandardCharsets;
 import java.util.ArrayList;
 import java.util.Arrays;
 import java.util.Collections;
+import java.util.EnumMap;
 import java.util.HashMap;
 import java.util.HashSet;
 import java.util.Iterator;
@@ -91,12 +91,13 @@ import java.util.Set;
 import java.util.concurrent.ConcurrentMap;
 import java.util.concurrent.CountDownLatch;
 import java.util.concurrent.TimeUnit;
-import java.util.concurrent.atomic.AtomicInteger;
+import java.util.concurrent.atomic.AtomicBoolean;
 import java.util.concurrent.atomic.AtomicReference;
 import java.util.concurrent.locks.ReadWriteLock;
 import java.util.concurrent.locks.ReentrantReadWriteLock;
 import java.util.regex.Matcher;
 import java.util.regex.Pattern;
+import java.util.stream.Collectors;
 
 import static java.util.Collections.unmodifiableMap;
 import static org.elasticsearch.common.settings.Setting.boolSetting;
@@ -178,6 +179,7 @@ public abstract class TcpTransport<Channel> extends AbstractLifecycleComponent i
     protected final boolean compress;
     protected volatile BoundTransportAddress boundAddress;
     private final String transportName;
+    private final ConnectionProfile defaultConnectionProfile;
 
     public TcpTransport(String transportName, Settings settings, ThreadPool threadPool, BigArrays bigArrays,
                         CircuitBreakerService circuitBreakerService, NamedWriteableRegistry namedWriteableRegistry,
@@ -200,6 +202,13 @@ public abstract class TcpTransport<Channel> extends AbstractLifecycleComponent i
         this.connectionsPerNodePing = CONNECTIONS_PER_NODE_PING.get(settings);
         this.connectTimeout = TCP_CONNECT_TIMEOUT.get(settings);
         this.blockingClient = TCP_BLOCKING_CLIENT.get(settings);
+        ConnectionProfile.Builder builder = new ConnectionProfile.Builder();
+        builder.addConnections(connectionsPerNodeBulk, TransportRequestOptions.Type.BULK);
+        builder.addConnections(connectionsPerNodePing, TransportRequestOptions.Type.PING);
+        builder.addConnections(connectionsPerNodeRecovery, TransportRequestOptions.Type.RECOVERY);
+        builder.addConnections(connectionsPerNodeReg, TransportRequestOptions.Type.REG);
+        builder.addConnections(connectionsPerNodeState, TransportRequestOptions.Type.STATE);
+        defaultConnectionProfile = builder.build();
     }
 
     @Override
@@ -255,7 +264,7 @@ public abstract class TcpTransport<Channel> extends AbstractLifecycleComponent i
             for (Map.Entry<DiscoveryNode, NodeChannels> entry : connectedNodes.entrySet()) {
                 DiscoveryNode node = entry.getKey();
                 NodeChannels channels = entry.getValue();
-                for (Channel channel : channels.allChannels) {
+                for (Channel channel : channels.getChannels()) {
                     try {
                         sendMessage(channel, pingHeader, successfulPings::inc);
                     } catch (Exception e) {
@@ -304,40 +313,31 @@ public abstract class TcpTransport<Channel> extends AbstractLifecycleComponent i
         }
     }
 
-    public class NodeChannels implements Closeable {
-
-        public List<Channel> allChannels = Collections.emptyList();
-        public Channel[] recovery;
-        public final AtomicInteger recoveryCounter = new AtomicInteger();
-        public Channel[] bulk;
-        public final AtomicInteger bulkCounter = new AtomicInteger();
-        public Channel[] reg;
-        public final AtomicInteger regCounter = new AtomicInteger();
-        public Channel[] state;
-        public final AtomicInteger stateCounter = new AtomicInteger();
-        public Channel[] ping;
-        public final AtomicInteger pingCounter = new AtomicInteger();
-
-        public NodeChannels(Channel[] recovery, Channel[] bulk, Channel[] reg, Channel[] state, Channel[] ping) {
-            this.recovery = recovery;
-            this.bulk = bulk;
-            this.reg = reg;
-            this.state = state;
-            this.ping = ping;
+    public final class NodeChannels implements Closeable {
+        private final Map<TransportRequestOptions.Type, ConnectionProfile.ConnectionTypeHandle> typeMapping
+            = new EnumMap<>(TransportRequestOptions.Type.class);
+        private final Channel[] channels;
+        private final AtomicBoolean establishedAllConnections = new AtomicBoolean(false);
+
+        public NodeChannels(Channel[] channels, ConnectionProfile connectionProfile) {
+            this.channels = channels;
+            assert channels.length == connectionProfile.getNumConnections() : "expected channels size to be == "
+                + connectionProfile.getNumConnections() + " but was: [" + channels.length + "]";
+            for (ConnectionProfile.ConnectionTypeHandle handle : connectionProfile.getHandles()) {
+                for (TransportRequestOptions.Type type : handle.getTypes())
+                typeMapping.put(type, handle);
+            }
         }
 
-        public void start() {
-            List<Channel> newAllChannels = new ArrayList<>();
-            newAllChannels.addAll(Arrays.asList(recovery));
-            newAllChannels.addAll(Arrays.asList(bulk));
-            newAllChannels.addAll(Arrays.asList(reg));
-            newAllChannels.addAll(Arrays.asList(state));
-            newAllChannels.addAll(Arrays.asList(ping));
-            this.allChannels = Collections.unmodifiableList(newAllChannels);
+        public void connectionsEstablished() {
+            if (establishedAllConnections.compareAndSet(false, true) == false) {
+                throw new AssertionError("connected more than once");
+            }
+
         }
 
         public boolean hasChannel(Channel channel) {
-            for (Channel channel1 : allChannels) {
+            for (Channel channel1 : channels) {
                 if (channel.equals(channel1)) {
                     return true;
                 }
@@ -345,29 +345,26 @@ public abstract class TcpTransport<Channel> extends AbstractLifecycleComponent i
             return false;
         }
 
-        public Channel channel(TransportRequestOptions.Type type) {
-            if (type == TransportRequestOptions.Type.REG) {
-                return reg[Math.floorMod(regCounter.incrementAndGet(), reg.length)];
-            } else if (type == TransportRequestOptions.Type.STATE) {
-                return state[Math.floorMod(stateCounter.incrementAndGet(), state.length)];
-            } else if (type == TransportRequestOptions.Type.PING) {
-                return ping[Math.floorMod(pingCounter.incrementAndGet(), ping.length)];
-            } else if (type == TransportRequestOptions.Type.BULK) {
-                return bulk[Math.floorMod(bulkCounter.incrementAndGet(), bulk.length)];
-            } else if (type == TransportRequestOptions.Type.RECOVERY) {
-                return recovery[Math.floorMod(recoveryCounter.incrementAndGet(), recovery.length)];
+        public List<Channel> getChannels() {
+            if (establishedAllConnections.get()) { // don't expose the channels until we are connected
+                return Arrays.asList(channels);
             } else {
-                throw new IllegalArgumentException("no type channel for [" + type + "]");
+                return Collections.emptyList();
             }
         }
 
-        public List<Channel[]> getChannelArrays() {
-            return Arrays.asList(recovery, bulk, reg, state, ping);
+        public Channel channel(TransportRequestOptions.Type type) {
+            assert establishedAllConnections.get();
+            ConnectionProfile.ConnectionTypeHandle connectionTypeHandle = typeMapping.get(type);
+            if (connectionTypeHandle == null) {
+                throw new IllegalArgumentException("no type channel for [" + type + "]");
+            }
+            return connectionTypeHandle.getChannel(channels);
         }
 
         @Override
         public synchronized void close() throws IOException {
-            closeChannels(allChannels);
+            closeChannels(Arrays.asList(channels).stream().filter(Objects::nonNull).collect(Collectors.toList()));
         }
     }
 
@@ -377,16 +374,8 @@ public abstract class TcpTransport<Channel> extends AbstractLifecycleComponent i
     }
 
     @Override
-    public void connectToNodeLight(DiscoveryNode node) throws ConnectTransportException {
-        connectToNode(node, true);
-    }
-
-    @Override
-    public void connectToNode(DiscoveryNode node) {
-        connectToNode(node, false);
-    }
-
-    public void connectToNode(DiscoveryNode node, boolean light) {
+    public void connectToNode(DiscoveryNode node, ConnectionProfile connectionProfile) {
+        connectionProfile = connectionProfile == null ? defaultConnectionProfile : connectionProfile;
         if (!lifecycle.started()) {
             throw new IllegalStateException("can't add nodes to a stopped transport");
         }
@@ -405,20 +394,16 @@ public abstract class TcpTransport<Channel> extends AbstractLifecycleComponent i
                     return;
                 }
                 try {
-                    if (light) {
-                        nodeChannels = connectToChannelsLight(node);
-                    } else {
-                        try {
-                            nodeChannels = connectToChannels(node);
-                        } catch (Exception e) {
-                            logger.trace(
-                                (Supplier<?>) () -> new ParameterizedMessage(
-                                    "failed to connect to [{}], cleaning dangling connections", node), e);
-                            throw e;
-                        }
+                    try {
+                        nodeChannels = connectToChannels(node, connectionProfile);
+                    } catch (Exception e) {
+                        logger.trace(
+                            (Supplier<?>) () -> new ParameterizedMessage(
+                                "failed to connect to [{}], cleaning dangling connections", node), e);
+                        throw e;
                     }
                     // we acquire a connection lock, so no way there is an existing connection
-                    nodeChannels.start();
+                    nodeChannels.connectionsEstablished();
                     connectedNodes.put(node, nodeChannels);
                     if (logger.isDebugEnabled()) {
                         logger.debug("connected to node [{}]", node);
@@ -884,21 +869,10 @@ public abstract class TcpTransport<Channel> extends AbstractLifecycleComponent i
      */
     protected abstract void closeChannels(List<Channel> channel) throws IOException;
 
-    /**
-     * Connects to the given node in a light way. This means we are not creating multiple connections like we do
-     * for production connections. This connection is for pings or handshakes
-     */
-    protected abstract NodeChannels connectToChannelsLight(DiscoveryNode node) throws IOException;
-
 
     protected abstract void sendMessage(Channel channel, BytesReference reference, Runnable sendListener) throws IOException;
 
-    /**
-     * Connects to the node in a <tt>heavy</tt> way.
-     *
-     * @see #connectToChannelsLight(DiscoveryNode)
-     */
-    protected abstract NodeChannels connectToChannels(DiscoveryNode node) throws IOException;
+    protected abstract NodeChannels connectToChannels(DiscoveryNode node, ConnectionProfile connectionProfile) throws IOException;
 
     /**
      * Called to tear down internal resources

+ 3 - 9
core/src/main/java/org/elasticsearch/transport/Transport.java

@@ -62,15 +62,10 @@ public interface Transport extends LifecycleComponent {
     boolean nodeConnected(DiscoveryNode node);
 
     /**
-     * Connects to the given node, if already connected, does nothing.
+     * Connects to a node with the given connection profile. Use {@link ConnectionProfile#LIGHT_PROFILE} when just connecting for ping
+     * and then disconnecting. If the node is already connected this method has no effect
      */
-    void connectToNode(DiscoveryNode node) throws ConnectTransportException;
-
-    /**
-     * Connects to a node in a light manner. Used when just connecting for ping and then
-     * disconnecting.
-     */
-    void connectToNodeLight(DiscoveryNode node) throws ConnectTransportException;
+    void connectToNode(DiscoveryNode node, ConnectionProfile connectionProfile) throws ConnectTransportException;
 
     /**
      * Disconnected from the given node, if not connected, will do nothing.
@@ -94,5 +89,4 @@ public interface Transport extends LifecycleComponent {
     default CircuitBreaker getInFlightRequestBreaker() {
         return new NoopCircuitBreaker("in-flight-noop");
     }
-
 }

+ 9 - 11
core/src/main/java/org/elasticsearch/transport/TransportService.java

@@ -281,22 +281,20 @@ public class TransportService extends AbstractLifecycleComponent {
     }
 
     public void connectToNode(DiscoveryNode node) throws ConnectTransportException {
-        if (node.equals(localNode)) {
-            return;
-        }
-        transport.connectToNode(node);
+        connectToNode(node, null);
     }
 
     /**
-     * Lightly connect to the specified node
+     * Connect to the specified node with the given connection profile
      *
      * @param node the node to connect to
+     * @param connectionProfile the connection profile to use when connecting to this node
      */
-    public void connectToNodeLight(final DiscoveryNode node) {
+    public void connectToNode(final DiscoveryNode node, ConnectionProfile connectionProfile) {
         if (node.equals(localNode)) {
             return;
         }
-        transport.connectToNodeLight(node);
+        transport.connectToNode(node, connectionProfile);
     }
 
     /**
@@ -309,10 +307,10 @@ public class TransportService extends AbstractLifecycleComponent {
      * @throws ConnectTransportException if the connection or the
      *                                   handshake failed
      */
-    public DiscoveryNode connectToNodeLightAndHandshake(
+    public DiscoveryNode connectToNodeAndHandshake(
             final DiscoveryNode node,
             final long handshakeTimeout) throws ConnectTransportException {
-        return connectToNodeLightAndHandshake(node, handshakeTimeout, true);
+        return connectToNodeAndHandshake(node, handshakeTimeout, true);
     }
 
     /**
@@ -329,14 +327,14 @@ public class TransportService extends AbstractLifecycleComponent {
      * @throws ConnectTransportException if the connection failed
      * @throws IllegalStateException if the handshake failed
      */
-    public DiscoveryNode connectToNodeLightAndHandshake(
+    public DiscoveryNode connectToNodeAndHandshake(
             final DiscoveryNode node,
             final long handshakeTimeout,
             final boolean checkClusterName) {
         if (node.equals(localNode)) {
             return localNode;
         }
-        transport.connectToNodeLight(node);
+        transport.connectToNode(node, ConnectionProfile.LIGHT_PROFILE);
         try {
             return handshake(node, handshakeTimeout, checkClusterName);
         } catch (ConnectTransportException | IllegalStateException e) {

+ 2 - 2
core/src/main/java/org/elasticsearch/transport/support/TransportStatus.java → core/src/main/java/org/elasticsearch/transport/TransportStatus.java

@@ -17,9 +17,9 @@
  * under the License.
  */
 
-package org.elasticsearch.transport.support;
+package org.elasticsearch.transport;
 
-public class TransportStatus {
+final class TransportStatus {
 
     private static final byte STATUS_REQRES = 1 << 0;
     private static final byte STATUS_ERROR = 1 << 1;

+ 2 - 7
core/src/test/java/org/elasticsearch/client/transport/FailAndRetryMockTransport.java

@@ -26,13 +26,13 @@ import org.elasticsearch.action.admin.cluster.state.ClusterStateResponse;
 import org.elasticsearch.cluster.ClusterName;
 import org.elasticsearch.cluster.ClusterState;
 import org.elasticsearch.cluster.node.DiscoveryNode;
-import org.elasticsearch.cluster.node.DiscoveryNodes;
 import org.elasticsearch.common.component.Lifecycle;
 import org.elasticsearch.common.component.LifecycleListener;
 import org.elasticsearch.common.settings.Settings;
 import org.elasticsearch.common.transport.BoundTransportAddress;
 import org.elasticsearch.common.transport.TransportAddress;
 import org.elasticsearch.transport.ConnectTransportException;
+import org.elasticsearch.transport.ConnectionProfile;
 import org.elasticsearch.transport.Transport;
 import org.elasticsearch.transport.TransportException;
 import org.elasticsearch.transport.TransportRequest;
@@ -159,12 +159,7 @@ abstract class FailAndRetryMockTransport<Response extends TransportResponse> imp
     }
 
     @Override
-    public void connectToNode(DiscoveryNode node) throws ConnectTransportException {
-
-    }
-
-    @Override
-    public void connectToNodeLight(DiscoveryNode node) throws ConnectTransportException {
+    public void connectToNode(DiscoveryNode node, ConnectionProfile connectionProfile) throws ConnectTransportException {
 
     }
 

+ 7 - 9
core/src/test/java/org/elasticsearch/cluster/NodeConnectionsServiceTests.java

@@ -31,6 +31,7 @@ import org.elasticsearch.common.util.concurrent.ConcurrentCollections;
 import org.elasticsearch.test.ESTestCase;
 import org.elasticsearch.threadpool.ThreadPool;
 import org.elasticsearch.transport.ConnectTransportException;
+import org.elasticsearch.transport.ConnectionProfile;
 import org.elasticsearch.transport.Transport;
 import org.elasticsearch.transport.TransportException;
 import org.elasticsearch.transport.TransportRequest;
@@ -198,16 +199,13 @@ public class NodeConnectionsServiceTests extends ESTestCase {
         }
 
         @Override
-        public void connectToNode(DiscoveryNode node) throws ConnectTransportException {
-            if (connectedNodes.contains(node) == false && randomConnectionExceptions && randomBoolean()) {
-                throw new ConnectTransportException(node, "simulated");
+        public void connectToNode(DiscoveryNode node, ConnectionProfile connectionProfile) throws ConnectTransportException {
+            if (connectionProfile == null) {
+                if (connectedNodes.contains(node) == false && randomConnectionExceptions && randomBoolean()) {
+                    throw new ConnectTransportException(node, "simulated");
+                }
+                connectedNodes.add(node);
             }
-            connectedNodes.add(node);
-        }
-
-        @Override
-        public void connectToNodeLight(DiscoveryNode node) throws ConnectTransportException {
-
         }
 
         @Override

+ 79 - 0
core/src/test/java/org/elasticsearch/transport/ConnectionProfileTests.java

@@ -0,0 +1,79 @@
+/*
+ * Licensed to Elasticsearch under one or more contributor
+ * license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright
+ * ownership. Elasticsearch licenses this file to you under
+ * the Apache License, Version 2.0 (the "License"); you may
+ * not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.elasticsearch.transport;
+
+import org.elasticsearch.test.ESTestCase;
+import org.hamcrest.Matchers;
+
+public class ConnectionProfileTests extends ESTestCase {
+
+    public void testBuildConnectionProfile() {
+        ConnectionProfile.Builder builder = new ConnectionProfile.Builder();
+        builder.addConnections(1, TransportRequestOptions.Type.BULK);
+        builder.addConnections(2, TransportRequestOptions.Type.STATE, TransportRequestOptions.Type.RECOVERY);
+        builder.addConnections(3, TransportRequestOptions.Type.PING);
+        IllegalStateException illegalStateException = expectThrows(IllegalStateException.class, builder::build);
+        assertEquals("not all types are added for this connection profile - missing types: [REG]", illegalStateException.getMessage());
+
+        IllegalArgumentException illegalArgumentException = expectThrows(IllegalArgumentException.class,
+            () -> builder.addConnections(4, TransportRequestOptions.Type.REG, TransportRequestOptions.Type.PING));
+        assertEquals("type [PING] is already registered", illegalArgumentException.getMessage());
+        builder.addConnections(4, TransportRequestOptions.Type.REG);
+        ConnectionProfile build = builder.build();
+        assertEquals(10, build.getNumConnections());
+        Integer[] array = new Integer[10];
+        for (int i = 0; i < array.length; i++) {
+            array[i] = i;
+        }
+        final int numIters = randomIntBetween(5, 10);
+        assertEquals(4, build.getHandles().size());
+        assertEquals(0, build.getHandles().get(0).offset);
+        assertEquals(1, build.getHandles().get(0).length);
+        assertArrayEquals(new TransportRequestOptions.Type[] {TransportRequestOptions.Type.BULK}, build.getHandles().get(0).getTypes());
+        Integer channel = build.getHandles().get(0).getChannel(array);
+        for (int i = 0; i < numIters; i++) {
+            assertEquals(0, channel.intValue());
+        }
+
+        assertEquals(1, build.getHandles().get(1).offset);
+        assertEquals(2, build.getHandles().get(1).length);
+        assertArrayEquals(new TransportRequestOptions.Type[] {TransportRequestOptions.Type.STATE, TransportRequestOptions.Type.RECOVERY},
+            build.getHandles().get(1).getTypes());
+        channel = build.getHandles().get(1).getChannel(array);
+        for (int i = 0; i < numIters; i++) {
+            assertThat(channel, Matchers.anyOf(Matchers.is(1), Matchers.is(2)));
+        }
+
+        assertEquals(3, build.getHandles().get(2).offset);
+        assertEquals(3, build.getHandles().get(2).length);
+        assertArrayEquals(new TransportRequestOptions.Type[] {TransportRequestOptions.Type.PING}, build.getHandles().get(2).getTypes());
+        channel = build.getHandles().get(2).getChannel(array);
+        for (int i = 0; i < numIters; i++) {
+            assertThat(channel, Matchers.anyOf(Matchers.is(3), Matchers.is(4), Matchers.is(5)));
+        }
+
+        assertEquals(6, build.getHandles().get(3).offset);
+        assertEquals(4, build.getHandles().get(3).length);
+        assertArrayEquals(new TransportRequestOptions.Type[] {TransportRequestOptions.Type.REG}, build.getHandles().get(3).getTypes());
+        channel = build.getHandles().get(3).getChannel(array);
+        for (int i = 0; i < numIters; i++) {
+            assertThat(channel, Matchers.anyOf(Matchers.is(6), Matchers.is(7), Matchers.is(8), Matchers.is(9)));
+        }
+    }
+}

+ 4 - 9
core/src/test/java/org/elasticsearch/transport/TCPTransportTests.java

@@ -31,7 +31,6 @@ import org.elasticsearch.common.util.BigArrays;
 import org.elasticsearch.test.ESTestCase;
 import org.elasticsearch.threadpool.TestThreadPool;
 import org.elasticsearch.threadpool.ThreadPool;
-import org.elasticsearch.transport.support.TransportStatus;
 
 import java.io.IOException;
 import java.net.InetSocketAddress;
@@ -168,11 +167,6 @@ public class TCPTransportTests extends ESTestCase {
 
                 }
 
-                @Override
-                protected NodeChannels connectToChannelsLight(DiscoveryNode node) throws IOException {
-                    return new NodeChannels(new Object[0], new Object[0], new Object[0], new Object[0], new Object[0]);
-                }
-
                 @Override
                 protected void sendMessage(Object o, BytesReference reference, Runnable sendListener) throws IOException {
                     StreamInput streamIn = reference.streamInput();
@@ -198,8 +192,8 @@ public class TCPTransportTests extends ESTestCase {
                 }
 
                 @Override
-                protected NodeChannels connectToChannels(DiscoveryNode node) throws IOException {
-                    return new NodeChannels(new Object[0], new Object[0], new Object[0], new Object[0], new Object[0]);
+                protected NodeChannels connectToChannels(DiscoveryNode node, ConnectionProfile profile) throws IOException {
+                    return new NodeChannels(new Object[profile.getNumConnections()], profile);
                 }
 
                 @Override
@@ -214,7 +208,8 @@ public class TCPTransportTests extends ESTestCase {
 
                 @Override
                 protected Object nodeChannel(DiscoveryNode node, TransportRequestOptions options) throws ConnectTransportException {
-                    return new NodeChannels(new Object[0], new Object[0], new Object[0], new Object[0], new Object[0]);
+                    return new NodeChannels(new Object[ConnectionProfile.LIGHT_PROFILE.getNumConnections()],
+                        ConnectionProfile.LIGHT_PROFILE);
                 }
             };
             DiscoveryNode node = new DiscoveryNode("foo", buildNewFakeTransportAddress(), Version.CURRENT);

+ 4 - 4
core/src/test/java/org/elasticsearch/transport/TransportServiceHandshakeTests.java

@@ -113,7 +113,7 @@ public class TransportServiceHandshakeTests extends ESTestCase {
             emptySet(),
             Version.CURRENT.minimumCompatibilityVersion());
         DiscoveryNode connectedNode =
-                handleA.transportService.connectToNodeLightAndHandshake(discoveryNode, timeout);
+                handleA.transportService.connectToNodeAndHandshake(discoveryNode, timeout);
         assertNotNull(connectedNode);
 
         // the name and version should be updated
@@ -132,7 +132,7 @@ public class TransportServiceHandshakeTests extends ESTestCase {
             emptyMap(),
             emptySet(),
             Version.CURRENT.minimumCompatibilityVersion());
-        IllegalStateException ex = expectThrows(IllegalStateException.class, () -> handleA.transportService.connectToNodeLightAndHandshake(
+        IllegalStateException ex = expectThrows(IllegalStateException.class, () -> handleA.transportService.connectToNodeAndHandshake(
                 discoveryNode, timeout));
         assertThat(ex.getMessage(), containsString("handshake failed, mismatched cluster name [Cluster [b]]"));
         assertFalse(handleA.transportService.nodeConnected(discoveryNode));
@@ -149,7 +149,7 @@ public class TransportServiceHandshakeTests extends ESTestCase {
             emptyMap(),
             emptySet(),
             Version.CURRENT.minimumCompatibilityVersion());
-        IllegalStateException ex = expectThrows(IllegalStateException.class, () -> handleA.transportService.connectToNodeLightAndHandshake(
+        IllegalStateException ex = expectThrows(IllegalStateException.class, () -> handleA.transportService.connectToNodeAndHandshake(
             discoveryNode, timeout));
         assertThat(ex.getMessage(), containsString("handshake failed, incompatible version"));
         assertFalse(handleA.transportService.nodeConnected(discoveryNode));
@@ -171,7 +171,7 @@ public class TransportServiceHandshakeTests extends ESTestCase {
             emptyMap(),
             emptySet(),
             Version.CURRENT.minimumCompatibilityVersion());
-        DiscoveryNode connectedNode = handleA.transportService.connectToNodeLightAndHandshake(discoveryNode, timeout, false);
+        DiscoveryNode connectedNode = handleA.transportService.connectToNodeAndHandshake(discoveryNode, timeout, false);
         assertNotNull(connectedNode);
         assertEquals(connectedNode.getName(), "TS_B");
         assertEquals(connectedNode.getVersion(), handleB.discoveryNode.getVersion());

+ 14 - 49
modules/transport-netty4/src/main/java/org/elasticsearch/transport/netty4/Netty4Transport.java

@@ -62,6 +62,7 @@ import org.elasticsearch.indices.breaker.CircuitBreakerService;
 import org.elasticsearch.monitor.jvm.JvmInfo;
 import org.elasticsearch.threadpool.ThreadPool;
 import org.elasticsearch.transport.ConnectTransportException;
+import org.elasticsearch.transport.ConnectionProfile;
 import org.elasticsearch.transport.TcpTransport;
 import org.elasticsearch.transport.TransportServiceAdapter;
 import org.elasticsearch.transport.TransportSettings;
@@ -331,37 +332,13 @@ public class Netty4Transport extends TcpTransport<Channel> {
         return channels == null ? 0 : channels.numberOfOpenChannels();
     }
 
-    protected NodeChannels connectToChannelsLight(DiscoveryNode node) {
-        InetSocketAddress address = node.getAddress().address();
-        ChannelFuture connect = bootstrap.connect(address);
-        connect.awaitUninterruptibly((long) (connectTimeout.millis() * 1.5));
-        if (!connect.isSuccess()) {
-            throw new ConnectTransportException(node, "connect_timeout[" + connectTimeout + "]", connect.cause());
-        }
-        Channel[] channels = new Channel[1];
-        channels[0] = connect.channel();
-        channels[0].closeFuture().addListener(new ChannelCloseListener(node));
-        NodeChannels nodeChannels = new NodeChannels(channels, channels, channels, channels, channels);
-        onAfterChannelsConnected(nodeChannels);
-        return nodeChannels;
-    }
-
-    protected NodeChannels connectToChannels(DiscoveryNode node) {
-        final NodeChannels nodeChannels =
-            new NodeChannels(
-                new Channel[connectionsPerNodeRecovery],
-                new Channel[connectionsPerNodeBulk],
-                new Channel[connectionsPerNodeReg],
-                new Channel[connectionsPerNodeState],
-                new Channel[connectionsPerNodePing]);
+    @Override
+    protected NodeChannels connectToChannels(DiscoveryNode node, ConnectionProfile profile) {
+        final Channel[] channels = new Channel[profile.getNumConnections()];
+        final NodeChannels nodeChannels = new NodeChannels(channels, profile);
         boolean success = false;
         try {
-            int numConnections =
-                connectionsPerNodeRecovery +
-                    connectionsPerNodeBulk +
-                    connectionsPerNodeReg +
-                    connectionsPerNodeState +
-                    connectionsPerNodeRecovery;
+            int numConnections = channels.length;
             final ArrayList<ChannelFuture> connections = new ArrayList<>(numConnections);
             final InetSocketAddress address = node.getAddress().address();
             for (int i = 0; i < numConnections; i++) {
@@ -369,27 +346,15 @@ public class Netty4Transport extends TcpTransport<Channel> {
             }
             final Iterator<ChannelFuture> iterator = connections.iterator();
             try {
-                for (Channel[] channels : nodeChannels.getChannelArrays()) {
-                    for (int i = 0; i < channels.length; i++) {
-                        assert iterator.hasNext();
-                        ChannelFuture future = iterator.next();
-                        future.awaitUninterruptibly((long) (connectTimeout.millis() * 1.5));
-                        if (!future.isSuccess()) {
-                            throw new ConnectTransportException(node, "connect_timeout[" + connectTimeout + "]", future.cause());
-                        }
-                        channels[i] = future.channel();
-                        channels[i].closeFuture().addListener(new ChannelCloseListener(node));
-                    }
-                }
-                if (nodeChannels.recovery.length == 0) {
-                    if (nodeChannels.bulk.length > 0) {
-                        nodeChannels.recovery = nodeChannels.bulk;
-                    } else {
-                        nodeChannels.recovery = nodeChannels.reg;
+                for (int i = 0; i < channels.length; i++) {
+                    assert iterator.hasNext();
+                    ChannelFuture future = iterator.next();
+                    future.awaitUninterruptibly((long) (connectTimeout.millis() * 1.5));
+                    if (!future.isSuccess()) {
+                        throw new ConnectTransportException(node, "connect_timeout[" + connectTimeout + "]", future.cause());
                     }
-                }
-                if (nodeChannels.bulk.length == 0) {
-                    nodeChannels.bulk = nodeChannels.reg;
+                    channels[i] = future.channel();
+                    channels[i].closeFuture().addListener(new ChannelCloseListener(node));
                 }
             } catch (final RuntimeException e) {
                 for (final ChannelFuture future : Collections.unmodifiableList(connections)) {

+ 2 - 6
test/framework/src/main/java/org/elasticsearch/test/transport/CapturingTransport.java

@@ -30,6 +30,7 @@ import org.elasticsearch.common.transport.BoundTransportAddress;
 import org.elasticsearch.common.transport.TransportAddress;
 import org.elasticsearch.common.util.concurrent.ConcurrentCollections;
 import org.elasticsearch.transport.ConnectTransportException;
+import org.elasticsearch.transport.ConnectionProfile;
 import org.elasticsearch.transport.RemoteTransportException;
 import org.elasticsearch.transport.SendRequestTransportException;
 import org.elasticsearch.transport.Transport;
@@ -219,12 +220,7 @@ public class CapturingTransport implements Transport {
     }
 
     @Override
-    public void connectToNode(DiscoveryNode node) throws ConnectTransportException {
-
-    }
-
-    @Override
-    public void connectToNodeLight(DiscoveryNode node) throws ConnectTransportException {
+    public void connectToNode(DiscoveryNode node, ConnectionProfile connectionProfile) throws ConnectTransportException {
 
     }
 

+ 12 - 55
test/framework/src/main/java/org/elasticsearch/test/transport/MockTransportService.java

@@ -42,6 +42,7 @@ import org.elasticsearch.tasks.TaskManager;
 import org.elasticsearch.test.tasks.MockTaskManager;
 import org.elasticsearch.threadpool.ThreadPool;
 import org.elasticsearch.transport.ConnectTransportException;
+import org.elasticsearch.transport.ConnectionProfile;
 import org.elasticsearch.transport.MockTcpTransport;
 import org.elasticsearch.transport.RequestHandlerRegistry;
 import org.elasticsearch.transport.Transport;
@@ -175,13 +176,9 @@ public final class MockTransportService extends TransportService {
      */
     public void addFailToSendNoConnectRule(TransportAddress transportAddress) {
         addDelegate(transportAddress, new DelegateTransport(original) {
-            @Override
-            public void connectToNode(DiscoveryNode node) throws ConnectTransportException {
-                throw new ConnectTransportException(node, "DISCONNECT: simulated");
-            }
 
             @Override
-            public void connectToNodeLight(DiscoveryNode node) throws ConnectTransportException {
+            public void connectToNode(DiscoveryNode node, ConnectionProfile connectionProfile) throws ConnectTransportException {
                 throw new ConnectTransportException(node, "DISCONNECT: simulated");
             }
 
@@ -222,14 +219,10 @@ public final class MockTransportService extends TransportService {
     public void addFailToSendNoConnectRule(TransportAddress transportAddress, final Set<String> blockedActions) {
 
         addDelegate(transportAddress, new DelegateTransport(original) {
-            @Override
-            public void connectToNode(DiscoveryNode node) throws ConnectTransportException {
-                original.connectToNode(node);
-            }
 
             @Override
-            public void connectToNodeLight(DiscoveryNode node) throws ConnectTransportException {
-                original.connectToNodeLight(node);
+            public void connectToNode(DiscoveryNode node, ConnectionProfile connectionProfile) throws ConnectTransportException {
+                original.connectToNode(node, connectionProfile);
             }
 
             @Override
@@ -260,13 +253,9 @@ public final class MockTransportService extends TransportService {
      */
     public void addUnresponsiveRule(TransportAddress transportAddress) {
         addDelegate(transportAddress, new DelegateTransport(original) {
-            @Override
-            public void connectToNode(DiscoveryNode node) throws ConnectTransportException {
-                throw new ConnectTransportException(node, "UNRESPONSIVE: simulated");
-            }
 
             @Override
-            public void connectToNodeLight(DiscoveryNode node) throws ConnectTransportException {
+            public void connectToNode(DiscoveryNode node, ConnectionProfile connectionProfile) throws ConnectTransportException {
                 throw new ConnectTransportException(node, "UNRESPONSIVE: simulated");
             }
 
@@ -308,10 +297,10 @@ public final class MockTransportService extends TransportService {
             }
 
             @Override
-            public void connectToNode(DiscoveryNode node) throws ConnectTransportException {
+            public void connectToNode(DiscoveryNode node, ConnectionProfile connectionProfile) throws ConnectTransportException {
                 TimeValue delay = getDelay();
                 if (delay.millis() <= 0) {
-                    original.connectToNode(node);
+                    original.connectToNode(node, connectionProfile);
                     return;
                 }
 
@@ -320,30 +309,7 @@ public final class MockTransportService extends TransportService {
                 try {
                     if (delay.millis() < connectingTimeout.millis()) {
                         Thread.sleep(delay.millis());
-                        original.connectToNode(node);
-                    } else {
-                        Thread.sleep(connectingTimeout.millis());
-                        throw new ConnectTransportException(node, "UNRESPONSIVE: simulated");
-                    }
-                } catch (InterruptedException e) {
-                    throw new ConnectTransportException(node, "UNRESPONSIVE: interrupted while sleeping", e);
-                }
-            }
-
-            @Override
-            public void connectToNodeLight(DiscoveryNode node) throws ConnectTransportException {
-                TimeValue delay = getDelay();
-                if (delay.millis() <= 0) {
-                    original.connectToNodeLight(node);
-                    return;
-                }
-
-                // TODO: Replace with proper setting
-                TimeValue connectingTimeout = NetworkService.TcpSettings.TCP_CONNECT_TIMEOUT.getDefault(Settings.EMPTY);
-                try {
-                    if (delay.millis() < connectingTimeout.millis()) {
-                        Thread.sleep(delay.millis());
-                        original.connectToNodeLight(node);
+                        original.connectToNode(node, connectionProfile);
                     } else {
                         Thread.sleep(connectingTimeout.millis());
                         throw new ConnectTransportException(node, "UNRESPONSIVE: simulated");
@@ -461,14 +427,10 @@ public final class MockTransportService extends TransportService {
             return getTransport(node).nodeConnected(node);
         }
 
-        @Override
-        public void connectToNode(DiscoveryNode node) throws ConnectTransportException {
-            getTransport(node).connectToNode(node);
-        }
 
         @Override
-        public void connectToNodeLight(DiscoveryNode node) throws ConnectTransportException {
-            getTransport(node).connectToNodeLight(node);
+        public void connectToNode(DiscoveryNode node, ConnectionProfile connectionProfile) throws ConnectTransportException {
+            getTransport(node).connectToNode(node, connectionProfile);
         }
 
         @Override
@@ -517,13 +479,8 @@ public final class MockTransportService extends TransportService {
         }
 
         @Override
-        public void connectToNode(DiscoveryNode node) throws ConnectTransportException {
-            transport.connectToNode(node);
-        }
-
-        @Override
-        public void connectToNodeLight(DiscoveryNode node) throws ConnectTransportException {
-            transport.connectToNodeLight(node);
+        public void connectToNode(DiscoveryNode node, ConnectionProfile connectionProfile) throws ConnectTransportException {
+            transport.connectToNode(node, connectionProfile);
         }
 
         @Override

+ 2 - 2
test/framework/src/main/java/org/elasticsearch/transport/AbstractSimpleTransportTestCase.java

@@ -1310,7 +1310,7 @@ public abstract class AbstractSimpleTransportTestCase extends ESTestCase {
         }
 
         try {
-            serviceB.connectToNodeLightAndHandshake(nodeA, 100);
+            serviceB.connectToNodeAndHandshake(nodeA, 100);
             fail("exception should be thrown");
         } catch (ConnectTransportException e) {
             // all is well
@@ -1368,7 +1368,7 @@ public abstract class AbstractSimpleTransportTestCase extends ESTestCase {
         }
 
         try {
-            serviceB.connectToNodeLightAndHandshake(nodeA, 100);
+            serviceB.connectToNodeAndHandshake(nodeA, 100);
             fail("exception should be thrown");
         } catch (ConnectTransportException e) {
             // all is well

+ 4 - 16
test/framework/src/main/java/org/elasticsearch/transport/MockTcpTransport.java

@@ -157,17 +157,9 @@ public class MockTcpTransport extends TcpTransport<MockTcpTransport.MockChannel>
     }
 
     @Override
-    protected NodeChannels connectToChannelsLight(DiscoveryNode node) throws IOException {
-        return connectToChannels(node);
-    }
-
-    @Override
-    protected NodeChannels connectToChannels(DiscoveryNode node) throws IOException {
-        final NodeChannels nodeChannels = new NodeChannels(new MockChannel[1],
-            new MockChannel[1],
-            new MockChannel[1],
-            new MockChannel[1],
-            new MockChannel[1]);
+    protected NodeChannels connectToChannels(DiscoveryNode node, ConnectionProfile profile) throws IOException {
+        final MockChannel[] mockChannels = new MockChannel[1];
+        final NodeChannels nodeChannels = new NodeChannels(mockChannels, ConnectionProfile.LIGHT_PROFILE); // we always use light here
         boolean success = false;
         final Socket socket = new Socket();
         try {
@@ -189,11 +181,7 @@ public class MockTcpTransport extends TcpTransport<MockTcpTransport.MockChannel>
             socket.connect(address, (int) TCP_CONNECT_TIMEOUT.get(settings).millis());
             MockChannel channel = new MockChannel(socket, address, "none", onClose);
             channel.loopRead(executor);
-            for (MockChannel[] channels : nodeChannels.getChannelArrays()) {
-                for (int i = 0; i < channels.length; i++) {
-                    channels[i] = channel;
-                }
-            }
+            mockChannels[0] = channel;
             success = true;
         } finally {
             if (success == false) {