|
@@ -20,6 +20,7 @@ import org.elasticsearch.common.util.PageCacheRecycler;
|
|
|
import org.elasticsearch.core.TimeValue;
|
|
|
import org.elasticsearch.indices.breaker.NoneCircuitBreakerService;
|
|
|
import org.elasticsearch.test.ESTestCase;
|
|
|
+import org.elasticsearch.test.TransportVersionUtils;
|
|
|
import org.elasticsearch.test.VersionUtils;
|
|
|
import org.elasticsearch.test.transport.MockTransportService;
|
|
|
import org.elasticsearch.threadpool.TestThreadPool;
|
|
@@ -57,12 +58,13 @@ public class TransportServiceHandshakeTests extends ESTestCase {
|
|
|
private TransportService startServices(
|
|
|
String nodeNameAndId,
|
|
|
Settings settings,
|
|
|
- Version version,
|
|
|
+ TransportVersion transportVersion,
|
|
|
+ Version nodeVersion,
|
|
|
TransportInterceptor transportInterceptor
|
|
|
) {
|
|
|
TcpTransport transport = new Netty4Transport(
|
|
|
settings,
|
|
|
- TransportVersion.CURRENT,
|
|
|
+ transportVersion,
|
|
|
threadPool,
|
|
|
new NetworkService(Collections.emptyList()),
|
|
|
PageCacheRecycler.NON_RECYCLING_INSTANCE,
|
|
@@ -81,7 +83,7 @@ public class TransportServiceHandshakeTests extends ESTestCase {
|
|
|
boundAddress.publishAddress(),
|
|
|
emptyMap(),
|
|
|
emptySet(),
|
|
|
- version
|
|
|
+ nodeVersion
|
|
|
),
|
|
|
null,
|
|
|
Collections.emptySet()
|
|
@@ -110,10 +112,17 @@ public class TransportServiceHandshakeTests extends ESTestCase {
|
|
|
public void testConnectToNodeLight() {
|
|
|
Settings settings = Settings.builder().put("cluster.name", "test").build();
|
|
|
|
|
|
- TransportService transportServiceA = startServices("TS_A", settings, Version.CURRENT, TransportService.NOOP_TRANSPORT_INTERCEPTOR);
|
|
|
+ TransportService transportServiceA = startServices(
|
|
|
+ "TS_A",
|
|
|
+ settings,
|
|
|
+ TransportVersion.CURRENT,
|
|
|
+ Version.CURRENT,
|
|
|
+ TransportService.NOOP_TRANSPORT_INTERCEPTOR
|
|
|
+ );
|
|
|
TransportService transportServiceB = startServices(
|
|
|
"TS_B",
|
|
|
settings,
|
|
|
+ TransportVersionUtils.randomCompatibleVersion(random(), TransportVersion.CURRENT),
|
|
|
VersionUtils.randomVersionBetween(random(), Version.CURRENT.minimumCompatibilityVersion(), Version.CURRENT),
|
|
|
TransportService.NOOP_TRANSPORT_INTERCEPTOR
|
|
|
);
|
|
@@ -145,12 +154,14 @@ public class TransportServiceHandshakeTests extends ESTestCase {
|
|
|
TransportService transportServiceA = startServices(
|
|
|
"TS_A",
|
|
|
Settings.builder().put("cluster.name", "a").build(),
|
|
|
+ TransportVersion.CURRENT,
|
|
|
Version.CURRENT,
|
|
|
TransportService.NOOP_TRANSPORT_INTERCEPTOR
|
|
|
);
|
|
|
TransportService transportServiceB = startServices(
|
|
|
"TS_B",
|
|
|
Settings.builder().put("cluster.name", "b").build(),
|
|
|
+ TransportVersion.CURRENT,
|
|
|
Version.CURRENT,
|
|
|
TransportService.NOOP_TRANSPORT_INTERCEPTOR
|
|
|
);
|
|
@@ -179,12 +190,19 @@ public class TransportServiceHandshakeTests extends ESTestCase {
|
|
|
assertFalse(transportServiceA.nodeConnected(discoveryNode));
|
|
|
}
|
|
|
|
|
|
- public void testIncompatibleVersions() {
|
|
|
+ public void testIncompatibleNodeVersions() {
|
|
|
Settings settings = Settings.builder().put("cluster.name", "test").build();
|
|
|
- TransportService transportServiceA = startServices("TS_A", settings, Version.CURRENT, TransportService.NOOP_TRANSPORT_INTERCEPTOR);
|
|
|
+ TransportService transportServiceA = startServices(
|
|
|
+ "TS_A",
|
|
|
+ settings,
|
|
|
+ TransportVersion.CURRENT,
|
|
|
+ Version.CURRENT,
|
|
|
+ TransportService.NOOP_TRANSPORT_INTERCEPTOR
|
|
|
+ );
|
|
|
TransportService transportServiceB = startServices(
|
|
|
"TS_B",
|
|
|
settings,
|
|
|
+ TransportVersion.MINIMUM_COMPATIBLE,
|
|
|
VersionUtils.getPreviousVersion(Version.CURRENT.minimumCompatibilityVersion()),
|
|
|
TransportService.NOOP_TRANSPORT_INTERCEPTOR
|
|
|
);
|
|
@@ -221,10 +239,60 @@ public class TransportServiceHandshakeTests extends ESTestCase {
|
|
|
assertFalse(transportServiceA.nodeConnected(discoveryNode));
|
|
|
}
|
|
|
|
|
|
+ public void testIncompatibleTransportVersions() {
|
|
|
+ Settings settings = Settings.builder().put("cluster.name", "test").build();
|
|
|
+ TransportService transportServiceA = startServices(
|
|
|
+ "TS_A",
|
|
|
+ settings,
|
|
|
+ TransportVersion.CURRENT,
|
|
|
+ Version.CURRENT,
|
|
|
+ TransportService.NOOP_TRANSPORT_INTERCEPTOR
|
|
|
+ );
|
|
|
+ TransportService transportServiceB = startServices(
|
|
|
+ "TS_B",
|
|
|
+ settings,
|
|
|
+ TransportVersionUtils.getPreviousVersion(TransportVersion.MINIMUM_COMPATIBLE),
|
|
|
+ Version.CURRENT.minimumCompatibilityVersion(),
|
|
|
+ TransportService.NOOP_TRANSPORT_INTERCEPTOR
|
|
|
+ );
|
|
|
+ DiscoveryNode discoveryNode = new DiscoveryNode(
|
|
|
+ "",
|
|
|
+ transportServiceB.getLocalNode().getAddress(),
|
|
|
+ emptyMap(),
|
|
|
+ emptySet(),
|
|
|
+ Version.CURRENT.minimumCompatibilityVersion()
|
|
|
+ );
|
|
|
+ expectThrows(ConnectTransportException.class, () -> {
|
|
|
+ try (
|
|
|
+ Transport.Connection connection = AbstractSimpleTransportTestCase.openConnection(
|
|
|
+ transportServiceA,
|
|
|
+ discoveryNode,
|
|
|
+ TestProfiles.LIGHT_PROFILE
|
|
|
+ )
|
|
|
+ ) {
|
|
|
+ PlainActionFuture.get(fut -> transportServiceA.handshake(connection, timeout, fut.map(x -> null)));
|
|
|
+ }
|
|
|
+ });
|
|
|
+ // the error is exposed as a general connection exception, the actual message is in the logs
|
|
|
+ assertFalse(transportServiceA.nodeConnected(discoveryNode));
|
|
|
+ }
|
|
|
+
|
|
|
public void testNodeConnectWithDifferentNodeId() {
|
|
|
Settings settings = Settings.builder().put("cluster.name", "test").build();
|
|
|
- TransportService transportServiceA = startServices("TS_A", settings, Version.CURRENT, TransportService.NOOP_TRANSPORT_INTERCEPTOR);
|
|
|
- TransportService transportServiceB = startServices("TS_B", settings, Version.CURRENT, TransportService.NOOP_TRANSPORT_INTERCEPTOR);
|
|
|
+ TransportService transportServiceA = startServices(
|
|
|
+ "TS_A",
|
|
|
+ settings,
|
|
|
+ TransportVersion.CURRENT,
|
|
|
+ Version.CURRENT,
|
|
|
+ TransportService.NOOP_TRANSPORT_INTERCEPTOR
|
|
|
+ );
|
|
|
+ TransportService transportServiceB = startServices(
|
|
|
+ "TS_B",
|
|
|
+ settings,
|
|
|
+ TransportVersion.CURRENT,
|
|
|
+ Version.CURRENT,
|
|
|
+ TransportService.NOOP_TRANSPORT_INTERCEPTOR
|
|
|
+ );
|
|
|
DiscoveryNode discoveryNode = new DiscoveryNode(
|
|
|
randomAlphaOfLength(10),
|
|
|
transportServiceB.getLocalNode().getAddress(),
|
|
@@ -249,8 +317,20 @@ public class TransportServiceHandshakeTests extends ESTestCase {
|
|
|
.put("cluster.name", "a")
|
|
|
.put(IGNORE_DESERIALIZATION_ERRORS_SETTING.getKey(), true) // suppress assertions to test production error-handling
|
|
|
.build();
|
|
|
- final TransportService transportServiceA = startServices("TS_A", settings, Version.CURRENT, transportInterceptorA);
|
|
|
- final TransportService transportServiceB = startServices("TS_B", settings, Version.CURRENT, transportInterceptorB);
|
|
|
+ final TransportService transportServiceA = startServices(
|
|
|
+ "TS_A",
|
|
|
+ settings,
|
|
|
+ TransportVersion.CURRENT,
|
|
|
+ Version.CURRENT,
|
|
|
+ transportInterceptorA
|
|
|
+ );
|
|
|
+ final TransportService transportServiceB = startServices(
|
|
|
+ "TS_B",
|
|
|
+ settings,
|
|
|
+ TransportVersion.CURRENT,
|
|
|
+ Version.CURRENT,
|
|
|
+ transportInterceptorB
|
|
|
+ );
|
|
|
final DiscoveryNode discoveryNode = new DiscoveryNode(
|
|
|
"",
|
|
|
transportServiceB.getLocalNode().getAddress(),
|
|
@@ -284,12 +364,14 @@ public class TransportServiceHandshakeTests extends ESTestCase {
|
|
|
final TransportService transportServiceA = startServices(
|
|
|
"TS_A",
|
|
|
Settings.builder().put("cluster.name", "a").build(),
|
|
|
+ TransportVersion.CURRENT,
|
|
|
Version.CURRENT,
|
|
|
transportInterceptorA
|
|
|
);
|
|
|
final TransportService transportServiceB = startServices(
|
|
|
"TS_B",
|
|
|
Settings.builder().put("cluster.name", "a").build(),
|
|
|
+ TransportVersion.MINIMUM_COMPATIBLE,
|
|
|
Version.CURRENT.minimumCompatibilityVersion(),
|
|
|
transportInterceptorB
|
|
|
);
|