1
0
Эх сурвалжийг харах

Execute low level handshake in #openConnection (#22440)

Today we execute the low level handshake on the TCP layer in #connectToNode.
If #openConnection is used directly, which is truly expert, no handshake is executed
which allows connecting to nodes that are not necessarily compatible. This change
moves the handshake to #openConnection to prevent bypassing this logic.
Simon Willnauer 8 жил өмнө
parent
commit
a5daa5d3a2

+ 10 - 10
core/src/main/java/org/elasticsearch/transport/TcpTransport.java

@@ -458,13 +458,6 @@ public abstract class TcpTransport<Channel> extends AbstractLifecycleComponent i
                                 "failed to connect to [{}], cleaning dangling connections", node), e);
                         throw e;
                     }
-                    Channel channel = nodeChannels.channel(TransportRequestOptions.Type.PING);
-                    final TimeValue connectTimeout = connectionProfile.getConnectTimeout() == null ?
-                        defaultConnectionProfile.getConnectTimeout() :
-                        connectionProfile.getConnectTimeout();
-                    final TimeValue handshakeTimeout = connectionProfile.getHandshakeTimeout() == null ?
-                        connectTimeout : connectionProfile.getHandshakeTimeout();
-                    Version version = executeHandshake(node, channel, handshakeTimeout);
                     // we acquire a connection lock, so no way there is an existing connection
                     connectedNodes.put(node, nodeChannels);
                     if (logger.isDebugEnabled()) {
@@ -483,11 +476,18 @@ public abstract class TcpTransport<Channel> extends AbstractLifecycleComponent i
     }
 
     @Override
-    public final NodeChannels openConnection(DiscoveryNode node, ConnectionProfile profile) throws IOException {
+    public final NodeChannels openConnection(DiscoveryNode node, ConnectionProfile connectionProfile) throws IOException {
         try {
-            NodeChannels nodeChannels = connectToChannels(node, profile);
+            NodeChannels nodeChannels = connectToChannels(node, connectionProfile);
+            final Channel channel = nodeChannels.getChannels().get(0); // one channel is guaranteed by the connection profile
+            final TimeValue connectTimeout = connectionProfile.getConnectTimeout() == null ?
+                defaultConnectionProfile.getConnectTimeout() :
+                connectionProfile.getConnectTimeout();
+            final TimeValue handshakeTimeout = connectionProfile.getHandshakeTimeout() == null ?
+                connectTimeout : connectionProfile.getHandshakeTimeout();
+            final Version version = executeHandshake(node, channel, handshakeTimeout);
             transportServiceAdapter.onConnectionOpened(node);
-            return nodeChannels;
+            return new NodeChannels(nodeChannels, version); // clone the channels - we now have the correct version
         } catch (ConnectTransportException e) {
             throw e;
         } catch (Exception e) {

+ 5 - 0
test/framework/src/main/java/org/elasticsearch/test/transport/MockTransportService.java

@@ -684,6 +684,11 @@ public final class MockTransportService extends TransportService {
             return connection.getNode();
         }
 
+        @Override
+        public Version getVersion() {
+            return connection.getVersion();
+        }
+
         @Override
         public void sendRequest(long requestId, String action, TransportRequest request, TransportRequestOptions options)
             throws IOException, TransportException {

+ 48 - 1
test/framework/src/main/java/org/elasticsearch/transport/AbstractSimpleTransportTestCase.java

@@ -43,6 +43,7 @@ import org.elasticsearch.indices.breaker.NoneCircuitBreakerService;
 import org.elasticsearch.mocksocket.MockServerSocket;
 import org.elasticsearch.node.Node;
 import org.elasticsearch.test.ESTestCase;
+import org.elasticsearch.test.VersionUtils;
 import org.elasticsearch.test.transport.MockTransportService;
 import org.elasticsearch.threadpool.TestThreadPool;
 import org.elasticsearch.threadpool.ThreadPool;
@@ -1818,6 +1819,52 @@ public abstract class AbstractSimpleTransportTestCase extends ESTestCase {
         }
     }
 
+    public void testHandshakeWithIncompatVersion() {
+        assumeTrue("only tcp transport has a handshake method", serviceA.getOriginalTransport() instanceof TcpTransport);
+        NamedWriteableRegistry namedWriteableRegistry = new NamedWriteableRegistry(Collections.emptyList());
+        try (MockTcpTransport transport = new MockTcpTransport(Settings.EMPTY, threadPool, BigArrays.NON_RECYCLING_INSTANCE,
+            new NoneCircuitBreakerService(), namedWriteableRegistry, new NetworkService(Settings.EMPTY, Collections.emptyList()),
+            Version.fromString("2.0.0"))) {
+            transport.transportServiceAdapter(serviceA.new Adapter());
+            transport.start();
+            DiscoveryNode node =
+                new DiscoveryNode("TS_TPC", "TS_TPC", transport.boundAddress().publishAddress(), emptyMap(), emptySet(), version0);
+            ConnectionProfile.Builder builder = new ConnectionProfile.Builder();
+            builder.addConnections(1,
+                TransportRequestOptions.Type.BULK,
+                TransportRequestOptions.Type.PING,
+                TransportRequestOptions.Type.RECOVERY,
+                TransportRequestOptions.Type.REG,
+                TransportRequestOptions.Type.STATE);
+            expectThrows(ConnectTransportException.class, () -> serviceA.openConnection(node, builder.build()));
+        }
+    }
+
+    public void testHandshakeUpdatesVersion() throws IOException {
+        assumeTrue("only tcp transport has a handshake method", serviceA.getOriginalTransport() instanceof TcpTransport);
+        NamedWriteableRegistry namedWriteableRegistry = new NamedWriteableRegistry(Collections.emptyList());
+        Version version = VersionUtils.randomVersionBetween(random(), Version.CURRENT.minimumCompatibilityVersion(), Version.CURRENT);
+        try (MockTcpTransport transport = new MockTcpTransport(Settings.EMPTY, threadPool, BigArrays.NON_RECYCLING_INSTANCE,
+            new NoneCircuitBreakerService(), namedWriteableRegistry, new NetworkService(Settings.EMPTY, Collections.emptyList()),version)) {
+            transport.transportServiceAdapter(serviceA.new Adapter());
+            transport.start();
+            DiscoveryNode node =
+                new DiscoveryNode("TS_TPC", "TS_TPC", transport.boundAddress().publishAddress(), emptyMap(), emptySet(),
+                    Version.fromString("2.0.0"));
+            ConnectionProfile.Builder builder = new ConnectionProfile.Builder();
+            builder.addConnections(1,
+                TransportRequestOptions.Type.BULK,
+                TransportRequestOptions.Type.PING,
+                TransportRequestOptions.Type.RECOVERY,
+                TransportRequestOptions.Type.REG,
+                TransportRequestOptions.Type.STATE);
+            try (Transport.Connection connection = serviceA.openConnection(node, builder.build())) {
+                assertEquals(connection.getVersion(), version);
+            }
+        }
+    }
+
+
     public void testTcpHandshake() throws IOException, InterruptedException {
         assumeTrue("only tcp transport has a handshake method", serviceA.getOriginalTransport() instanceof TcpTransport);
         TcpTransport originalTransport = (TcpTransport) serviceA.getOriginalTransport();
@@ -1830,7 +1877,7 @@ public abstract class AbstractSimpleTransportTestCase extends ESTestCase {
                                            int messageLengthBytes, Version version, InetSocketAddress remoteAddress, byte status)
                 throws IOException {
                 return super.handleRequest(mockChannel, profileName, stream, requestId, messageLengthBytes, version, remoteAddress,
-                    (byte)(status & ~(1<<3))); // we flip the isHanshake bit back and ackt like the handler is not found
+                    (byte)(status & ~(1<<3))); // we flip the isHandshake bit back and act like the handler is not found
             }
         }) {
             transport.transportServiceAdapter(serviceA.new Adapter());