|
|
@@ -46,7 +46,7 @@ import org.elasticsearch.node.Node;
|
|
|
import org.elasticsearch.tasks.Task;
|
|
|
import org.elasticsearch.test.ESTestCase;
|
|
|
import org.elasticsearch.test.MockLogAppender;
|
|
|
-import org.elasticsearch.test.VersionUtils;
|
|
|
+import org.elasticsearch.test.TransportVersionUtils;
|
|
|
import org.elasticsearch.test.junit.annotations.TestLogging;
|
|
|
import org.elasticsearch.test.transport.MockTransportService;
|
|
|
import org.elasticsearch.test.transport.StubbableTransport;
|
|
|
@@ -110,14 +110,15 @@ public abstract class AbstractSimpleTransportTestCase extends ESTestCase {
|
|
|
|
|
|
protected ThreadPool threadPool;
|
|
|
// we use always a non-alpha or beta version here otherwise minimumCompatibilityVersion will be different for the two used versions
|
|
|
- private static final Version CURRENT_VERSION = Version.fromString(String.valueOf(Version.CURRENT.major) + ".0.0");
|
|
|
- protected static final Version version0 = CURRENT_VERSION;
|
|
|
+ protected static final Version version0 = Version.fromString(String.valueOf(Version.CURRENT.major) + ".0.0");
|
|
|
+ protected static final TransportVersion transportVersion0 = TransportVersion.CURRENT;
|
|
|
|
|
|
protected volatile DiscoveryNode nodeA;
|
|
|
protected volatile MockTransportService serviceA;
|
|
|
protected ClusterSettings clusterSettingsA;
|
|
|
|
|
|
- protected static final Version version1 = Version.fromId(CURRENT_VERSION.id + 1);
|
|
|
+ protected static final Version version1 = Version.fromId(version0.id + 1);
|
|
|
+ protected static final TransportVersion transportVersion1 = TransportVersion.fromId(transportVersion0.id + 1);
|
|
|
protected volatile DiscoveryNode nodeB;
|
|
|
protected volatile MockTransportService serviceB;
|
|
|
|
|
|
@@ -158,9 +159,11 @@ public abstract class AbstractSimpleTransportTestCase extends ESTestCase {
|
|
|
|
|
|
final Settings connectionSettings = connectionSettingsBuilder.build();
|
|
|
|
|
|
- serviceA = buildService("TS_A", version0, clusterSettingsA, connectionSettings); // this one supports dynamic tracer updates
|
|
|
+ // this one supports dynamic tracer updates
|
|
|
+ serviceA = buildService("TS_A", version0, transportVersion0, clusterSettingsA, connectionSettings);
|
|
|
nodeA = serviceA.getLocalNode();
|
|
|
- serviceB = buildService("TS_B", version1, null, connectionSettings); // this one doesn't support dynamic tracer updates
|
|
|
+ // this one doesn't support dynamic tracer updates
|
|
|
+ serviceB = buildService("TS_B", version1, transportVersion1, null, connectionSettings);
|
|
|
nodeB = serviceB.getLocalNode();
|
|
|
// wait till all nodes are properly connected and the event has been sent, so tests in this class
|
|
|
// will not get this callback called on the connections done in this setup
|
|
|
@@ -190,8 +193,9 @@ public abstract class AbstractSimpleTransportTestCase extends ESTestCase {
|
|
|
}
|
|
|
|
|
|
private MockTransportService buildService(
|
|
|
- final String name,
|
|
|
- final Version version,
|
|
|
+ String name,
|
|
|
+ Version version,
|
|
|
+ TransportVersion transportVersion,
|
|
|
@Nullable ClusterSettings clusterSettings,
|
|
|
Settings settings,
|
|
|
boolean acceptRequests,
|
|
|
@@ -207,7 +211,7 @@ public abstract class AbstractSimpleTransportTestCase extends ESTestCase {
|
|
|
if (clusterSettings == null) {
|
|
|
clusterSettings = new ClusterSettings(updatedSettings, getSupportedSettings());
|
|
|
}
|
|
|
- Transport transport = build(updatedSettings, version.transportVersion, clusterSettings, doHandshake);
|
|
|
+ Transport transport = build(updatedSettings, transportVersion, clusterSettings, doHandshake);
|
|
|
MockTransportService service = MockTransportService.createNewService(
|
|
|
updatedSettings,
|
|
|
transport,
|
|
|
@@ -225,27 +229,38 @@ public abstract class AbstractSimpleTransportTestCase extends ESTestCase {
|
|
|
}
|
|
|
|
|
|
protected MockTransportService buildService(
|
|
|
- final String name,
|
|
|
- final Version version,
|
|
|
+ String name,
|
|
|
+ Version version,
|
|
|
+ TransportVersion transportVersion,
|
|
|
@Nullable ClusterSettings clusterSettings,
|
|
|
Settings settings,
|
|
|
boolean acceptRequests,
|
|
|
boolean doHandshake
|
|
|
) {
|
|
|
- return buildService(name, version, clusterSettings, settings, acceptRequests, doHandshake, NOOP_TRANSPORT_INTERCEPTOR);
|
|
|
+ return buildService(
|
|
|
+ name,
|
|
|
+ version,
|
|
|
+ transportVersion,
|
|
|
+ clusterSettings,
|
|
|
+ settings,
|
|
|
+ acceptRequests,
|
|
|
+ doHandshake,
|
|
|
+ NOOP_TRANSPORT_INTERCEPTOR
|
|
|
+ );
|
|
|
}
|
|
|
|
|
|
- protected MockTransportService buildService(final String name, final Version version, Settings settings) {
|
|
|
- return buildService(name, version, null, settings);
|
|
|
+ protected MockTransportService buildService(String name, Version version, TransportVersion transportVersion, Settings settings) {
|
|
|
+ return buildService(name, version, transportVersion, null, settings);
|
|
|
}
|
|
|
|
|
|
protected MockTransportService buildService(
|
|
|
- final String name,
|
|
|
- final Version version,
|
|
|
+ String name,
|
|
|
+ Version version,
|
|
|
+ TransportVersion transportVersion,
|
|
|
ClusterSettings clusterSettings,
|
|
|
Settings settings
|
|
|
) {
|
|
|
- return buildService(name, version, clusterSettings, settings, true, true);
|
|
|
+ return buildService(name, version, transportVersion, clusterSettings, settings, true, true);
|
|
|
}
|
|
|
|
|
|
@Override
|
|
|
@@ -598,7 +613,7 @@ public abstract class AbstractSimpleTransportTestCase extends ESTestCase {
|
|
|
}
|
|
|
|
|
|
public void testVoidMessageCompressed() throws Exception {
|
|
|
- try (MockTransportService serviceC = buildService("TS_C", CURRENT_VERSION, Settings.EMPTY)) {
|
|
|
+ try (MockTransportService serviceC = buildService("TS_C", version0, transportVersion0, Settings.EMPTY)) {
|
|
|
serviceA.registerRequestHandler(
|
|
|
"internal:sayHello",
|
|
|
ThreadPool.Names.GENERIC,
|
|
|
@@ -654,7 +669,7 @@ public abstract class AbstractSimpleTransportTestCase extends ESTestCase {
|
|
|
}
|
|
|
|
|
|
public void testHelloWorldCompressed() throws Exception {
|
|
|
- try (MockTransportService serviceC = buildService("TS_C", CURRENT_VERSION, Settings.EMPTY)) {
|
|
|
+ try (MockTransportService serviceC = buildService("TS_C", version0, transportVersion0, Settings.EMPTY)) {
|
|
|
serviceA.registerRequestHandler(
|
|
|
"internal:sayHello",
|
|
|
ThreadPool.Names.GENERIC,
|
|
|
@@ -715,7 +730,7 @@ public abstract class AbstractSimpleTransportTestCase extends ESTestCase {
|
|
|
}
|
|
|
|
|
|
public void testIndexingDataCompression() throws Exception {
|
|
|
- try (MockTransportService serviceC = buildService("TS_C", CURRENT_VERSION, Settings.EMPTY)) {
|
|
|
+ try (MockTransportService serviceC = buildService("TS_C", version0, transportVersion0, Settings.EMPTY)) {
|
|
|
String component = "cccccccccooooooooooooooommmmmmmmmmmppppppppppprrrrrrrreeeeeeeeeessssssssiiiiiiiiiibbbbbbbbllllllllleeeeee";
|
|
|
String text = component.repeat(30);
|
|
|
TransportRequestHandler<StringMessageRequest> handler = (request, channel, task) -> {
|
|
|
@@ -964,7 +979,7 @@ public abstract class AbstractSimpleTransportTestCase extends ESTestCase {
|
|
|
if (i % 3 == 0) {
|
|
|
// simulate restart of nodeB
|
|
|
serviceB.close();
|
|
|
- MockTransportService newService = buildService("TS_B_" + i, version1, Settings.EMPTY);
|
|
|
+ MockTransportService newService = buildService("TS_B_" + i, version1, transportVersion1, Settings.EMPTY);
|
|
|
newService.registerRequestHandler("internal:test", ThreadPool.Names.SAME, TestRequest::new, ignoringRequestHandler);
|
|
|
serviceB = newService;
|
|
|
nodeB = newService.getLocalDiscoNode();
|
|
|
@@ -1467,7 +1482,7 @@ public abstract class AbstractSimpleTransportTestCase extends ESTestCase {
|
|
|
|
|
|
Version1Request(StreamInput in) throws IOException {
|
|
|
super(in);
|
|
|
- if (in.getTransportVersion().onOrAfter(version1.transportVersion)) {
|
|
|
+ if (in.getTransportVersion().onOrAfter(transportVersion1)) {
|
|
|
value2 = in.readInt();
|
|
|
}
|
|
|
}
|
|
|
@@ -1475,7 +1490,7 @@ public abstract class AbstractSimpleTransportTestCase extends ESTestCase {
|
|
|
@Override
|
|
|
public void writeTo(StreamOutput out) throws IOException {
|
|
|
super.writeTo(out);
|
|
|
- if (out.getTransportVersion().onOrAfter(version1.transportVersion)) {
|
|
|
+ if (out.getTransportVersion().onOrAfter(transportVersion1)) {
|
|
|
out.writeInt(value2);
|
|
|
}
|
|
|
}
|
|
|
@@ -1510,7 +1525,7 @@ public abstract class AbstractSimpleTransportTestCase extends ESTestCase {
|
|
|
|
|
|
Version1Response(StreamInput in) throws IOException {
|
|
|
super(in);
|
|
|
- if (in.getTransportVersion().onOrAfter(version1.transportVersion)) {
|
|
|
+ if (in.getTransportVersion().onOrAfter(transportVersion1)) {
|
|
|
value2 = in.readInt();
|
|
|
} else {
|
|
|
value2 = 0;
|
|
|
@@ -1520,7 +1535,7 @@ public abstract class AbstractSimpleTransportTestCase extends ESTestCase {
|
|
|
@Override
|
|
|
public void writeTo(StreamOutput out) throws IOException {
|
|
|
super.writeTo(out);
|
|
|
- if (out.getTransportVersion().onOrAfter(version1.transportVersion)) {
|
|
|
+ if (out.getTransportVersion().onOrAfter(transportVersion1)) {
|
|
|
out.writeInt(value2);
|
|
|
}
|
|
|
}
|
|
|
@@ -1532,7 +1547,7 @@ public abstract class AbstractSimpleTransportTestCase extends ESTestCase {
|
|
|
assertThat(request.value2, equalTo(0)); // not set, coming from service A
|
|
|
Version1Response response = new Version1Response(1, 2);
|
|
|
channel.sendResponse(response);
|
|
|
- assertEquals(version0.transportVersion, channel.getVersion());
|
|
|
+ assertEquals(transportVersion0, channel.getVersion());
|
|
|
});
|
|
|
|
|
|
Version0Request version0Request = new Version0Request();
|
|
|
@@ -1569,7 +1584,7 @@ public abstract class AbstractSimpleTransportTestCase extends ESTestCase {
|
|
|
assertThat(request.value1, equalTo(1));
|
|
|
Version0Response response = new Version0Response(1);
|
|
|
channel.sendResponse(response);
|
|
|
- assertEquals(version0.transportVersion, channel.getVersion());
|
|
|
+ assertEquals(transportVersion0, channel.getVersion());
|
|
|
});
|
|
|
|
|
|
Version1Request version1Request = new Version1Request();
|
|
|
@@ -1832,7 +1847,7 @@ public abstract class AbstractSimpleTransportTestCase extends ESTestCase {
|
|
|
}
|
|
|
|
|
|
public void testRejectEarlyIncomingRequests() throws Exception {
|
|
|
- try (TransportService service = buildService("TS_TEST", version0, null, Settings.EMPTY, false, false)) {
|
|
|
+ try (TransportService service = buildService("TS_TEST", version0, transportVersion0, null, Settings.EMPTY, false, false)) {
|
|
|
AtomicBoolean requestProcessed = new AtomicBoolean(false);
|
|
|
service.registerRequestHandler("internal:action", ThreadPool.Names.SAME, TestRequest::new, (request, channel, task) -> {
|
|
|
requestProcessed.set(true);
|
|
|
@@ -1841,7 +1856,7 @@ public abstract class AbstractSimpleTransportTestCase extends ESTestCase {
|
|
|
|
|
|
DiscoveryNode node = service.getLocalNode();
|
|
|
serviceA.close();
|
|
|
- serviceA = buildService("TS_A", version0, null, Settings.EMPTY, true, false);
|
|
|
+ serviceA = buildService("TS_A", version0, transportVersion0, null, Settings.EMPTY, true, false);
|
|
|
try (Transport.Connection connection = openConnection(serviceA, node, null)) {
|
|
|
CountDownLatch latch = new CountDownLatch(1);
|
|
|
serviceA.sendRequest(
|
|
|
@@ -1958,7 +1973,7 @@ public abstract class AbstractSimpleTransportTestCase extends ESTestCase {
|
|
|
}
|
|
|
|
|
|
public void testSendRandomRequests() throws InterruptedException {
|
|
|
- TransportService serviceC = buildService("TS_C", version0, Settings.EMPTY);
|
|
|
+ TransportService serviceC = buildService("TS_C", version0, transportVersion0, Settings.EMPTY);
|
|
|
DiscoveryNode nodeC = serviceC.getLocalNode();
|
|
|
|
|
|
final CountDownLatch latch = new CountDownLatch(4);
|
|
|
@@ -2175,8 +2190,15 @@ public abstract class AbstractSimpleTransportTestCase extends ESTestCase {
|
|
|
|
|
|
public void testHandshakeWithIncompatVersion() {
|
|
|
assumeTrue("only tcp transport has a handshake method", serviceA.getOriginalTransport() instanceof TcpTransport);
|
|
|
- Version version = Version.fromString("2.0.0");
|
|
|
- try (MockTransportService service = buildService("TS_C", version, Settings.EMPTY)) {
|
|
|
+ TransportVersion transportVersion = TransportVersion.fromId(TransportVersion.MINIMUM_COMPATIBLE.id - 1);
|
|
|
+ try (
|
|
|
+ MockTransportService service = buildService(
|
|
|
+ "TS_C",
|
|
|
+ Version.CURRENT.minimumCompatibilityVersion(),
|
|
|
+ transportVersion,
|
|
|
+ Settings.EMPTY
|
|
|
+ )
|
|
|
+ ) {
|
|
|
TransportAddress address = service.boundAddress().publishAddress();
|
|
|
DiscoveryNode node = new DiscoveryNode("TS_TPC", "TS_TPC", address, emptyMap(), emptySet(), version0);
|
|
|
ConnectionProfile.Builder builder = new ConnectionProfile.Builder();
|
|
|
@@ -2194,8 +2216,19 @@ public abstract class AbstractSimpleTransportTestCase extends ESTestCase {
|
|
|
|
|
|
public void testHandshakeUpdatesVersion() throws IOException {
|
|
|
assumeTrue("only tcp transport has a handshake method", serviceA.getOriginalTransport() instanceof TcpTransport);
|
|
|
- Version version = VersionUtils.randomVersionBetween(random(), Version.CURRENT.minimumCompatibilityVersion(), Version.CURRENT);
|
|
|
- try (MockTransportService service = buildService("TS_C", version, Settings.EMPTY)) {
|
|
|
+ TransportVersion transportVersion = TransportVersionUtils.randomVersionBetween(
|
|
|
+ random(),
|
|
|
+ TransportVersion.MINIMUM_COMPATIBLE,
|
|
|
+ TransportVersion.CURRENT
|
|
|
+ );
|
|
|
+ try (
|
|
|
+ MockTransportService service = buildService(
|
|
|
+ "TS_C",
|
|
|
+ Version.CURRENT.minimumCompatibilityVersion(),
|
|
|
+ transportVersion,
|
|
|
+ Settings.EMPTY
|
|
|
+ )
|
|
|
+ ) {
|
|
|
TransportAddress address = service.boundAddress().publishAddress();
|
|
|
DiscoveryNode node = new DiscoveryNode("TS_TPC", "TS_TPC", address, emptyMap(), emptySet(), Version.fromString("2.0.0"));
|
|
|
ConnectionProfile.Builder builder = new ConnectionProfile.Builder();
|
|
|
@@ -2208,7 +2241,7 @@ public abstract class AbstractSimpleTransportTestCase extends ESTestCase {
|
|
|
TransportRequestOptions.Type.STATE
|
|
|
);
|
|
|
try (Transport.Connection connection = openConnection(serviceA, node, builder.build())) {
|
|
|
- assertEquals(version.transportVersion, connection.getTransportVersion());
|
|
|
+ assertEquals(transportVersion, connection.getTransportVersion());
|
|
|
}
|
|
|
}
|
|
|
}
|
|
|
@@ -2220,7 +2253,7 @@ public abstract class AbstractSimpleTransportTestCase extends ESTestCase {
|
|
|
ConnectionProfile defaultProfile = ConnectionProfile.buildDefaultConnectionProfile(Settings.EMPTY);
|
|
|
ConnectionProfile connectionProfile = new ConnectionProfile.Builder(defaultProfile).setPingInterval(TimeValue.timeValueMillis(50))
|
|
|
.build();
|
|
|
- try (TransportService service = buildService("TS_TPC", Version.CURRENT, Settings.EMPTY)) {
|
|
|
+ try (TransportService service = buildService("TS_TPC", Version.CURRENT, TransportVersion.CURRENT, Settings.EMPTY)) {
|
|
|
PlainActionFuture<Transport.Connection> future = PlainActionFuture.newFuture();
|
|
|
DiscoveryNode node = new DiscoveryNode(
|
|
|
"TS_TPC",
|
|
|
@@ -2241,7 +2274,7 @@ public abstract class AbstractSimpleTransportTestCase extends ESTestCase {
|
|
|
public void testTcpHandshake() {
|
|
|
assumeTrue("only tcp transport has a handshake method", serviceA.getOriginalTransport() instanceof TcpTransport);
|
|
|
ConnectionProfile connectionProfile = ConnectionProfile.buildDefaultConnectionProfile(Settings.EMPTY);
|
|
|
- try (TransportService service = buildService("TS_TPC", Version.CURRENT, Settings.EMPTY)) {
|
|
|
+ try (TransportService service = buildService("TS_TPC", Version.CURRENT, TransportVersion.CURRENT, Settings.EMPTY)) {
|
|
|
DiscoveryNode node = new DiscoveryNode(
|
|
|
"TS_TPC",
|
|
|
"TS_TPC",
|
|
|
@@ -2396,7 +2429,7 @@ public abstract class AbstractSimpleTransportTestCase extends ESTestCase {
|
|
|
public void testHandlerIsInvokedOnConnectionClose() throws IOException, InterruptedException {
|
|
|
List<String> executors = new ArrayList<>(ThreadPool.THREAD_POOL_TYPES.keySet());
|
|
|
CollectionUtil.timSort(executors); // makes sure it's reproducible
|
|
|
- TransportService serviceC = buildService("TS_C", CURRENT_VERSION, Settings.EMPTY);
|
|
|
+ TransportService serviceC = buildService("TS_C", version0, transportVersion0, Settings.EMPTY);
|
|
|
serviceC.registerRequestHandler("internal:action", ThreadPool.Names.SAME, TestRequest::new, (request, channel, task) -> {
|
|
|
// do nothing
|
|
|
});
|
|
|
@@ -2458,7 +2491,7 @@ public abstract class AbstractSimpleTransportTestCase extends ESTestCase {
|
|
|
}
|
|
|
|
|
|
public void testConcurrentDisconnectOnNonPublishedConnection() throws IOException, InterruptedException {
|
|
|
- MockTransportService serviceC = buildService("TS_C", version0, Settings.EMPTY);
|
|
|
+ MockTransportService serviceC = buildService("TS_C", version0, transportVersion0, Settings.EMPTY);
|
|
|
CountDownLatch receivedLatch = new CountDownLatch(1);
|
|
|
CountDownLatch sendResponseLatch = new CountDownLatch(1);
|
|
|
serviceC.registerRequestHandler("internal:action", ThreadPool.Names.SAME, TestRequest::new, (request, channel, task) -> {
|
|
|
@@ -2520,7 +2553,7 @@ public abstract class AbstractSimpleTransportTestCase extends ESTestCase {
|
|
|
}
|
|
|
|
|
|
public void testTransportStats() throws Exception {
|
|
|
- MockTransportService serviceC = buildService("TS_C", version0, Settings.EMPTY);
|
|
|
+ MockTransportService serviceC = buildService("TS_C", version0, transportVersion0, Settings.EMPTY);
|
|
|
CountDownLatch receivedLatch = new CountDownLatch(1);
|
|
|
CountDownLatch sendResponseLatch = new CountDownLatch(1);
|
|
|
serviceB.registerRequestHandler("internal:action", ThreadPool.Names.SAME, TestRequest::new, (request, channel, task) -> {
|
|
|
@@ -2625,7 +2658,7 @@ public abstract class AbstractSimpleTransportTestCase extends ESTestCase {
|
|
|
}
|
|
|
|
|
|
public void testTransportStatsWithException() throws Exception {
|
|
|
- MockTransportService serviceC = buildService("TS_C", version0, Settings.EMPTY);
|
|
|
+ MockTransportService serviceC = buildService("TS_C", version0, transportVersion0, Settings.EMPTY);
|
|
|
CountDownLatch receivedLatch = new CountDownLatch(1);
|
|
|
CountDownLatch sendResponseLatch = new CountDownLatch(1);
|
|
|
Exception ex = new RuntimeException("boom");
|
|
|
@@ -2711,7 +2744,7 @@ public abstract class AbstractSimpleTransportTestCase extends ESTestCase {
|
|
|
TransportException exception = receivedException.get();
|
|
|
assertNotNull(exception);
|
|
|
BytesStreamOutput streamOutput = new BytesStreamOutput();
|
|
|
- streamOutput.setTransportVersion(version0.transportVersion);
|
|
|
+ streamOutput.setTransportVersion(transportVersion0);
|
|
|
exception.writeTo(streamOutput);
|
|
|
String failedMessage = "Unexpected read bytes size. The transport exception that was received=" + exception;
|
|
|
// 57 bytes are the non-exception message bytes that have been received. It should include the initial
|
|
|
@@ -2735,6 +2768,7 @@ public abstract class AbstractSimpleTransportTestCase extends ESTestCase {
|
|
|
MockTransportService serviceC = buildService(
|
|
|
"TS_C",
|
|
|
version0,
|
|
|
+ transportVersion0,
|
|
|
Settings.builder()
|
|
|
.put("transport.profiles.default.bind_host", "_local:ipv4_")
|
|
|
.put("transport.profiles.some_profile.port", "8900-9000")
|
|
|
@@ -2930,14 +2964,14 @@ public abstract class AbstractSimpleTransportTestCase extends ESTestCase {
|
|
|
.build();
|
|
|
BindTransportException bindTransportException = expectThrows(
|
|
|
BindTransportException.class,
|
|
|
- () -> buildService("test", Version.CURRENT, settings)
|
|
|
+ () -> buildService("test", Version.CURRENT, TransportVersion.CURRENT, settings)
|
|
|
);
|
|
|
InetSocketAddress inetSocketAddress = serviceA.boundAddress().publishAddress().address();
|
|
|
assertEquals("Failed to bind to " + NetworkAddress.format(inetSocketAddress), bindTransportException.getMessage());
|
|
|
}
|
|
|
|
|
|
public void testChannelCloseWhileConnecting() {
|
|
|
- try (MockTransportService service = buildService("TS_C", version0, Settings.EMPTY)) {
|
|
|
+ try (MockTransportService service = buildService("TS_C", version0, transportVersion0, Settings.EMPTY)) {
|
|
|
AtomicBoolean connectionClosedListenerCalled = new AtomicBoolean(false);
|
|
|
service.addConnectionListener(new TransportConnectionListener() {
|
|
|
@Override
|
|
|
@@ -3063,7 +3097,9 @@ public abstract class AbstractSimpleTransportTestCase extends ESTestCase {
|
|
|
};
|
|
|
}
|
|
|
};
|
|
|
- try (MockTransportService serviceC = buildService("TS_C", CURRENT_VERSION, null, Settings.EMPTY, true, true, interceptor)) {
|
|
|
+ try (
|
|
|
+ MockTransportService serviceC = buildService("TS_C", version0, transportVersion0, null, Settings.EMPTY, true, true, interceptor)
|
|
|
+ ) {
|
|
|
final CountDownLatch latch = new CountDownLatch(1);
|
|
|
serviceC.connectToNode(
|
|
|
serviceA.getLocalDiscoNode(),
|