|
@@ -42,6 +42,7 @@ import org.elasticsearch.cluster.node.DiscoveryNode;
|
|
|
import org.elasticsearch.cluster.node.DiscoveryNodes;
|
|
|
import org.elasticsearch.common.Strings;
|
|
|
import org.elasticsearch.common.SuppressForbidden;
|
|
|
+import org.elasticsearch.common.bytes.BytesReference;
|
|
|
import org.elasticsearch.common.io.stream.BytesStreamOutput;
|
|
|
import org.elasticsearch.common.io.stream.StreamInput;
|
|
|
import org.elasticsearch.common.settings.Settings;
|
|
@@ -717,22 +718,6 @@ public class RemoteClusterConnectionTests extends ESTestCase {
|
|
|
}
|
|
|
}
|
|
|
|
|
|
- private static void installNodeStatsHandler(TransportService service, DiscoveryNode...nodes) {
|
|
|
- service.registerRequestHandler(NodesInfoAction.NAME, NodesInfoRequest::new, ThreadPool.Names.SAME, false, false,
|
|
|
- (request, channel) -> {
|
|
|
- List<NodeInfo> nodeInfos = new ArrayList<>();
|
|
|
- int port = 80;
|
|
|
- for (DiscoveryNode node : nodes) {
|
|
|
- HttpInfo http = new HttpInfo(new BoundTransportAddress(new TransportAddress[]{node.getAddress()},
|
|
|
- new TransportAddress(node.getAddress().address().getAddress(), port++)), 100);
|
|
|
- nodeInfos.add(new NodeInfo(node.getVersion(), Build.CURRENT, node, null, null, null, null, null, null, http, null,
|
|
|
- null, null));
|
|
|
- }
|
|
|
- channel.sendResponse(new NodesInfoResponse(ClusterName.DEFAULT, nodeInfos, Collections.emptyList()));
|
|
|
- });
|
|
|
-
|
|
|
- }
|
|
|
-
|
|
|
public void testGetConnectionInfo() throws Exception {
|
|
|
List<DiscoveryNode> knownNodes = new CopyOnWriteArrayList<>();
|
|
|
try (MockTransportService transport1 = startTransport("seed_node", knownNodes, Version.CURRENT);
|
|
@@ -753,34 +738,24 @@ public class RemoteClusterConnectionTests extends ESTestCase {
|
|
|
service.acceptIncomingRequests();
|
|
|
int maxNumConnections = randomIntBetween(1, 5);
|
|
|
try (RemoteClusterConnection connection = new RemoteClusterConnection(Settings.EMPTY, "test-cluster",
|
|
|
- seedNodes, service, maxNumConnections, n -> true)) {
|
|
|
+ seedNodes, service, maxNumConnections, n -> true)) {
|
|
|
// test no nodes connected
|
|
|
- RemoteConnectionInfo remoteConnectionInfo = assertSerialization(getRemoteConnectionInfo(connection));
|
|
|
+ RemoteConnectionInfo remoteConnectionInfo = assertSerialization(connection.getConnectionInfo());
|
|
|
assertNotNull(remoteConnectionInfo);
|
|
|
assertEquals(0, remoteConnectionInfo.numNodesConnected);
|
|
|
- assertEquals(0, remoteConnectionInfo.seedNodes.size());
|
|
|
- assertEquals(0, remoteConnectionInfo.httpAddresses.size());
|
|
|
+ assertEquals(3, remoteConnectionInfo.seedNodes.size());
|
|
|
assertEquals(maxNumConnections, remoteConnectionInfo.connectionsPerCluster);
|
|
|
assertEquals("test-cluster", remoteConnectionInfo.clusterAlias);
|
|
|
- updateSeedNodes(connection, seedNodes);
|
|
|
- expectThrows(RemoteTransportException.class, () -> getRemoteConnectionInfo(connection));
|
|
|
-
|
|
|
- for (MockTransportService s : Arrays.asList(transport1, transport2, transport3)) {
|
|
|
- installNodeStatsHandler(s, node1, node2, node3);
|
|
|
- }
|
|
|
|
|
|
- remoteConnectionInfo = getRemoteConnectionInfo(connection);
|
|
|
- remoteConnectionInfo = assertSerialization(remoteConnectionInfo);
|
|
|
+ // Connect some nodes
|
|
|
+ updateSeedNodes(connection, seedNodes);
|
|
|
+ remoteConnectionInfo = assertSerialization(connection.getConnectionInfo());
|
|
|
assertNotNull(remoteConnectionInfo);
|
|
|
assertEquals(connection.getNumNodesConnected(), remoteConnectionInfo.numNodesConnected);
|
|
|
assertEquals(Math.min(3, maxNumConnections), connection.getNumNodesConnected());
|
|
|
assertEquals(3, remoteConnectionInfo.seedNodes.size());
|
|
|
- assertEquals(remoteConnectionInfo.httpAddresses.size(), Math.min(3, maxNumConnections));
|
|
|
assertEquals(maxNumConnections, remoteConnectionInfo.connectionsPerCluster);
|
|
|
assertEquals("test-cluster", remoteConnectionInfo.clusterAlias);
|
|
|
- for (TransportAddress address : remoteConnectionInfo.httpAddresses) {
|
|
|
- assertTrue("port range mismatch: " + address.getPort(), address.getPort() >= 80 && address.getPort() <= 90);
|
|
|
- }
|
|
|
}
|
|
|
}
|
|
|
}
|
|
@@ -789,48 +764,41 @@ public class RemoteClusterConnectionTests extends ESTestCase {
|
|
|
public void testRemoteConnectionInfo() throws IOException {
|
|
|
RemoteConnectionInfo stats = new RemoteConnectionInfo("test_cluster",
|
|
|
Arrays.asList(new TransportAddress(TransportAddress.META_ADDRESS, 1)),
|
|
|
- Arrays.asList(new TransportAddress(TransportAddress.META_ADDRESS, 80)),
|
|
|
4, 3, TimeValue.timeValueMinutes(30), false);
|
|
|
assertSerialization(stats);
|
|
|
|
|
|
RemoteConnectionInfo stats1 = new RemoteConnectionInfo("test_cluster",
|
|
|
Arrays.asList(new TransportAddress(TransportAddress.META_ADDRESS, 1)),
|
|
|
- Arrays.asList(new TransportAddress(TransportAddress.META_ADDRESS, 80)),
|
|
|
4, 4, TimeValue.timeValueMinutes(30), true);
|
|
|
assertSerialization(stats1);
|
|
|
assertNotEquals(stats, stats1);
|
|
|
|
|
|
stats1 = new RemoteConnectionInfo("test_cluster_1",
|
|
|
Arrays.asList(new TransportAddress(TransportAddress.META_ADDRESS, 1)),
|
|
|
- Arrays.asList(new TransportAddress(TransportAddress.META_ADDRESS, 80)),
|
|
|
4, 3, TimeValue.timeValueMinutes(30), false);
|
|
|
assertSerialization(stats1);
|
|
|
assertNotEquals(stats, stats1);
|
|
|
|
|
|
stats1 = new RemoteConnectionInfo("test_cluster",
|
|
|
Arrays.asList(new TransportAddress(TransportAddress.META_ADDRESS, 15)),
|
|
|
- Arrays.asList(new TransportAddress(TransportAddress.META_ADDRESS, 80)),
|
|
|
4, 3, TimeValue.timeValueMinutes(30), false);
|
|
|
assertSerialization(stats1);
|
|
|
assertNotEquals(stats, stats1);
|
|
|
|
|
|
stats1 = new RemoteConnectionInfo("test_cluster",
|
|
|
Arrays.asList(new TransportAddress(TransportAddress.META_ADDRESS, 1)),
|
|
|
- Arrays.asList(new TransportAddress(TransportAddress.META_ADDRESS, 87)),
|
|
|
4, 3, TimeValue.timeValueMinutes(30), true);
|
|
|
assertSerialization(stats1);
|
|
|
assertNotEquals(stats, stats1);
|
|
|
|
|
|
stats1 = new RemoteConnectionInfo("test_cluster",
|
|
|
Arrays.asList(new TransportAddress(TransportAddress.META_ADDRESS, 1)),
|
|
|
- Arrays.asList(new TransportAddress(TransportAddress.META_ADDRESS, 80)),
|
|
|
4, 3, TimeValue.timeValueMinutes(325), true);
|
|
|
assertSerialization(stats1);
|
|
|
assertNotEquals(stats, stats1);
|
|
|
|
|
|
stats1 = new RemoteConnectionInfo("test_cluster",
|
|
|
Arrays.asList(new TransportAddress(TransportAddress.META_ADDRESS, 1)),
|
|
|
- Arrays.asList(new TransportAddress(TransportAddress.META_ADDRESS, 80)),
|
|
|
5, 3, TimeValue.timeValueMinutes(30), false);
|
|
|
assertSerialization(stats1);
|
|
|
assertNotEquals(stats, stats1);
|
|
@@ -850,13 +818,14 @@ public class RemoteClusterConnectionTests extends ESTestCase {
|
|
|
}
|
|
|
|
|
|
public void testRemoteConnectionInfoBwComp() throws IOException {
|
|
|
- final Version version = VersionUtils.randomVersionBetween(random(), Version.V_5_6_5, Version.V_6_0_0);
|
|
|
+ final Version version = VersionUtils.randomVersionBetween(random(),
|
|
|
+ Version.V_6_1_0, VersionUtils.getPreviousVersion(Version.V_7_0_0_alpha1));
|
|
|
RemoteConnectionInfo expected = new RemoteConnectionInfo("test_cluster",
|
|
|
Collections.singletonList(new TransportAddress(TransportAddress.META_ADDRESS, 1)),
|
|
|
- Collections.singletonList(new TransportAddress(TransportAddress.META_ADDRESS, 80)),
|
|
|
4, 4, new TimeValue(30, TimeUnit.MINUTES), false);
|
|
|
|
|
|
- String encoded = "AQQAAAAABzAuMC4wLjAAAAABAQQAAAAABzAuMC4wLjAAAABQBDwEBAx0ZXN0X2NsdXN0ZXIAAAAAAAAAAAAAAA==";
|
|
|
+ // This version was created using the serialization code in use from 6.1 but before 7.0
|
|
|
+ String encoded = "AQQAAAAABzAuMC4wLjAAAAABAQQAAAAABzAuMC4wLjAAAABQBDwEBAx0ZXN0X2NsdXN0ZXIA";
|
|
|
final byte[] data = Base64.getDecoder().decode(encoded);
|
|
|
|
|
|
try (StreamInput in = StreamInput.wrap(data)) {
|
|
@@ -879,55 +848,29 @@ public class RemoteClusterConnectionTests extends ESTestCase {
|
|
|
public void testRenderConnectionInfoXContent() throws IOException {
|
|
|
RemoteConnectionInfo stats = new RemoteConnectionInfo("test_cluster",
|
|
|
Arrays.asList(new TransportAddress(TransportAddress.META_ADDRESS,1)),
|
|
|
- Arrays.asList(new TransportAddress(TransportAddress.META_ADDRESS,80)),
|
|
|
4, 3, TimeValue.timeValueMinutes(30), true);
|
|
|
stats = assertSerialization(stats);
|
|
|
XContentBuilder builder = XContentFactory.jsonBuilder();
|
|
|
builder.startObject();
|
|
|
stats.toXContent(builder, null);
|
|
|
builder.endObject();
|
|
|
- assertEquals("{\"test_cluster\":{\"seeds\":[\"0.0.0.0:1\"],\"http_addresses\":[\"0.0.0.0:80\"],\"connected\":true," +
|
|
|
+ assertEquals("{\"test_cluster\":{\"seeds\":[\"0.0.0.0:1\"],\"connected\":true," +
|
|
|
"\"num_nodes_connected\":3,\"max_connections_per_cluster\":4,\"initial_connect_timeout\":\"30m\"," +
|
|
|
"\"skip_unavailable\":true}}", Strings.toString(builder));
|
|
|
|
|
|
stats = new RemoteConnectionInfo("some_other_cluster",
|
|
|
Arrays.asList(new TransportAddress(TransportAddress.META_ADDRESS,1), new TransportAddress(TransportAddress.META_ADDRESS,2)),
|
|
|
- Arrays.asList(new TransportAddress(TransportAddress.META_ADDRESS,80), new TransportAddress(TransportAddress.META_ADDRESS,81)),
|
|
|
2, 0, TimeValue.timeValueSeconds(30), false);
|
|
|
stats = assertSerialization(stats);
|
|
|
builder = XContentFactory.jsonBuilder();
|
|
|
builder.startObject();
|
|
|
stats.toXContent(builder, null);
|
|
|
builder.endObject();
|
|
|
- assertEquals("{\"some_other_cluster\":{\"seeds\":[\"0.0.0.0:1\",\"0.0.0.0:2\"],\"http_addresses\":[\"0.0.0.0:80\",\"0.0.0.0:81\"],"
|
|
|
+ assertEquals("{\"some_other_cluster\":{\"seeds\":[\"0.0.0.0:1\",\"0.0.0.0:2\"],"
|
|
|
+ "\"connected\":false,\"num_nodes_connected\":0,\"max_connections_per_cluster\":2,\"initial_connect_timeout\":\"30s\"," +
|
|
|
"\"skip_unavailable\":false}}", Strings.toString(builder));
|
|
|
}
|
|
|
|
|
|
- private RemoteConnectionInfo getRemoteConnectionInfo(RemoteClusterConnection connection) throws Exception {
|
|
|
- AtomicReference<RemoteConnectionInfo> statsRef = new AtomicReference<>();
|
|
|
- AtomicReference<Exception> exceptionRef = new AtomicReference<>();
|
|
|
- CountDownLatch latch = new CountDownLatch(1);
|
|
|
- connection.getConnectionInfo(new ActionListener<RemoteConnectionInfo>() {
|
|
|
- @Override
|
|
|
- public void onResponse(RemoteConnectionInfo remoteConnectionInfo) {
|
|
|
- statsRef.set(remoteConnectionInfo);
|
|
|
- latch.countDown();
|
|
|
- }
|
|
|
-
|
|
|
- @Override
|
|
|
- public void onFailure(Exception e) {
|
|
|
- exceptionRef.set(e);
|
|
|
- latch.countDown();
|
|
|
- }
|
|
|
- });
|
|
|
- latch.await();
|
|
|
- if (exceptionRef.get() != null) {
|
|
|
- throw exceptionRef.get();
|
|
|
- }
|
|
|
- return statsRef.get();
|
|
|
- }
|
|
|
-
|
|
|
public void testEnsureConnected() throws IOException, InterruptedException {
|
|
|
List<DiscoveryNode> knownNodes = new CopyOnWriteArrayList<>();
|
|
|
try (MockTransportService seedTransport = startTransport("seed_node", knownNodes, Version.CURRENT);
|