Browse Source

Allow to enable pings for specific remote clusters (#34753)

When we connect to remote clusters, there may be a few more routers/firewalls in-between compared to when we connect to nodes in the same cluster. We've experienced cases where firewalls drop connections completely and keep-alives seem not to be enough, or they are not properly configured. With this commit we allow to enable application-level pings specifically from CCS nodes to the selected remote nodes through the new setting `cluster.remote.${clusterAlias}.transport.ping_schedule`.  The new setting is similar `transport.ping_schedule` but it does not affect intra-cluster communication, pings are only sent to specific remote cluster when specifically enabled, as they are disabled by default.

Relates to #34405
Luca Cavanna 7 years ago
parent
commit
ef5181c678

+ 9 - 0
docs/reference/modules/remote-clusters.asciidoc

@@ -152,6 +152,15 @@ PUT _cluster/settings
   by default, but they can selectively be made optional by setting this setting
   to `true`.
 
+`cluster.remote.${cluster_alias}.transport.ping_schedule`::
+
+  Sets the time interval between regular application-level ping messages that
+  are sent to ensure that transport connections to nodes belonging to remote
+  clusters are kept alive. If set to `-1`, application-level ping messages to
+  this remote cluster are not sent. If unset, application-level ping messages
+  are sent according to the global `transport.ping_schedule` setting, which
+  defaults to ``-1` meaning that pings are not sent.
+
 [float]
 [[retrieve-remote-clusters-info]]
 === Retrieving remote clusters info

+ 3 - 3
docs/reference/modules/transport.asciidoc

@@ -46,9 +46,9 @@ between all nodes. Defaults to `false`.
 
 |`transport.ping_schedule` | Schedule a regular application-level ping message
 to ensure that transport connections between nodes are kept alive. Defaults to
-`5s` in the transport client and `-1` (disabled) elsewhere. It is preferable to
-correctly configure TCP keep-alives instead of using this feature, because TCP
-keep-alives apply to all kinds of long-lived connection and not just to
+`5s` in the transport client and `-1` (disabled) elsewhere. It is preferable
+to correctly configure TCP keep-alives instead of using this feature, because
+TCP keep-alives apply to all kinds of long-lived connections and not just to
 transport connections.
 
 |=======================================================================

+ 1 - 0
server/src/main/java/org/elasticsearch/common/settings/ClusterSettings.java

@@ -293,6 +293,7 @@ public final class ClusterSettings extends AbstractScopedSettings {
                     RemoteClusterService.SEARCH_REMOTE_NODE_ATTRIBUTE,
                     RemoteClusterService.ENABLE_REMOTE_CLUSTERS,
                     RemoteClusterService.SEARCH_ENABLE_REMOTE_CLUSTERS,
+                    RemoteClusterService.REMOTE_CLUSTER_PING_SCHEDULE,
                     TransportService.TRACE_LOG_EXCLUDE_SETTING,
                     TransportService.TRACE_LOG_INCLUDE_SETTING,
                     TransportCloseIndexAction.CLUSTER_INDICES_CLOSE_ENABLE_SETTING,

+ 9 - 6
server/src/main/java/org/elasticsearch/transport/ConnectionManager.java

@@ -18,8 +18,8 @@
  */
 package org.elasticsearch.transport;
 
-import org.apache.logging.log4j.Logger;
 import org.apache.logging.log4j.LogManager;
+import org.apache.logging.log4j.Logger;
 import org.apache.logging.log4j.message.ParameterizedMessage;
 import org.elasticsearch.action.ActionListener;
 import org.elasticsearch.cluster.node.DiscoveryNode;
@@ -67,16 +67,15 @@ public class ConnectionManager implements Closeable {
     private final DelegatingNodeConnectionListener connectionListener = new DelegatingNodeConnectionListener();
 
     public ConnectionManager(Settings settings, Transport transport, ThreadPool threadPool) {
-        this(settings, transport, threadPool, ConnectionProfile.buildDefaultConnectionProfile(settings));
+        this(settings, transport, threadPool, TcpTransport.PING_SCHEDULE.get(settings));
     }
 
-    public ConnectionManager(Settings settings, Transport transport, ThreadPool threadPool, ConnectionProfile defaultProfile) {
+    public ConnectionManager(Settings settings, Transport transport, ThreadPool threadPool, TimeValue pingSchedule) {
         this.transport = transport;
         this.threadPool = threadPool;
-        this.pingSchedule = TcpTransport.PING_SCHEDULE.get(settings);
-        this.defaultProfile = defaultProfile;
+        this.pingSchedule = pingSchedule;
+        this.defaultProfile = ConnectionProfile.buildDefaultConnectionProfile(settings);
         this.lifecycle.moveToStarted();
-
         if (pingSchedule.millis() > 0) {
             threadPool.schedule(pingSchedule, ThreadPool.Names.GENERIC, new ScheduledPing());
         }
@@ -252,6 +251,10 @@ public class ConnectionManager implements Closeable {
         }
     }
 
+    TimeValue getPingSchedule() {
+        return pingSchedule;
+    }
+
     private class ScheduledPing extends AbstractLifecycleRunnable {
 
         private ScheduledPing() {

+ 4 - 9
server/src/main/java/org/elasticsearch/transport/RemoteClusterConnection.java

@@ -18,8 +18,6 @@
  */
 package org.elasticsearch.transport;
 
-import java.net.InetSocketAddress;
-import java.util.function.Supplier;
 import org.apache.logging.log4j.message.ParameterizedMessage;
 import org.apache.lucene.store.AlreadyClosedException;
 import org.apache.lucene.util.SetOnce;
@@ -48,6 +46,7 @@ import org.elasticsearch.threadpool.ThreadPool;
 
 import java.io.Closeable;
 import java.io.IOException;
+import java.net.InetSocketAddress;
 import java.util.ArrayList;
 import java.util.Collection;
 import java.util.Collections;
@@ -64,6 +63,7 @@ import java.util.concurrent.atomic.AtomicBoolean;
 import java.util.function.Consumer;
 import java.util.function.Function;
 import java.util.function.Predicate;
+import java.util.function.Supplier;
 import java.util.stream.Collectors;
 
 /**
@@ -105,13 +105,8 @@ final class RemoteClusterConnection extends AbstractComponent implements Transpo
      * @param connectionManager the connection manager to use for this remote connection
      * @param maxNumRemoteConnections the maximum number of connections to the remote cluster
      * @param nodePredicate a predicate to filter eligible remote nodes to connect to
+     * @param proxyAddress the proxy address
      */
-    RemoteClusterConnection(Settings settings, String clusterAlias, List<Supplier<DiscoveryNode>> seedNodes,
-            TransportService transportService, ConnectionManager connectionManager, int maxNumRemoteConnections,
-                            Predicate<DiscoveryNode> nodePredicate) {
-        this(settings, clusterAlias, seedNodes, transportService, connectionManager, maxNumRemoteConnections, nodePredicate, null);
-    }
-
     RemoteClusterConnection(Settings settings, String clusterAlias, List<Supplier<DiscoveryNode>> seedNodes,
             TransportService transportService, ConnectionManager connectionManager, int maxNumRemoteConnections,
             Predicate<DiscoveryNode> nodePredicate, String proxyAddress) {
@@ -151,7 +146,7 @@ final class RemoteClusterConnection extends AbstractComponent implements Transpo
             InetSocketAddress proxyInetAddress = RemoteClusterAware.parseSeedAddress(proxyAddress);
             return new DiscoveryNode(node.getName(), node.getId(), node.getEphemeralId(), node.getHostName(), node
                 .getHostAddress(), new TransportAddress(proxyInetAddress), node.getAttributes(), node.getRoles(), node.getVersion());
-    }
+        }
     }
 
     /**

+ 22 - 17
server/src/main/java/org/elasticsearch/transport/RemoteClusterService.java

@@ -60,6 +60,7 @@ import java.util.stream.Collectors;
 import java.util.stream.Stream;
 
 import static org.elasticsearch.common.settings.Setting.boolSetting;
+import static org.elasticsearch.common.settings.Setting.timeSetting;
 
 /**
  * Basic service for accessing remote clusters via gateway nodes
@@ -166,6 +167,12 @@ public final class RemoteClusterService extends RemoteClusterAware implements Cl
                             Setting.Property.NodeScope),
                     REMOTE_CLUSTERS_SEEDS);
 
+    public static final Setting.AffixSetting<TimeValue> REMOTE_CLUSTER_PING_SCHEDULE = Setting.affixKeySetting(
+            "cluster.remote.",
+            "transport.ping_schedule",
+            key -> timeSetting(key, TcpTransport.PING_SCHEDULE, Setting.Property.NodeScope),
+            REMOTE_CLUSTERS_SEEDS);
+
     private static final Predicate<DiscoveryNode> DEFAULT_NODE_PREDICATE = (node) -> Version.CURRENT.isCompatible(node.getVersion())
             && (node.isMasterNode() == false  || node.isDataNode() || node.isIngestNode());
 
@@ -211,10 +218,13 @@ public final class RemoteClusterService extends RemoteClusterAware implements Cl
                 }
 
                 if (remote == null) { // this is a new cluster we have to add a new representation
-                    remote = new RemoteClusterConnection(settings, entry.getKey(), seedList, transportService,
-                            new ConnectionManager(settings, transportService.transport, transportService.threadPool), numRemoteConnections,
-                            getNodePredicate(settings), proxyAddress);
-                    remoteClusters.put(entry.getKey(), remote);
+                    String clusterAlias = entry.getKey();
+                    TimeValue pingSchedule = REMOTE_CLUSTER_PING_SCHEDULE.getConcreteSettingForNamespace(clusterAlias).get(settings);
+                    ConnectionManager connectionManager = new ConnectionManager(settings, transportService.transport,
+                        transportService.threadPool, pingSchedule);
+                    remote = new RemoteClusterConnection(settings, clusterAlias, seedList, transportService, connectionManager,
+                        numRemoteConnections, getNodePredicate(settings), proxyAddress);
+                    remoteClusters.put(clusterAlias, remote);
                 }
 
                 // now update the seed nodes no matter if it's new or already existing
@@ -340,31 +350,27 @@ public final class RemoteClusterService extends RemoteClusterAware implements Cl
      * @throws IllegalArgumentException if the remote cluster is unknown
      */
     public Transport.Connection getConnection(DiscoveryNode node, String cluster) {
-        RemoteClusterConnection connection = remoteClusters.get(cluster);
-        if (connection == null) {
-            throw new IllegalArgumentException("no such remote cluster: " + cluster);
-        }
-        return connection.getConnection(node);
+        return getRemoteClusterConnection(cluster).getConnection(node);
     }
 
     /**
      * Ensures that the given cluster alias is connected. If the cluster is connected this operation
      * will invoke the listener immediately.
      */
-    public void ensureConnected(String clusterAlias, ActionListener<Void> listener) {
-        RemoteClusterConnection remoteClusterConnection = remoteClusters.get(clusterAlias);
-        if (remoteClusterConnection == null) {
-            throw new IllegalArgumentException("no such remote cluster: " + clusterAlias);
-        }
-        remoteClusterConnection.ensureConnected(listener);
+    void ensureConnected(String clusterAlias, ActionListener<Void> listener) {
+        getRemoteClusterConnection(clusterAlias).ensureConnected(listener);
     }
 
     public Transport.Connection getConnection(String cluster) {
+        return getRemoteClusterConnection(cluster).getConnection();
+    }
+
+    RemoteClusterConnection getRemoteClusterConnection(String cluster) {
         RemoteClusterConnection connection = remoteClusters.get(cluster);
         if (connection == null) {
             throw new IllegalArgumentException("no such remote cluster: " + cluster);
         }
-        return connection.getConnection();
+        return connection;
     }
 
     @Override
@@ -386,7 +392,6 @@ public final class RemoteClusterService extends RemoteClusterAware implements Cl
         }
     }
 
-
     @Override
     protected void updateRemoteCluster(String clusterAlias, List<String> addresses, String proxyAddress) {
         updateRemoteCluster(clusterAlias, addresses, proxyAddress, ActionListener.wrap((x) -> {}, (x) -> {}));

+ 26 - 24
server/src/test/java/org/elasticsearch/transport/RemoteClusterConnectionTests.java

@@ -18,9 +18,6 @@
  */
 package org.elasticsearch.transport;
 
-import java.util.HashMap;
-import java.util.Map;
-import java.util.function.Supplier;
 import org.apache.lucene.store.AlreadyClosedException;
 import org.elasticsearch.Version;
 import org.elasticsearch.action.ActionListener;
@@ -68,7 +65,9 @@ import java.util.ArrayList;
 import java.util.Arrays;
 import java.util.Base64;
 import java.util.Collections;
+import java.util.HashMap;
 import java.util.List;
+import java.util.Map;
 import java.util.concurrent.BrokenBarrierException;
 import java.util.concurrent.CopyOnWriteArrayList;
 import java.util.concurrent.CountDownLatch;
@@ -79,6 +78,7 @@ import java.util.concurrent.atomic.AtomicBoolean;
 import java.util.concurrent.atomic.AtomicInteger;
 import java.util.concurrent.atomic.AtomicReference;
 import java.util.function.Function;
+import java.util.function.Supplier;
 
 import static java.util.Collections.emptyMap;
 import static java.util.Collections.emptySet;
@@ -163,7 +163,7 @@ public class RemoteClusterConnectionTests extends ESTestCase {
                 service.start();
                 service.acceptIncomingRequests();
                 try (RemoteClusterConnection connection = new RemoteClusterConnection(Settings.EMPTY, "test-cluster",
-                    Arrays.asList(() -> seedNode), service, service.getConnectionManager(), Integer.MAX_VALUE, n -> true)) {
+                    Arrays.asList(() -> seedNode), service, service.getConnectionManager(), Integer.MAX_VALUE, n -> true, null)) {
                     updateSeedNodes(connection, Arrays.asList(() -> seedNode));
                     assertTrue(service.nodeConnected(seedNode));
                     assertTrue(service.nodeConnected(discoverableNode));
@@ -204,7 +204,7 @@ public class RemoteClusterConnectionTests extends ESTestCase {
                 service.start();
                 service.acceptIncomingRequests();
                 try (RemoteClusterConnection connection = new RemoteClusterConnection(Settings.EMPTY, "test-cluster",
-                    Arrays.asList(() -> seedNode), service, service.getConnectionManager(), Integer.MAX_VALUE, n -> true)) {
+                    Arrays.asList(() -> seedNode), service, service.getConnectionManager(), Integer.MAX_VALUE, n -> true, null)) {
                     updateSeedNodes(connection, Arrays.asList(() -> seedNode));
                     assertTrue(service.nodeConnected(seedNode));
                     assertTrue(service.nodeConnected(discoverableNode));
@@ -256,7 +256,7 @@ public class RemoteClusterConnectionTests extends ESTestCase {
                 service.start();
                 service.acceptIncomingRequests();
                 try (RemoteClusterConnection connection = new RemoteClusterConnection(Settings.EMPTY, "test-cluster",
-                    Arrays.asList(() -> seedNode), service, service.getConnectionManager(), Integer.MAX_VALUE, n -> true)) {
+                    Arrays.asList(() -> seedNode), service, service.getConnectionManager(), Integer.MAX_VALUE, n -> true, null)) {
                     updateSeedNodes(connection, Arrays.asList(() -> seedNode));
                     assertTrue(service.nodeConnected(seedNode));
                     assertTrue(service.nodeConnected(discoverableNode));
@@ -285,7 +285,7 @@ public class RemoteClusterConnectionTests extends ESTestCase {
                 service.start();
                 service.acceptIncomingRequests();
                 try (RemoteClusterConnection connection = new RemoteClusterConnection(Settings.EMPTY, "test-cluster",
-                    seedNodes, service, service.getConnectionManager(), Integer.MAX_VALUE, n -> true)) {
+                    seedNodes, service, service.getConnectionManager(), Integer.MAX_VALUE, n -> true, null)) {
                     updateSeedNodes(connection, seedNodes);
                     assertTrue(service.nodeConnected(seedNode));
                     assertTrue(service.nodeConnected(discoverableNode));
@@ -312,7 +312,7 @@ public class RemoteClusterConnectionTests extends ESTestCase {
                 service.start();
                 service.acceptIncomingRequests();
                 try (RemoteClusterConnection connection = new RemoteClusterConnection(Settings.EMPTY, "test-cluster",
-                    Arrays.asList(() -> seedNode), service, service.getConnectionManager(), Integer.MAX_VALUE, n -> true)) {
+                    Arrays.asList(() -> seedNode), service, service.getConnectionManager(), Integer.MAX_VALUE, n -> true, null)) {
                     updateSeedNodes(connection, Arrays.asList(() -> seedNode));
                     assertTrue(service.nodeConnected(seedNode));
                     assertTrue(service.nodeConnected(discoverableNode));
@@ -362,7 +362,7 @@ public class RemoteClusterConnectionTests extends ESTestCase {
                 service.acceptIncomingRequests();
                 try (RemoteClusterConnection connection = new RemoteClusterConnection(Settings.EMPTY, "test-cluster",
                     Arrays.asList(() -> seedNode), service, service.getConnectionManager(), Integer.MAX_VALUE,
-                    n -> n.equals(rejectedNode) == false)) {
+                    n -> n.equals(rejectedNode) == false, null)) {
                     updateSeedNodes(connection, Arrays.asList(() -> seedNode));
                     if (rejectedNode.equals(seedNode)) {
                         assertFalse(service.nodeConnected(seedNode));
@@ -422,7 +422,7 @@ public class RemoteClusterConnectionTests extends ESTestCase {
                 service.start();
                 service.acceptIncomingRequests();
                 try (RemoteClusterConnection connection = new RemoteClusterConnection(Settings.EMPTY, "test-cluster",
-                    Arrays.asList(() -> seedNode), service, service.getConnectionManager(), Integer.MAX_VALUE, n -> true)) {
+                    Arrays.asList(() -> seedNode), service, service.getConnectionManager(), Integer.MAX_VALUE, n -> true, null)) {
                     expectThrows(Exception.class, () -> updateSeedNodes(connection, Arrays.asList(() -> seedNode)));
                     assertFalse(service.nodeConnected(seedNode));
                     assertTrue(connection.assertNoRunningConnections());
@@ -485,7 +485,7 @@ public class RemoteClusterConnectionTests extends ESTestCase {
                 service.start();
                 service.acceptIncomingRequests();
                 try (RemoteClusterConnection connection = new RemoteClusterConnection(Settings.EMPTY, "test-cluster",
-                    Arrays.asList(() -> seedNode), service, service.getConnectionManager(), Integer.MAX_VALUE, n -> true)) {
+                    Arrays.asList(() -> seedNode), service, service.getConnectionManager(), Integer.MAX_VALUE, n -> true, null)) {
                     connection.addConnectedNode(seedNode);
                     for (DiscoveryNode node : knownNodes) {
                         final Transport.Connection transportConnection = connection.getConnection(node);
@@ -528,7 +528,7 @@ public class RemoteClusterConnectionTests extends ESTestCase {
                 CountDownLatch listenerCalled = new CountDownLatch(1);
                 AtomicReference<Exception> exceptionReference = new AtomicReference<>();
                 try (RemoteClusterConnection connection = new RemoteClusterConnection(Settings.EMPTY, "test-cluster",
-                    Arrays.asList(() -> seedNode), service, service.getConnectionManager(), Integer.MAX_VALUE, n -> true)) {
+                    Arrays.asList(() -> seedNode), service, service.getConnectionManager(), Integer.MAX_VALUE, n -> true, null)) {
                     ActionListener<Void> listener = ActionListener.wrap(x -> {
                         listenerCalled.countDown();
                         fail("expected exception");
@@ -565,7 +565,7 @@ public class RemoteClusterConnectionTests extends ESTestCase {
                 service.acceptIncomingRequests();
                 List<Supplier<DiscoveryNode>> nodes = Collections.singletonList(() -> seedNode);
                 try (RemoteClusterConnection connection = new RemoteClusterConnection(Settings.EMPTY, "test-cluster",
-                    nodes, service, service.getConnectionManager(), Integer.MAX_VALUE, n -> true)) {
+                    nodes, service, service.getConnectionManager(), Integer.MAX_VALUE, n -> true, null)) {
                     if (randomBoolean()) {
                         updateSeedNodes(connection, nodes);
                     }
@@ -605,7 +605,7 @@ public class RemoteClusterConnectionTests extends ESTestCase {
                 service.acceptIncomingRequests();
                 List<Supplier<DiscoveryNode>> nodes = Collections.singletonList(() -> seedNode);
                 try (RemoteClusterConnection connection = new RemoteClusterConnection(Settings.EMPTY, "test-cluster",
-                    nodes, service, service.getConnectionManager(), Integer.MAX_VALUE, n -> true)) {
+                    nodes, service, service.getConnectionManager(), Integer.MAX_VALUE, n -> true, null)) {
                     SearchRequest request = new SearchRequest("test-index");
                     Thread[] threads = new Thread[10];
                     for (int i = 0; i < threads.length; i++) {
@@ -659,7 +659,8 @@ public class RemoteClusterConnectionTests extends ESTestCase {
                 service.start();
                 service.acceptIncomingRequests();
                 try (RemoteClusterConnection connection = new RemoteClusterConnection(Settings.EMPTY, "test-cluster",
-                    Collections.singletonList(() -> seedNode), service, service.getConnectionManager(), Integer.MAX_VALUE, n -> true)) {
+                    Collections.singletonList(() -> seedNode), service, service.getConnectionManager(), Integer.MAX_VALUE,
+                    n -> true, null)) {
 
                     SearchRequest request = new SearchRequest("test-index");
                     ClusterSearchShardsRequest searchShardsRequest = new ClusterSearchShardsRequest("test-index")
@@ -769,7 +770,7 @@ public class RemoteClusterConnectionTests extends ESTestCase {
                 service.start();
                 service.acceptIncomingRequests();
                 try (RemoteClusterConnection connection = new RemoteClusterConnection(Settings.EMPTY, "test-cluster",
-                    seedNodes, service, service.getConnectionManager(), Integer.MAX_VALUE, n -> true)) {
+                    seedNodes, service, service.getConnectionManager(), Integer.MAX_VALUE, n -> true, null)) {
                     int numThreads = randomIntBetween(4, 10);
                     Thread[] threads = new Thread[numThreads];
                     CyclicBarrier barrier = new CyclicBarrier(numThreads);
@@ -848,7 +849,7 @@ public class RemoteClusterConnectionTests extends ESTestCase {
                 service.start();
                 service.acceptIncomingRequests();
                 try (RemoteClusterConnection connection = new RemoteClusterConnection(Settings.EMPTY, "test-cluster",
-                    seedNodes, service, service.getConnectionManager(), Integer.MAX_VALUE, n -> true)) {
+                    seedNodes, service, service.getConnectionManager(), Integer.MAX_VALUE, n -> true, null)) {
                     int numThreads = randomIntBetween(4, 10);
                     Thread[] threads = new Thread[numThreads];
                     CyclicBarrier barrier = new CyclicBarrier(numThreads + 1);
@@ -937,7 +938,7 @@ public class RemoteClusterConnectionTests extends ESTestCase {
                 service.acceptIncomingRequests();
                 int maxNumConnections = randomIntBetween(1, 5);
                 try (RemoteClusterConnection connection = new RemoteClusterConnection(Settings.EMPTY, "test-cluster",
-                    seedNodes, service, service.connectionManager(), maxNumConnections, n -> true)) {
+                    seedNodes, service, service.connectionManager(), maxNumConnections, n -> true, null)) {
                     // test no nodes connected
                     RemoteConnectionInfo remoteConnectionInfo = assertSerialization(connection.getConnectionInfo());
                     assertNotNull(remoteConnectionInfo);
@@ -1084,7 +1085,7 @@ public class RemoteClusterConnectionTests extends ESTestCase {
                 service.start();
                 service.acceptIncomingRequests();
                 try (RemoteClusterConnection connection = new RemoteClusterConnection(Settings.EMPTY, "test-cluster",
-                    Arrays.asList(() -> seedNode), service, service.getConnectionManager(), Integer.MAX_VALUE, n -> true)) {
+                    Arrays.asList(() -> seedNode), service, service.getConnectionManager(), Integer.MAX_VALUE, n -> true, null)) {
                     assertFalse(service.nodeConnected(seedNode));
                     assertFalse(service.nodeConnected(discoverableNode));
                     assertTrue(connection.assertNoRunningConnections());
@@ -1133,7 +1134,7 @@ public class RemoteClusterConnectionTests extends ESTestCase {
                 service.start();
                 service.acceptIncomingRequests();
                 try (RemoteClusterConnection connection = new RemoteClusterConnection(Settings.EMPTY, "test-cluster",
-                    Arrays.asList(() -> seedNode), service, service.getConnectionManager(), Integer.MAX_VALUE, n -> true)) {
+                    Arrays.asList(() -> seedNode), service, service.getConnectionManager(), Integer.MAX_VALUE, n -> true, null)) {
                     if (randomBoolean()) {
                         updateSeedNodes(connection, Arrays.asList(() -> seedNode));
                     }
@@ -1181,7 +1182,7 @@ public class RemoteClusterConnectionTests extends ESTestCase {
                 service.start();
                 service.acceptIncomingRequests();
                 try (RemoteClusterConnection connection = new RemoteClusterConnection(Settings.EMPTY, "test-cluster",
-                    seedNodes, service, service.getConnectionManager(), Integer.MAX_VALUE, n -> true)) {
+                    seedNodes, service, service.getConnectionManager(), Integer.MAX_VALUE, n -> true, null)) {
                     final int numGetThreads = randomIntBetween(4, 10);
                     final Thread[] getThreads = new Thread[numGetThreads];
                     final int numModifyingThreads = randomIntBetween(4, 10);
@@ -1271,7 +1272,7 @@ public class RemoteClusterConnectionTests extends ESTestCase {
                 service.start();
                 service.acceptIncomingRequests();
                 try (RemoteClusterConnection connection = new RemoteClusterConnection(Settings.EMPTY, "test-cluster",
-                    Arrays.asList( () -> seedNode), service, service.getConnectionManager(), Integer.MAX_VALUE, n -> true)) {
+                    Arrays.asList( () -> seedNode), service, service.getConnectionManager(), Integer.MAX_VALUE, n -> true, null)) {
                     updateSeedNodes(connection, Arrays.asList(() -> seedNode));
                     assertTrue(service.nodeConnected(seedNode));
                     assertTrue(service.nodeConnected(discoverableNode));
@@ -1351,7 +1352,8 @@ public class RemoteClusterConnectionTests extends ESTestCase {
                 service.start();
                 service.acceptIncomingRequests();
                 try (RemoteClusterConnection connection = new RemoteClusterConnection(Settings.EMPTY, "test-cluster",
-                Collections.singletonList(() -> connectedNode), service, service.getConnectionManager(), Integer.MAX_VALUE, n -> true)) {
+                    Collections.singletonList(() -> connectedNode), service, service.getConnectionManager(),
+                    Integer.MAX_VALUE, n -> true, null)) {
                     connection.addConnectedNode(connectedNode);
                     for (int i = 0; i < 10; i++) {
                         //always a direct connection as the remote node is already connected
@@ -1393,7 +1395,7 @@ public class RemoteClusterConnectionTests extends ESTestCase {
                     return seedNode;
                 };
                 try (RemoteClusterConnection connection = new RemoteClusterConnection(Settings.EMPTY, "test-cluster",
-                    Arrays.asList(seedSupplier), service, service.getConnectionManager(), Integer.MAX_VALUE, n -> true)) {
+                    Arrays.asList(seedSupplier), service, service.getConnectionManager(), Integer.MAX_VALUE, n -> true, null)) {
                     updateSeedNodes(connection, Arrays.asList(seedSupplier));
                     // Closing connections leads to RemoteClusterConnection.ConnectHandler.collectRemoteNodes
                     // being called again so we try to resolve the same seed node's host twice

+ 103 - 26
server/src/test/java/org/elasticsearch/transport/RemoteClusterServiceTests.java

@@ -31,6 +31,7 @@ import org.elasticsearch.common.settings.AbstractScopedSettings;
 import org.elasticsearch.common.settings.ClusterSettings;
 import org.elasticsearch.common.settings.Settings;
 import org.elasticsearch.common.transport.TransportAddress;
+import org.elasticsearch.common.unit.TimeValue;
 import org.elasticsearch.core.internal.io.IOUtils;
 import org.elasticsearch.rest.RestStatus;
 import org.elasticsearch.test.ESTestCase;
@@ -97,6 +98,7 @@ public class RemoteClusterServiceTests extends ESTestCase {
         assertTrue(ClusterSettings.BUILT_IN_CLUSTER_SETTINGS.contains(RemoteClusterService.REMOTE_CONNECTIONS_PER_CLUSTER));
         assertTrue(ClusterSettings.BUILT_IN_CLUSTER_SETTINGS.contains(RemoteClusterService.REMOTE_INITIAL_CONNECTION_TIMEOUT_SETTING));
         assertTrue(ClusterSettings.BUILT_IN_CLUSTER_SETTINGS.contains(RemoteClusterService.REMOTE_NODE_ATTRIBUTE));
+        assertTrue(ClusterSettings.BUILT_IN_CLUSTER_SETTINGS.contains(RemoteClusterService.REMOTE_CLUSTER_PING_SCHEDULE));
     }
 
     public void testRemoteClusterSeedSetting() {
@@ -194,12 +196,12 @@ public class RemoteClusterServiceTests extends ESTestCase {
 
     public void testGroupClusterIndices() throws IOException {
         List<DiscoveryNode> knownNodes = new CopyOnWriteArrayList<>();
-        try (MockTransportService seedTransport = startTransport("cluster_1_node", knownNodes, Version.CURRENT);
-             MockTransportService otherSeedTransport = startTransport("cluster_2_node", knownNodes, Version.CURRENT)) {
-            DiscoveryNode seedNode = seedTransport.getLocalDiscoNode();
-            DiscoveryNode otherSeedNode = otherSeedTransport.getLocalDiscoNode();
-            knownNodes.add(seedTransport.getLocalDiscoNode());
-            knownNodes.add(otherSeedTransport.getLocalDiscoNode());
+        try (MockTransportService cluster1Transport = startTransport("cluster_1_node", knownNodes, Version.CURRENT);
+             MockTransportService cluster2Transport = startTransport("cluster_2_node", knownNodes, Version.CURRENT)) {
+            DiscoveryNode cluster1Seed = cluster1Transport.getLocalDiscoNode();
+            DiscoveryNode cluster2Seed = cluster2Transport.getLocalDiscoNode();
+            knownNodes.add(cluster1Transport.getLocalDiscoNode());
+            knownNodes.add(cluster2Transport.getLocalDiscoNode());
             Collections.shuffle(knownNodes, random());
 
             try (MockTransportService transportService = MockTransportService.createNewService(Settings.EMPTY, Version.CURRENT, threadPool,
@@ -207,8 +209,8 @@ public class RemoteClusterServiceTests extends ESTestCase {
                 transportService.start();
                 transportService.acceptIncomingRequests();
                 Settings.Builder builder = Settings.builder();
-                builder.putList("cluster.remote.cluster_1.seeds", seedNode.getAddress().toString());
-                builder.putList("cluster.remote.cluster_2.seeds", otherSeedNode.getAddress().toString());
+                builder.putList("cluster.remote.cluster_1.seeds", cluster1Seed.getAddress().toString());
+                builder.putList("cluster.remote.cluster_2.seeds", cluster2Seed.getAddress().toString());
                 try (RemoteClusterService service = new RemoteClusterService(builder.build(), transportService)) {
                     assertFalse(service.isCrossClusterSearchEnabled());
                     service.initializeRemoteClusters();
@@ -239,12 +241,12 @@ public class RemoteClusterServiceTests extends ESTestCase {
 
     public void testGroupIndices() throws IOException {
         List<DiscoveryNode> knownNodes = new CopyOnWriteArrayList<>();
-        try (MockTransportService seedTransport = startTransport("cluster_1_node", knownNodes, Version.CURRENT);
-             MockTransportService otherSeedTransport = startTransport("cluster_2_node", knownNodes, Version.CURRENT)) {
-            DiscoveryNode seedNode = seedTransport.getLocalDiscoNode();
-            DiscoveryNode otherSeedNode = otherSeedTransport.getLocalDiscoNode();
-            knownNodes.add(seedTransport.getLocalDiscoNode());
-            knownNodes.add(otherSeedTransport.getLocalDiscoNode());
+        try (MockTransportService cluster1Transport = startTransport("cluster_1_node", knownNodes, Version.CURRENT);
+             MockTransportService cluster2Transport = startTransport("cluster_2_node", knownNodes, Version.CURRENT)) {
+            DiscoveryNode cluster1Seed = cluster1Transport.getLocalDiscoNode();
+            DiscoveryNode cluster2Seed = cluster2Transport.getLocalDiscoNode();
+            knownNodes.add(cluster1Transport.getLocalDiscoNode());
+            knownNodes.add(cluster2Transport.getLocalDiscoNode());
             Collections.shuffle(knownNodes, random());
 
             try (MockTransportService transportService = MockTransportService.createNewService(Settings.EMPTY, Version.CURRENT, threadPool,
@@ -252,8 +254,8 @@ public class RemoteClusterServiceTests extends ESTestCase {
                 transportService.start();
                 transportService.acceptIncomingRequests();
                 Settings.Builder builder = Settings.builder();
-                builder.putList("cluster.remote.cluster_1.seeds", seedNode.getAddress().toString());
-                builder.putList("cluster.remote.cluster_2.seeds", otherSeedNode.getAddress().toString());
+                builder.putList("cluster.remote.cluster_1.seeds", cluster1Seed.getAddress().toString());
+                builder.putList("cluster.remote.cluster_2.seeds", cluster2Seed.getAddress().toString());
                 try (RemoteClusterService service = new RemoteClusterService(builder.build(), transportService)) {
                     assertFalse(service.isCrossClusterSearchEnabled());
                     service.initializeRemoteClusters();
@@ -301,12 +303,12 @@ public class RemoteClusterServiceTests extends ESTestCase {
 
     public void testIncrementallyAddClusters() throws IOException {
         List<DiscoveryNode> knownNodes = new CopyOnWriteArrayList<>();
-        try (MockTransportService seedTransport = startTransport("cluster_1_node", knownNodes, Version.CURRENT);
-             MockTransportService otherSeedTransport = startTransport("cluster_2_node", knownNodes, Version.CURRENT)) {
-            DiscoveryNode seedNode = seedTransport.getLocalDiscoNode();
-            DiscoveryNode otherSeedNode = otherSeedTransport.getLocalDiscoNode();
-            knownNodes.add(seedTransport.getLocalDiscoNode());
-            knownNodes.add(otherSeedTransport.getLocalDiscoNode());
+        try (MockTransportService cluster1Transport = startTransport("cluster_1_node", knownNodes, Version.CURRENT);
+             MockTransportService cluster2Transport = startTransport("cluster_2_node", knownNodes, Version.CURRENT)) {
+            DiscoveryNode cluster1Seed = cluster1Transport.getLocalDiscoNode();
+            DiscoveryNode cluster2Seed = cluster2Transport.getLocalDiscoNode();
+            knownNodes.add(cluster1Transport.getLocalDiscoNode());
+            knownNodes.add(cluster2Transport.getLocalDiscoNode());
             Collections.shuffle(knownNodes, random());
 
             try (MockTransportService transportService = MockTransportService.createNewService(Settings.EMPTY, Version.CURRENT, threadPool,
@@ -314,16 +316,16 @@ public class RemoteClusterServiceTests extends ESTestCase {
                 transportService.start();
                 transportService.acceptIncomingRequests();
                 Settings.Builder builder = Settings.builder();
-                builder.putList("cluster.remote.cluster_1.seeds", seedNode.getAddress().toString());
-                builder.putList("cluster.remote.cluster_2.seeds", otherSeedNode.getAddress().toString());
+                builder.putList("cluster.remote.cluster_1.seeds", cluster1Seed.getAddress().toString());
+                builder.putList("cluster.remote.cluster_2.seeds", cluster2Seed.getAddress().toString());
                 try (RemoteClusterService service = new RemoteClusterService(Settings.EMPTY, transportService)) {
                     assertFalse(service.isCrossClusterSearchEnabled());
                     service.initializeRemoteClusters();
                     assertFalse(service.isCrossClusterSearchEnabled());
-                    service.updateRemoteCluster("cluster_1", Collections.singletonList(seedNode.getAddress().toString()), null);
+                    service.updateRemoteCluster("cluster_1", Collections.singletonList(cluster1Seed.getAddress().toString()), null);
                     assertTrue(service.isCrossClusterSearchEnabled());
                     assertTrue(service.isRemoteClusterRegistered("cluster_1"));
-                    service.updateRemoteCluster("cluster_2", Collections.singletonList(otherSeedNode.getAddress().toString()), null);
+                    service.updateRemoteCluster("cluster_2", Collections.singletonList(cluster2Seed.getAddress().toString()), null);
                     assertTrue(service.isCrossClusterSearchEnabled());
                     assertTrue(service.isRemoteClusterRegistered("cluster_1"));
                     assertTrue(service.isRemoteClusterRegistered("cluster_2"));
@@ -337,6 +339,81 @@ public class RemoteClusterServiceTests extends ESTestCase {
         }
     }
 
+    public void testDefaultPingSchedule() throws IOException {
+        List<DiscoveryNode> knownNodes = new CopyOnWriteArrayList<>();
+        try (MockTransportService seedTransport = startTransport("cluster_1_node", knownNodes, Version.CURRENT)) {
+            DiscoveryNode seedNode = seedTransport.getLocalDiscoNode();
+            knownNodes.add(seedTransport.getLocalDiscoNode());
+            TimeValue pingSchedule;
+            Settings.Builder settingsBuilder = Settings.builder();
+            settingsBuilder.putList("cluster.remote.cluster_1.seeds", seedNode.getAddress().toString());
+            if (randomBoolean()) {
+                pingSchedule = TimeValue.timeValueSeconds(randomIntBetween(1, 10));
+                settingsBuilder.put(TcpTransport.PING_SCHEDULE.getKey(), pingSchedule).build();
+            } else {
+                pingSchedule = TimeValue.MINUS_ONE;
+            }
+            Settings settings = settingsBuilder.build();
+            try (MockTransportService transportService = MockTransportService.createNewService(settings,
+                Version.CURRENT, threadPool, null)) {
+                transportService.start();
+                transportService.acceptIncomingRequests();
+                try (RemoteClusterService service = new RemoteClusterService(settings, transportService)) {
+                    assertFalse(service.isCrossClusterSearchEnabled());
+                    service.initializeRemoteClusters();
+                    assertTrue(service.isCrossClusterSearchEnabled());
+                    service.updateRemoteCluster("cluster_1", Collections.singletonList(seedNode.getAddress().toString()), null);
+                    assertTrue(service.isCrossClusterSearchEnabled());
+                    assertTrue(service.isRemoteClusterRegistered("cluster_1"));
+                    RemoteClusterConnection remoteClusterConnection = service.getRemoteClusterConnection("cluster_1");
+                    assertEquals(pingSchedule, remoteClusterConnection.getConnectionManager().getPingSchedule());
+                }
+            }
+        }
+    }
+
+    public void testCustomPingSchedule() throws IOException {
+        List<DiscoveryNode> knownNodes = new CopyOnWriteArrayList<>();
+        try (MockTransportService cluster1Transport = startTransport("cluster_1_node", knownNodes, Version.CURRENT);
+             MockTransportService cluster2Transport = startTransport("cluster_2_node", knownNodes, Version.CURRENT)) {
+            DiscoveryNode cluster1Seed = cluster1Transport.getLocalDiscoNode();
+            DiscoveryNode cluster2Seed = cluster2Transport.getLocalDiscoNode();
+            knownNodes.add(cluster1Transport.getLocalDiscoNode());
+            knownNodes.add(cluster2Transport.getLocalDiscoNode());
+            Collections.shuffle(knownNodes, random());
+            Settings.Builder settingsBuilder = Settings.builder();
+            if (randomBoolean()) {
+                settingsBuilder.put(TcpTransport.PING_SCHEDULE.getKey(), TimeValue.timeValueSeconds(randomIntBetween(1, 10)));
+            }
+            Settings transportSettings = settingsBuilder.build();
+
+            try (MockTransportService transportService = MockTransportService.createNewService(transportSettings, Version.CURRENT,
+                threadPool, null)) {
+                transportService.start();
+                transportService.acceptIncomingRequests();
+                Settings.Builder builder = Settings.builder();
+                builder.putList("cluster.remote.cluster_1.seeds", cluster1Seed.getAddress().toString());
+                builder.putList("cluster.remote.cluster_2.seeds", cluster2Seed.getAddress().toString());
+                TimeValue pingSchedule1 = randomBoolean() ? TimeValue.MINUS_ONE : TimeValue.timeValueSeconds(randomIntBetween(1, 10));
+                builder.put("cluster.remote.cluster_1.transport.ping_schedule", pingSchedule1);
+                TimeValue pingSchedule2 = randomBoolean() ? TimeValue.MINUS_ONE : TimeValue.timeValueSeconds(randomIntBetween(1, 10));
+                builder.put("cluster.remote.cluster_2.transport.ping_schedule", pingSchedule2);
+                try (RemoteClusterService service = new RemoteClusterService(builder.build(), transportService)) {
+                    assertFalse(service.isCrossClusterSearchEnabled());
+                    service.initializeRemoteClusters();
+                    assertTrue(service.isCrossClusterSearchEnabled());
+                    service.updateRemoteCluster("cluster_1", Collections.singletonList(cluster1Seed.getAddress().toString()), null);
+                    assertTrue(service.isCrossClusterSearchEnabled());
+                    assertTrue(service.isRemoteClusterRegistered("cluster_1"));
+                    RemoteClusterConnection remoteClusterConnection1 = service.getRemoteClusterConnection("cluster_1");
+                    assertEquals(pingSchedule1, remoteClusterConnection1.getConnectionManager().getPingSchedule());
+                    RemoteClusterConnection remoteClusterConnection2 = service.getRemoteClusterConnection("cluster_2");
+                    assertEquals(pingSchedule2, remoteClusterConnection2.getConnectionManager().getPingSchedule());
+                }
+            }
+        }
+    }
+
     public void testRemoteNodeAttribute() throws IOException, InterruptedException {
         final Settings settings =
                 Settings.builder().put("cluster.remote.node.attr", "gateway").build();