Browse Source

Reconnect remote cluster when seeds are changed (#43379)

The RemoteClusterService should close the current 
RemoteClusterConnection and should build it again if 
the seeds are changed, similarly to what is done when 
the ping interval or the compression settings are changed.

Closes #37799
Tanguy Leroux 6 years ago
parent
commit
5a41b95306

+ 12 - 1
server/src/main/java/org/elasticsearch/transport/RemoteClusterService.java

@@ -183,7 +183,8 @@ public final class RemoteClusterService extends RemoteClusterAware implements Cl
                     remote = new RemoteClusterConnection(settings, clusterAlias, seedList, transportService, numRemoteConnections,
                         getNodePredicate(settings), proxyAddress, connectionProfile);
                     remoteClusters.put(clusterAlias, remote);
-                } else if (connectionProfileChanged(remote.getConnectionManager().getConnectionProfile(), connectionProfile)) {
+                } else if (connectionProfileChanged(remote.getConnectionManager().getConnectionProfile(), connectionProfile)
+                        || seedsChanged(remote.getSeedNodes(), seedList)) {
                     // New ConnectionProfile. Must tear down existing connection
                     try {
                         IOUtils.close(remote);
@@ -421,6 +422,16 @@ public final class RemoteClusterService extends RemoteClusterAware implements Cl
             || Objects.equals(oldProfile.getPingInterval(), newProfile.getPingInterval()) == false;
     }
 
+    private boolean seedsChanged(final List<Tuple<String, Supplier<DiscoveryNode>>> oldSeedNodes,
+                                 final List<Tuple<String, Supplier<DiscoveryNode>>> newSeedNodes) {
+        if (oldSeedNodes.size() != newSeedNodes.size()) {
+            return true;
+        }
+        Set<String> oldSeeds = oldSeedNodes.stream().map(Tuple::v1).collect(Collectors.toSet());
+        Set<String> newSeeds = newSeedNodes.stream().map(Tuple::v1).collect(Collectors.toSet());
+        return oldSeeds.equals(newSeeds) == false;
+    }
+
     /**
      * Collects all nodes of the given clusters and returns / passes a (clusterAlias, nodeId) to {@link DiscoveryNode}
      * function on success.

+ 76 - 0
server/src/test/java/org/elasticsearch/transport/RemoteClusterServiceTests.java

@@ -41,6 +41,7 @@ import org.elasticsearch.threadpool.ThreadPool;
 import java.io.IOException;
 import java.net.InetAddress;
 import java.net.InetSocketAddress;
+import java.util.ArrayList;
 import java.util.Arrays;
 import java.util.Collections;
 import java.util.HashMap;
@@ -840,6 +841,81 @@ public class RemoteClusterServiceTests extends ESTestCase {
         }
     }
 
+    public void testReconnectWhenSeedsNodesAreUpdated() throws Exception {
+        List<DiscoveryNode> knownNodes = new CopyOnWriteArrayList<>();
+        try (MockTransportService cluster_node_0 = startTransport("cluster_node_0", knownNodes, Version.CURRENT);
+             MockTransportService cluster_node_1 = startTransport("cluster_node_1", knownNodes, Version.CURRENT)) {
+
+            final DiscoveryNode node0 = cluster_node_0.getLocalDiscoNode();
+            final DiscoveryNode node1 = cluster_node_1.getLocalDiscoNode();
+            knownNodes.add(node0);
+            knownNodes.add(node1);
+            Collections.shuffle(knownNodes, random());
+
+            try (MockTransportService transportService =
+                     MockTransportService.createNewService(Settings.EMPTY, Version.CURRENT, threadPool, null)) {
+                transportService.start();
+                transportService.acceptIncomingRequests();
+
+                final Settings.Builder builder = Settings.builder();
+                builder.putList("cluster.remote.cluster_test.seeds", Collections.singletonList(node0.getAddress().toString()));
+                try (RemoteClusterService service = new RemoteClusterService(builder.build(), transportService)) {
+                    assertFalse(service.isCrossClusterSearchEnabled());
+                    service.initializeRemoteClusters();
+                    assertTrue(service.isCrossClusterSearchEnabled());
+
+                    final RemoteClusterConnection firstRemoteClusterConnection = service.getRemoteClusterConnection("cluster_test");
+                    assertTrue(firstRemoteClusterConnection.isNodeConnected(node0));
+                    assertTrue(firstRemoteClusterConnection.isNodeConnected(node1));
+                    assertEquals(2, firstRemoteClusterConnection.getNumNodesConnected());
+                    assertFalse(firstRemoteClusterConnection.isClosed());
+
+                    final CountDownLatch firstLatch = new CountDownLatch(1);
+                    service.updateRemoteCluster(
+                        "cluster_test",
+                        Collections.singletonList(node0.getAddress().toString()), null,
+                        genericProfile("cluster_test"), connectionListener(firstLatch));
+                    firstLatch.await();
+
+                    assertTrue(service.isCrossClusterSearchEnabled());
+                    assertTrue(firstRemoteClusterConnection.isNodeConnected(node0));
+                    assertTrue(firstRemoteClusterConnection.isNodeConnected(node1));
+                    assertEquals(2, firstRemoteClusterConnection.getNumNodesConnected());
+                    assertFalse(firstRemoteClusterConnection.isClosed());
+                    assertSame(firstRemoteClusterConnection, service.getRemoteClusterConnection("cluster_test"));
+
+                    final List<String> newSeeds = new ArrayList<>();
+                    newSeeds.add(node1.getAddress().toString());
+                    if (randomBoolean()) {
+                        newSeeds.add(node0.getAddress().toString());
+                        Collections.shuffle(newSeeds, random());
+                    }
+
+                    final CountDownLatch secondLatch = new CountDownLatch(1);
+                    service.updateRemoteCluster(
+                        "cluster_test",
+                        newSeeds, null,
+                        genericProfile("cluster_test"), connectionListener(secondLatch));
+                    secondLatch.await();
+
+                    assertTrue(service.isCrossClusterSearchEnabled());
+                    assertBusy(() -> {
+                        assertFalse(firstRemoteClusterConnection.isNodeConnected(node0));
+                        assertFalse(firstRemoteClusterConnection.isNodeConnected(node1));
+                        assertEquals(0, firstRemoteClusterConnection.getNumNodesConnected());
+                        assertTrue(firstRemoteClusterConnection.isClosed());
+                    });
+
+                    final RemoteClusterConnection secondRemoteClusterConnection = service.getRemoteClusterConnection("cluster_test");
+                    assertTrue(secondRemoteClusterConnection.isNodeConnected(node0));
+                    assertTrue(secondRemoteClusterConnection.isNodeConnected(node1));
+                    assertEquals(2, secondRemoteClusterConnection.getNumNodesConnected());
+                    assertFalse(secondRemoteClusterConnection.isClosed());
+                }
+            }
+        }
+    }
+
     public void testRemoteClusterWithProxy() throws Exception {
         List<DiscoveryNode> knownNodes = new CopyOnWriteArrayList<>();
         try (MockTransportService cluster_1_node0 = startTransport("cluster_1_node0", knownNodes, Version.CURRENT);