瀏覽代碼

Modify proxy mode to support a single address (#50391)

Currently, the remote proxy connection mode uses a list setting for the
proxy address. This commit modifies this so that the setting is
proxy_address and only supports a single remote proxy address.
Tim Brooks 5 年之前
父節點
當前提交
3b8f5d9ea1

+ 9 - 9
qa/multi-cluster-search/src/test/resources/rest-api-spec/test/multi_cluster/15_connection_mode_configuration.yml

@@ -14,7 +14,7 @@
           transient:
             cluster.remote.test_remote_cluster.mode: "proxy"
             cluster.remote.test_remote_cluster.node_connections: "5"
-            cluster.remote.test_remote_cluster.proxy_addresses: $remote_ip
+            cluster.remote.test_remote_cluster.proxy_address: $remote_ip
 
   - match: { status: 400 }
   - match: { error.root_cause.0.type: "illegal_argument_exception" }
@@ -29,7 +29,7 @@
           transient:
             cluster.remote.test_remote_cluster.mode: "proxy"
             cluster.remote.test_remote_cluster.seeds: $remote_ip
-            cluster.remote.test_remote_cluster.proxy_addresses: $remote_ip
+            cluster.remote.test_remote_cluster.proxy_address: $remote_ip
 
   - match: { status: 400 }
   - match: { error.root_cause.0.type: "illegal_argument_exception" }
@@ -64,12 +64,12 @@
         flat_settings: true
         body:
           transient:
-            cluster.remote.test_remote_cluster.proxy_addresses: $remote_ip
+            cluster.remote.test_remote_cluster.proxy_address: $remote_ip
             cluster.remote.test_remote_cluster.seeds: $remote_ip
 
   - match: { status: 400 }
   - match: { error.root_cause.0.type: "illegal_argument_exception" }
-  - match: { error.root_cause.0.reason: "Setting \"cluster.remote.test_remote_cluster.proxy_addresses\" cannot be
+  - match: { error.root_cause.0.reason: "Setting \"cluster.remote.test_remote_cluster.proxy_address\" cannot be
    used with the configured \"cluster.remote.test_remote_cluster.mode\" [required=PROXY, configured=SNIFF]" }
 
 ---
@@ -87,11 +87,11 @@
           transient:
             cluster.remote.test_remote_cluster.mode: "proxy"
             cluster.remote.test_remote_cluster.proxy_socket_connections: "3"
-            cluster.remote.test_remote_cluster.proxy_addresses: $remote_ip
+            cluster.remote.test_remote_cluster.proxy_address: $remote_ip
 
   - match: {transient.cluster\.remote\.test_remote_cluster\.mode: "proxy"}
   - match: {transient.cluster\.remote\.test_remote_cluster\.proxy_socket_connections: "3"}
-  - match: {transient.cluster\.remote\.test_remote_cluster\.proxy_addresses: $remote_ip}
+  - match: {transient.cluster\.remote\.test_remote_cluster\.proxy_address: $remote_ip}
 
   - do:
       search:
@@ -179,7 +179,7 @@
         body:
           transient:
             cluster.remote.test_remote_cluster.mode: "proxy"
-            cluster.remote.test_remote_cluster.proxy_addresses: $remote_ip
+            cluster.remote.test_remote_cluster.proxy_address: $remote_ip
 
   - match: { status: 400 }
   - match: { error.root_cause.0.type: "illegal_argument_exception" }
@@ -193,10 +193,10 @@
           transient:
             cluster.remote.test_remote_cluster.mode: "proxy"
             cluster.remote.test_remote_cluster.seeds: null
-            cluster.remote.test_remote_cluster.proxy_addresses: $remote_ip
+            cluster.remote.test_remote_cluster.proxy_address: $remote_ip
 
   - match: {transient.cluster\.remote\.test_remote_cluster\.mode: "proxy"}
-  - match: {transient.cluster\.remote\.test_remote_cluster\.proxy_addresses: $remote_ip}
+  - match: {transient.cluster\.remote\.test_remote_cluster\.proxy_address: $remote_ip}
 
   - do:
       search:

+ 4 - 4
qa/multi-cluster-search/src/test/resources/rest-api-spec/test/multi_cluster/20_info.yml

@@ -70,17 +70,17 @@
             cluster.remote.test_remote_cluster.seeds: null
             cluster.remote.test_remote_cluster.node_connections: null
             cluster.remote.test_remote_cluster.proxy_socket_connections: "10"
-            cluster.remote.test_remote_cluster.proxy_addresses: $remote_ip
+            cluster.remote.test_remote_cluster.proxy_address: $remote_ip
 
   - match: {transient.cluster\.remote\.test_remote_cluster\.mode: "proxy"}
   - match: {transient.cluster\.remote\.test_remote_cluster\.proxy_socket_connections: "10"}
-  - match: {transient.cluster\.remote\.test_remote_cluster\.proxy_addresses: $remote_ip}
+  - match: {transient.cluster\.remote\.test_remote_cluster\.proxy_address: $remote_ip}
 
   - do:
       cluster.remote_info: {}
 
   - match: { test_remote_cluster.connected: true }
-  - match: { test_remote_cluster.addresses.0: $remote_ip }
+  - match: { test_remote_cluster.address: $remote_ip }
   - gt:    { test_remote_cluster.num_sockets_connected: 0}
   - match: { test_remote_cluster.max_socket_connections: 10}
   - match: { test_remote_cluster.initial_connect_timeout: "30s" }
@@ -92,7 +92,7 @@
           transient:
             cluster.remote.test_remote_cluster.mode: null
             cluster.remote.test_remote_cluster.proxy_socket_connections: null
-            cluster.remote.test_remote_cluster.proxy_addresses: null
+            cluster.remote.test_remote_cluster.proxy_address: null
 
 ---
 "skip_unavailable is returned as part of _remote/info response":

+ 37 - 58
server/src/main/java/org/elasticsearch/transport/ProxyConnectionStrategy.java

@@ -27,6 +27,7 @@ import org.elasticsearch.action.ActionListener;
 import org.elasticsearch.cluster.ClusterName;
 import org.elasticsearch.cluster.node.DiscoveryNode;
 import org.elasticsearch.cluster.node.DiscoveryNodeRole;
+import org.elasticsearch.common.Strings;
 import org.elasticsearch.common.io.stream.StreamInput;
 import org.elasticsearch.common.io.stream.StreamOutput;
 import org.elasticsearch.common.io.stream.Writeable;
@@ -37,18 +38,14 @@ import org.elasticsearch.common.util.concurrent.CountDown;
 import org.elasticsearch.common.xcontent.XContentBuilder;
 
 import java.io.IOException;
-import java.util.Arrays;
 import java.util.Collections;
-import java.util.HashSet;
 import java.util.List;
 import java.util.Map;
 import java.util.Objects;
-import java.util.Set;
 import java.util.concurrent.atomic.AtomicInteger;
 import java.util.concurrent.atomic.AtomicLong;
 import java.util.concurrent.atomic.AtomicReference;
 import java.util.function.Supplier;
-import java.util.stream.Collectors;
 import java.util.stream.Stream;
 
 import static org.elasticsearch.common.settings.Setting.boolSetting;
@@ -57,18 +54,16 @@ import static org.elasticsearch.common.settings.Setting.intSetting;
 public class ProxyConnectionStrategy extends RemoteConnectionStrategy {
 
     /**
-     * A list of addresses for remote cluster connections. The connections will be opened to the configured addresses in a round-robin
-     * fashion.
+     * The remote address for the proxy. The connections will be opened to the configured address.
      */
-    public static final Setting.AffixSetting<List<String>> REMOTE_CLUSTER_ADDRESSES = Setting.affixKeySetting(
+    public static final Setting.AffixSetting<String> REMOTE_CLUSTER_ADDRESSES = Setting.affixKeySetting(
         "cluster.remote.",
-        "proxy_addresses",
-        (ns, key) -> Setting.listSetting(key, Collections.emptyList(), s -> {
-                // validate address
-                parsePort(s);
-                return s;
-            }, new StrategyValidator<>(ns, key, ConnectionStrategy.PROXY),
-            Setting.Property.Dynamic, Setting.Property.NodeScope));
+        "proxy_address",
+        (ns, key) -> Setting.simpleString(key, new StrategyValidator<>(ns, key, ConnectionStrategy.PROXY, s -> {
+                if (Strings.hasLength(s)) {
+                    parsePort(s);
+                }
+            }), Setting.Property.Dynamic, Setting.Property.NodeScope));
 
     /**
      * The maximum number of socket connections that will be established to a remote cluster. The default is 18.
@@ -95,9 +90,9 @@ public class ProxyConnectionStrategy extends RemoteConnectionStrategy {
 
     private final int maxNumConnections;
     private final AtomicLong counter = new AtomicLong(0);
-    private final List<String> configuredAddresses;
+    private final String configuredAddress;
     private final boolean includeServerName;
-    private final List<Supplier<TransportAddress>> addresses;
+    private final Supplier<TransportAddress> address;
     private final AtomicReference<ClusterName> remoteClusterName = new AtomicReference<>();
     private final ConnectionProfile profile;
     private final ConnectionManager.ConnectionValidator clusterNameValidator;
@@ -114,28 +109,26 @@ public class ProxyConnectionStrategy extends RemoteConnectionStrategy {
     }
 
     ProxyConnectionStrategy(String clusterAlias, TransportService transportService, RemoteConnectionManager connectionManager,
-                            int maxNumConnections, List<String> configuredAddresses) {
-        this(clusterAlias, transportService, connectionManager, maxNumConnections, configuredAddresses,
-            configuredAddresses.stream().map(address ->
-                (Supplier<TransportAddress>) () -> resolveAddress(address)).collect(Collectors.toList()), false);
+                            int maxNumConnections, String configuredAddress) {
+        this(clusterAlias, transportService, connectionManager, maxNumConnections, configuredAddress,
+            () -> resolveAddress(configuredAddress), false);
     }
 
     ProxyConnectionStrategy(String clusterAlias, TransportService transportService, RemoteConnectionManager connectionManager,
-                            int maxNumConnections, List<String> configuredAddresses, boolean includeServerName) {
-        this(clusterAlias, transportService, connectionManager, maxNumConnections, configuredAddresses,
-            configuredAddresses.stream().map(address ->
-                (Supplier<TransportAddress>) () -> resolveAddress(address)).collect(Collectors.toList()), includeServerName);
+                            int maxNumConnections, String configuredAddress, boolean includeServerName) {
+        this(clusterAlias, transportService, connectionManager, maxNumConnections, configuredAddress,
+            () -> resolveAddress(configuredAddress), includeServerName);
     }
 
     ProxyConnectionStrategy(String clusterAlias, TransportService transportService, RemoteConnectionManager connectionManager,
-                            int maxNumConnections, List<String> configuredAddresses, List<Supplier<TransportAddress>> addresses,
+                            int maxNumConnections, String configuredAddress, Supplier<TransportAddress> address,
                             boolean includeServerName) {
         super(clusterAlias, transportService, connectionManager);
         this.maxNumConnections = maxNumConnections;
-        this.configuredAddresses = configuredAddresses;
+        this.configuredAddress = configuredAddress;
         this.includeServerName = includeServerName;
-        assert addresses.isEmpty() == false : "Cannot use proxy connection strategy with no configured addresses";
-        this.addresses = addresses;
+        assert Strings.isEmpty(configuredAddress) == false : "Cannot use proxy connection strategy with no configured addresses";
+        this.address = address;
         // TODO: Move into the ConnectionManager
         this.profile = new ConnectionProfile.Builder()
             .addConnections(1, TransportRequestOptions.Type.REG, TransportRequestOptions.Type.PING)
@@ -172,9 +165,9 @@ public class ProxyConnectionStrategy extends RemoteConnectionStrategy {
 
     @Override
     protected boolean strategyMustBeRebuilt(Settings newSettings) {
-        List<String> addresses = REMOTE_CLUSTER_ADDRESSES.getConcreteSettingForNamespace(clusterAlias).get(newSettings);
+        String address = REMOTE_CLUSTER_ADDRESSES.getConcreteSettingForNamespace(clusterAlias).get(newSettings);
         int numOfSockets = REMOTE_SOCKET_CONNECTIONS.getConcreteSettingForNamespace(clusterAlias).get(newSettings);
-        return numOfSockets != maxNumConnections || addressesChanged(configuredAddresses, addresses);
+        return numOfSockets != maxNumConnections || configuredAddress.equals(address) == false;
     }
 
     @Override
@@ -189,7 +182,7 @@ public class ProxyConnectionStrategy extends RemoteConnectionStrategy {
 
     @Override
     public RemoteConnectionInfo.ModeInfo getModeInfo() {
-        return new ProxyModeInfo(configuredAddresses, maxNumConnections, connectionManager.size());
+        return new ProxyModeInfo(configuredAddress, maxNumConnections, connectionManager.size());
     }
 
     private void performProxyConnectionProcess(ActionListener<Void> listener) {
@@ -198,7 +191,7 @@ public class ProxyConnectionStrategy extends RemoteConnectionStrategy {
 
     private void openConnections(ActionListener<Void> finished, int attemptNumber) {
         if (attemptNumber <= MAX_CONNECT_ATTEMPTS_PER_RUN) {
-            List<TransportAddress> resolved = addresses.stream().map(Supplier::get).collect(Collectors.toList());
+            TransportAddress resolved = address.get();
 
             int remaining = maxNumConnections - connectionManager.size();
             ActionListener<Void> compositeListener = new ActionListener<>() {
@@ -228,15 +221,14 @@ public class ProxyConnectionStrategy extends RemoteConnectionStrategy {
 
 
             for (int i = 0; i < remaining; ++i) {
-                TransportAddress address = nextAddress(resolved);
-                String id = clusterAlias + "#" + address;
+                String id = clusterAlias + "#" + resolved;
                 Map<String, String> attributes;
                 if (includeServerName) {
-                    attributes = Collections.singletonMap("server_name", address.address().getHostString());
+                    attributes = Collections.singletonMap("server_name", resolved.address().getHostString());
                 } else {
                     attributes = Collections.emptyMap();
                 }
-                DiscoveryNode node = new DiscoveryNode(id, address, attributes, DiscoveryNodeRole.BUILT_IN_ROLES,
+                DiscoveryNode node = new DiscoveryNode(id, resolved, attributes, DiscoveryNodeRole.BUILT_IN_ROLES,
                     Version.CURRENT.minimumCompatibilityVersion());
 
                 connectionManager.connectToNode(node, profile, clusterNameValidator, new ActionListener<>() {
@@ -248,7 +240,7 @@ public class ProxyConnectionStrategy extends RemoteConnectionStrategy {
                     @Override
                     public void onFailure(Exception e) {
                         logger.debug(new ParameterizedMessage("failed to open remote connection [remote cluster: {}, address: {}]",
-                            clusterAlias, address), e);
+                            clusterAlias, resolved), e);
                         compositeListener.onFailure(e);
                     }
                 });
@@ -276,40 +268,27 @@ public class ProxyConnectionStrategy extends RemoteConnectionStrategy {
         return new TransportAddress(parseConfiguredAddress(address));
     }
 
-    private boolean addressesChanged(final List<String> oldAddresses, final List<String> newAddresses) {
-        if (oldAddresses.size() != newAddresses.size()) {
-            return true;
-        }
-        Set<String> oldSeeds = new HashSet<>(oldAddresses);
-        Set<String> newSeeds = new HashSet<>(newAddresses);
-        return oldSeeds.equals(newSeeds) == false;
-    }
-
     static class ProxyModeInfo implements RemoteConnectionInfo.ModeInfo {
 
-        private final List<String> addresses;
+        private final String address;
         private final int maxSocketConnections;
         private final int numSocketsConnected;
 
-        ProxyModeInfo(List<String> addresses, int maxSocketConnections, int numSocketsConnected) {
-            this.addresses = addresses;
+        ProxyModeInfo(String address, int maxSocketConnections, int numSocketsConnected) {
+            this.address = address;
             this.maxSocketConnections = maxSocketConnections;
             this.numSocketsConnected = numSocketsConnected;
         }
 
         private ProxyModeInfo(StreamInput input) throws IOException {
-            addresses = Arrays.asList(input.readStringArray());
+            address = input.readString();
             maxSocketConnections = input.readVInt();
             numSocketsConnected = input.readVInt();
         }
 
         @Override
         public XContentBuilder toXContent(XContentBuilder builder, Params params) throws IOException {
-            builder.startArray("addresses");
-            for (String address : addresses) {
-                builder.value(address);
-            }
-            builder.endArray();
+            builder.field("address", address);
             builder.field("num_sockets_connected", numSocketsConnected);
             builder.field("max_socket_connections", maxSocketConnections);
             return builder;
@@ -317,7 +296,7 @@ public class ProxyConnectionStrategy extends RemoteConnectionStrategy {
 
         @Override
         public void writeTo(StreamOutput out) throws IOException {
-            out.writeStringArray(addresses.toArray(new String[0]));
+            out.writeString(address);
             out.writeVInt(maxSocketConnections);
             out.writeVInt(numSocketsConnected);
         }
@@ -344,12 +323,12 @@ public class ProxyConnectionStrategy extends RemoteConnectionStrategy {
             ProxyModeInfo otherProxy = (ProxyModeInfo) o;
             return maxSocketConnections == otherProxy.maxSocketConnections &&
                 numSocketsConnected == otherProxy.numSocketsConnected &&
-                Objects.equals(addresses, otherProxy.addresses);
+                Objects.equals(address, otherProxy.address);
         }
 
         @Override
         public int hashCode() {
-            return Objects.hash(addresses, maxSocketConnections, numSocketsConnected);
+            return Objects.hash(address, maxSocketConnections, numSocketsConnected);
         }
     }
 }

+ 3 - 3
server/src/main/java/org/elasticsearch/transport/RemoteConnectionStrategy.java

@@ -26,6 +26,7 @@ import org.apache.lucene.store.AlreadyClosedException;
 import org.elasticsearch.action.ActionListener;
 import org.elasticsearch.action.support.ContextPreservingActionListener;
 import org.elasticsearch.cluster.node.DiscoveryNode;
+import org.elasticsearch.common.Strings;
 import org.elasticsearch.common.collect.Tuple;
 import org.elasticsearch.common.io.stream.Writeable;
 import org.elasticsearch.common.settings.Setting;
@@ -159,9 +160,8 @@ public abstract class RemoteConnectionStrategy implements TransportConnectionLis
             List<String> seeds = SniffConnectionStrategy.REMOTE_CLUSTER_SEEDS.getConcreteSettingForNamespace(clusterAlias).get(settings);
             return seeds.isEmpty() == false;
         } else {
-            List<String> addresses = ProxyConnectionStrategy.REMOTE_CLUSTER_ADDRESSES.getConcreteSettingForNamespace(clusterAlias)
-                .get(settings);
-            return addresses.isEmpty() == false;
+            String address = ProxyConnectionStrategy.REMOTE_CLUSTER_ADDRESSES.getConcreteSettingForNamespace(clusterAlias).get(settings);
+            return Strings.isEmpty(address) == false;
         }
     }
 

+ 54 - 90
server/src/test/java/org/elasticsearch/transport/ProxyConnectionStrategyTests.java

@@ -23,7 +23,6 @@ import org.elasticsearch.Version;
 import org.elasticsearch.action.support.PlainActionFuture;
 import org.elasticsearch.cluster.ClusterName;
 import org.elasticsearch.cluster.node.DiscoveryNode;
-import org.elasticsearch.common.Strings;
 import org.elasticsearch.common.collect.Tuple;
 import org.elasticsearch.common.settings.AbstractScopedSettings;
 import org.elasticsearch.common.settings.ClusterSettings;
@@ -32,19 +31,16 @@ import org.elasticsearch.common.settings.Settings;
 import org.elasticsearch.common.transport.TransportAddress;
 import org.elasticsearch.test.ESTestCase;
 import org.elasticsearch.test.transport.MockTransportService;
-import org.elasticsearch.test.transport.StubbableTransport;
 import org.elasticsearch.threadpool.TestThreadPool;
 import org.elasticsearch.threadpool.ThreadPool;
 
-import java.util.ArrayList;
 import java.util.Arrays;
-import java.util.Collections;
 import java.util.HashSet;
 import java.util.List;
 import java.util.Set;
 import java.util.concurrent.CountDownLatch;
 import java.util.concurrent.TimeUnit;
-import java.util.concurrent.atomic.AtomicInteger;
+import java.util.concurrent.atomic.AtomicBoolean;
 import java.util.function.Supplier;
 import java.util.stream.Collectors;
 
@@ -86,11 +82,9 @@ public class ProxyConnectionStrategyTests extends ESTestCase {
         }
     }
 
-    public void testProxyStrategyWillOpenExpectedNumberOfConnectionsToAddresses() {
-        try (MockTransportService transport1 = startTransport("node1", Version.CURRENT);
-             MockTransportService transport2 = startTransport("node2", Version.CURRENT)) {
+    public void testProxyStrategyWillOpenExpectedNumberOfConnectionsToAddress() {
+        try (MockTransportService transport1 = startTransport("node1", Version.CURRENT)) {
             TransportAddress address1 = transport1.boundAddress().publishAddress();
-            TransportAddress address2 = transport2.boundAddress().publishAddress();
 
             try (MockTransportService localService = MockTransportService.createNewService(Settings.EMPTY, Version.CURRENT, threadPool)) {
                 localService.start();
@@ -100,16 +94,14 @@ public class ProxyConnectionStrategyTests extends ESTestCase {
                 int numOfConnections = randomIntBetween(4, 8);
                 try (RemoteConnectionManager remoteConnectionManager = new RemoteConnectionManager(clusterAlias, connectionManager);
                      ProxyConnectionStrategy strategy = new ProxyConnectionStrategy(clusterAlias, localService, remoteConnectionManager,
-                         numOfConnections, addresses(address1, address2))) {
+                         numOfConnections, address1.toString())) {
                     assertFalse(connectionManager.getAllConnectedNodes().stream().anyMatch(n -> n.getAddress().equals(address1)));
-                    assertFalse(connectionManager.getAllConnectedNodes().stream().anyMatch(n -> n.getAddress().equals(address2)));
 
                     PlainActionFuture<Void> connectFuture = PlainActionFuture.newFuture();
                     strategy.connect(connectFuture);
                     connectFuture.actionGet();
 
                     assertTrue(connectionManager.getAllConnectedNodes().stream().anyMatch(n -> n.getAddress().equals(address1)));
-                    assertTrue(connectionManager.getAllConnectedNodes().stream().anyMatch(n -> n.getAddress().equals(address2)));
                     assertEquals(numOfConnections, connectionManager.size());
                     assertTrue(strategy.assertNoRunningConnections());
                 }
@@ -129,9 +121,10 @@ public class ProxyConnectionStrategyTests extends ESTestCase {
 
                 ConnectionManager connectionManager = new ConnectionManager(profile, localService.transport);
                 int numOfConnections = randomIntBetween(4, 8);
+
                 try (RemoteConnectionManager remoteConnectionManager = new RemoteConnectionManager(clusterAlias, connectionManager);
                      ProxyConnectionStrategy strategy = new ProxyConnectionStrategy(clusterAlias, localService, remoteConnectionManager,
-                         numOfConnections, addresses(address1, address2))) {
+                         numOfConnections, address1.toString(), alternatingResolver(address1, address2), false)) {
                     assertFalse(connectionManager.getAllConnectedNodes().stream().anyMatch(n -> n.getAddress().equals(address1)));
                     assertFalse(connectionManager.getAllConnectedNodes().stream().anyMatch(n -> n.getAddress().equals(address2)));
 
@@ -143,7 +136,7 @@ public class ProxyConnectionStrategyTests extends ESTestCase {
                     long initialConnectionsToTransport2 = connectionManager.getAllConnectedNodes().stream()
                         .filter(n -> n.getAddress().equals(address2))
                         .count();
-                    assertNotEquals(0, initialConnectionsToTransport2);
+                    assertEquals(0, initialConnectionsToTransport2);
                     assertEquals(numOfConnections, connectionManager.size());
                     assertTrue(strategy.assertNoRunningConnections());
 
@@ -151,11 +144,12 @@ public class ProxyConnectionStrategyTests extends ESTestCase {
 
                     assertBusy(() -> {
                         assertFalse(connectionManager.getAllConnectedNodes().stream().anyMatch(n -> n.getAddress().equals(address1)));
-                        // More connections now pointing to transport2
+                        // Connections now pointing to transport2
                         long finalConnectionsToTransport2 = connectionManager.getAllConnectedNodes().stream()
                             .filter(n -> n.getAddress().equals(address2))
                             .count();
-                        assertTrue(finalConnectionsToTransport2 > initialConnectionsToTransport2);
+                        assertNotEquals(0, finalConnectionsToTransport2);
+                        assertEquals(numOfConnections, connectionManager.size());
                         assertTrue(strategy.assertNoRunningConnections());
                     });
                 }
@@ -163,56 +157,6 @@ public class ProxyConnectionStrategyTests extends ESTestCase {
         }
     }
 
-    public void testConnectWithSingleIncompatibleNode() {
-        Version incompatibleVersion = Version.CURRENT.minimumCompatibilityVersion().minimumCompatibilityVersion();
-        try (MockTransportService transport1 = startTransport("compatible-node", Version.CURRENT);
-             MockTransportService transport2 = startTransport("incompatible-node", incompatibleVersion)) {
-            TransportAddress address1 = transport1.boundAddress().publishAddress();
-            TransportAddress address2 = transport2.boundAddress().publishAddress();
-
-            try (MockTransportService localService = MockTransportService.createNewService(Settings.EMPTY, Version.CURRENT, threadPool)) {
-                localService.start();
-                localService.acceptIncomingRequests();
-
-                StubbableTransport stubbableTransport = new StubbableTransport(localService.transport);
-                ConnectionManager connectionManager = new ConnectionManager(profile, stubbableTransport);
-                AtomicInteger address1Attempts = new AtomicInteger(0);
-                AtomicInteger address2Attempts = new AtomicInteger(0);
-                stubbableTransport.setDefaultConnectBehavior((transport, discoveryNode, profile, listener) -> {
-                    if (discoveryNode.getAddress().equals(address1)) {
-                        address1Attempts.incrementAndGet();
-                        transport.openConnection(discoveryNode, profile, listener);
-                    } else if (discoveryNode.getAddress().equals(address2)) {
-                        address2Attempts.incrementAndGet();
-                        transport.openConnection(discoveryNode, profile, listener);
-                    } else {
-                        throw new AssertionError("Unexpected address");
-                    }
-                });
-                int numOfConnections = 5;
-                try (RemoteConnectionManager remoteConnectionManager = new RemoteConnectionManager(clusterAlias, connectionManager);
-                     ProxyConnectionStrategy strategy = new ProxyConnectionStrategy(clusterAlias, localService, remoteConnectionManager,
-                         numOfConnections, addresses(address1, address2))) {
-                    assertFalse(connectionManager.getAllConnectedNodes().stream().anyMatch(n -> n.getAddress().equals(address1)));
-                    assertFalse(connectionManager.getAllConnectedNodes().stream().anyMatch(n -> n.getAddress().equals(address2)));
-
-                    PlainActionFuture<Void> connectFuture = PlainActionFuture.newFuture();
-                    strategy.connect(connectFuture);
-                    connectFuture.actionGet();
-
-                    assertEquals(4 ,connectionManager.size());
-                    assertEquals(4 ,connectionManager.getAllConnectedNodes().stream().map(n -> n.getAddress().equals(address1)).count());
-                    // Three attempts on first round, one attempts on second round, zero attempts on third round
-                    assertEquals(4, address1Attempts.get());
-                    // Two attempts on first round, one attempt on second round, one attempt on third round
-                    assertEquals(4, address2Attempts.get());
-                    assertFalse(connectionManager.getAllConnectedNodes().stream().anyMatch(n -> n.getAddress().equals(address2)));
-                    assertTrue(strategy.assertNoRunningConnections());
-                }
-            }
-        }
-    }
-
     public void testConnectFailsWithIncompatibleNodes() {
         Version incompatibleVersion = Version.CURRENT.minimumCompatibilityVersion().minimumCompatibilityVersion();
         try (MockTransportService transport1 = startTransport("incompatible-node", incompatibleVersion)) {
@@ -226,7 +170,7 @@ public class ProxyConnectionStrategyTests extends ESTestCase {
                 int numOfConnections = randomIntBetween(4, 8);
                 try (RemoteConnectionManager remoteConnectionManager = new RemoteConnectionManager(clusterAlias, connectionManager);
                      ProxyConnectionStrategy strategy = new ProxyConnectionStrategy(clusterAlias, localService, remoteConnectionManager,
-                         numOfConnections, addresses(address1))) {
+                         numOfConnections, address1.toString())) {
 
                     PlainActionFuture<Void> connectFuture = PlainActionFuture.newFuture();
                     strategy.connect(connectFuture);
@@ -254,9 +198,11 @@ public class ProxyConnectionStrategyTests extends ESTestCase {
 
                 ConnectionManager connectionManager = new ConnectionManager(profile, localService.transport);
                 int numOfConnections = randomIntBetween(4, 8);
+
+                Supplier<TransportAddress> resolver = alternatingResolver(address1, address2);
                 try (RemoteConnectionManager remoteConnectionManager = new RemoteConnectionManager(clusterAlias, connectionManager);
                      ProxyConnectionStrategy strategy = new ProxyConnectionStrategy(clusterAlias, localService, remoteConnectionManager,
-                         numOfConnections, addresses(address1, address2))) {
+                         numOfConnections, address1.toString(), resolver, false)) {
                     assertFalse(connectionManager.getAllConnectedNodes().stream().anyMatch(n -> n.getAddress().equals(address1)));
                     assertFalse(connectionManager.getAllConnectedNodes().stream().anyMatch(n -> n.getAddress().equals(address2)));
 
@@ -264,12 +210,23 @@ public class ProxyConnectionStrategyTests extends ESTestCase {
                     strategy.connect(connectFuture);
                     connectFuture.actionGet();
 
-                    if (connectionManager.getAllConnectedNodes().stream().anyMatch(n -> n.getAddress().equals(address1))) {
-                        assertFalse(connectionManager.getAllConnectedNodes().stream().anyMatch(n -> n.getAddress().equals(address2)));
-                    } else {
-                        assertTrue(connectionManager.getAllConnectedNodes().stream().anyMatch(n -> n.getAddress().equals(address2)));
-                    }
-                    assertTrue(strategy.assertNoRunningConnections());
+                    assertTrue(connectionManager.getAllConnectedNodes().stream().anyMatch(n -> n.getAddress().equals(address1)));
+                    assertFalse(connectionManager.getAllConnectedNodes().stream().anyMatch(n -> n.getAddress().equals(address2)));
+
+                    transport1.close();
+
+                    assertBusy(() -> {
+                        assertFalse(connectionManager.getAllConnectedNodes().stream().anyMatch(n -> n.getAddress().equals(address1)));
+                        assertTrue(strategy.assertNoRunningConnections());
+
+                        long finalConnectionsToTransport2 = connectionManager.getAllConnectedNodes().stream()
+                            .filter(n -> n.getAddress().equals(address2))
+                            .count();
+
+                        // Connections not pointing to transport2 because the cluster name is different
+                        assertEquals(0, finalConnectionsToTransport2);
+                        assertEquals(0, connectionManager.size());
+                    });
                 }
             }
         }
@@ -293,7 +250,7 @@ public class ProxyConnectionStrategyTests extends ESTestCase {
                 int numOfConnections = randomIntBetween(4, 8);
                 try (RemoteConnectionManager remoteConnectionManager = new RemoteConnectionManager(clusterAlias, connectionManager);
                      ProxyConnectionStrategy strategy = new ProxyConnectionStrategy(clusterAlias, localService, remoteConnectionManager,
-                         numOfConnections,  addresses(address), Collections.singletonList(addressSupplier), false)) {
+                         numOfConnections, address.toString(), addressSupplier, false)) {
                     PlainActionFuture<Void> connectFuture = PlainActionFuture.newFuture();
                     strategy.connect(connectFuture);
                     connectFuture.actionGet();
@@ -307,10 +264,8 @@ public class ProxyConnectionStrategyTests extends ESTestCase {
     }
 
     public void testProxyStrategyWillNeedToBeRebuiltIfNumOfSocketsOrAddressesChange() {
-        try (MockTransportService transport1 = startTransport("node1", Version.CURRENT);
-             MockTransportService transport2 = startTransport("node2", Version.CURRENT)) {
-            TransportAddress address1 = transport1.boundAddress().publishAddress();
-            TransportAddress address2 = transport2.boundAddress().publishAddress();
+        try (MockTransportService remoteTransport = startTransport("node1", Version.CURRENT)) {
+            TransportAddress remoteAddress = remoteTransport.boundAddress().publishAddress();
 
             try (MockTransportService localService = MockTransportService.createNewService(Settings.EMPTY, Version.CURRENT, threadPool)) {
                 localService.start();
@@ -320,13 +275,12 @@ public class ProxyConnectionStrategyTests extends ESTestCase {
                 int numOfConnections = randomIntBetween(4, 8);
                 try (RemoteConnectionManager remoteConnectionManager = new RemoteConnectionManager(clusterAlias, connectionManager);
                      ProxyConnectionStrategy strategy = new ProxyConnectionStrategy(clusterAlias, localService, remoteConnectionManager,
-                         numOfConnections, addresses(address1, address2))) {
+                         numOfConnections, remoteAddress.toString())) {
                     PlainActionFuture<Void> connectFuture = PlainActionFuture.newFuture();
                     strategy.connect(connectFuture);
                     connectFuture.actionGet();
 
-                    assertTrue(connectionManager.getAllConnectedNodes().stream().anyMatch(n -> n.getAddress().equals(address1)));
-                    assertTrue(connectionManager.getAllConnectedNodes().stream().anyMatch(n -> n.getAddress().equals(address2)));
+                    assertTrue(connectionManager.getAllConnectedNodes().stream().anyMatch(n -> n.getAddress().equals(remoteAddress)));
                     assertEquals(numOfConnections, connectionManager.size());
                     assertTrue(strategy.assertNoRunningConnections());
 
@@ -339,18 +293,18 @@ public class ProxyConnectionStrategyTests extends ESTestCase {
 
                     Settings noChange = Settings.builder()
                         .put(modeSetting.getKey(), "proxy")
-                        .put(addressesSetting.getKey(), Strings.arrayToCommaDelimitedString(addresses(address1, address2).toArray()))
+                        .put(addressesSetting.getKey(), remoteAddress.toString())
                         .put(socketConnections.getKey(), numOfConnections)
                         .build();
                     assertFalse(strategy.shouldRebuildConnection(noChange));
                     Settings addressesChanged = Settings.builder()
                         .put(modeSetting.getKey(), "proxy")
-                        .put(addressesSetting.getKey(), Strings.arrayToCommaDelimitedString(addresses(address1).toArray()))
+                        .put(addressesSetting.getKey(), remoteAddress.toString())
                         .build();
                     assertTrue(strategy.shouldRebuildConnection(addressesChanged));
                     Settings socketsChanged = Settings.builder()
                         .put(modeSetting.getKey(), "proxy")
-                        .put(addressesSetting.getKey(), Strings.arrayToCommaDelimitedString(addresses(address1, address2).toArray()))
+                        .put(addressesSetting.getKey(), remoteAddress.toString())
                         .put(socketConnections.getKey(), numOfConnections + 1)
                         .build();
                     assertTrue(strategy.shouldRebuildConnection(socketsChanged));
@@ -398,14 +352,13 @@ public class ProxyConnectionStrategyTests extends ESTestCase {
                 localService.start();
                 localService.acceptIncomingRequests();
 
-                ArrayList<String> addresses = new ArrayList<>();
-                addresses.add("localhost:" + address1.getPort());
+                String serverName = "localhost:" + address1.getPort();
 
                 ConnectionManager connectionManager = new ConnectionManager(profile, localService.transport);
                 int numOfConnections = randomIntBetween(4, 8);
                 try (RemoteConnectionManager remoteConnectionManager = new RemoteConnectionManager(clusterAlias, connectionManager);
                      ProxyConnectionStrategy strategy = new ProxyConnectionStrategy(clusterAlias, localService, remoteConnectionManager,
-                         numOfConnections, addresses, true)) {
+                         numOfConnections, serverName, true)) {
                     assertFalse(connectionManager.getAllConnectedNodes().stream().anyMatch(n -> n.getAddress().equals(address1)));
 
                     PlainActionFuture<Void> connectFuture = PlainActionFuture.newFuture();
@@ -422,7 +375,18 @@ public class ProxyConnectionStrategyTests extends ESTestCase {
         }
     }
 
-    private static List<String> addresses(final TransportAddress... addresses) {
-        return Arrays.stream(addresses).map(TransportAddress::toString).collect(Collectors.toList());
+    private Supplier<TransportAddress> alternatingResolver(TransportAddress address1, TransportAddress address2) {
+        // On the first connection round, the connections will be routed to transport1. On the second
+        //connection round, the connections will be routed to transport2
+        AtomicBoolean transportSwitch = new AtomicBoolean(true);
+        return () -> {
+            if (transportSwitch.get()) {
+                transportSwitch.set(false);
+                return address1;
+            } else {
+                transportSwitch.set(true);
+                return address2;
+            }
+        };
     }
 }

+ 5 - 5
server/src/test/java/org/elasticsearch/transport/RemoteClusterConnectionTests.java

@@ -356,8 +356,8 @@ public class RemoteClusterConnectionTests extends ESTestCase {
             modeInfo1 = new SniffConnectionStrategy.SniffModeInfo(remoteAddresses, 4, 4);
             modeInfo2 = new SniffConnectionStrategy.SniffModeInfo(remoteAddresses, 4, 3);
         } else {
-            modeInfo1 = new ProxyConnectionStrategy.ProxyModeInfo(remoteAddresses, 18, 18);
-            modeInfo2 = new SniffConnectionStrategy.SniffModeInfo(remoteAddresses, 18, 17);
+            modeInfo1 = new ProxyConnectionStrategy.ProxyModeInfo(remoteAddresses.get(0), 18, 18);
+            modeInfo2 = new ProxyConnectionStrategy.ProxyModeInfo(remoteAddresses.get(0), 18, 17);
         }
 
         RemoteConnectionInfo stats =
@@ -404,7 +404,7 @@ public class RemoteClusterConnectionTests extends ESTestCase {
         if (sniff) {
             modeInfo = new SniffConnectionStrategy.SniffModeInfo(remoteAddresses, 3, 2);
         } else {
-            modeInfo = new ProxyConnectionStrategy.ProxyModeInfo(remoteAddresses, 18, 16);
+            modeInfo = new ProxyConnectionStrategy.ProxyModeInfo(remoteAddresses.get(0), 18, 16);
         }
 
         RemoteConnectionInfo stats = new RemoteConnectionInfo("test_cluster", modeInfo, TimeValue.timeValueMinutes(30), true);
@@ -419,7 +419,7 @@ public class RemoteClusterConnectionTests extends ESTestCase {
                 "\"num_nodes_connected\":2,\"max_connections_per_cluster\":3,\"initial_connect_timeout\":\"30m\"," +
                 "\"skip_unavailable\":true}}", Strings.toString(builder));
         } else {
-            assertEquals("{\"test_cluster\":{\"connected\":true,\"mode\":\"proxy\",\"addresses\":[\"seed:1\",\"seed:2\"]," +
+            assertEquals("{\"test_cluster\":{\"connected\":true,\"mode\":\"proxy\",\"address\":\"seed:1\"," +
                 "\"num_sockets_connected\":16,\"max_socket_connections\":18,\"initial_connect_timeout\":\"30m\"," +
                 "\"skip_unavailable\":true}}", Strings.toString(builder));
         }
@@ -620,7 +620,7 @@ public class RemoteClusterConnectionTests extends ESTestCase {
     private static Settings buildProxySettings(String clusterAlias, List<String> addresses) {
         Settings.Builder builder = Settings.builder();
         builder.put(ProxyConnectionStrategy.REMOTE_CLUSTER_ADDRESSES.getConcreteSettingForNamespace(clusterAlias).getKey(),
-            Strings.collectionToCommaDelimitedString(addresses));
+            addresses.get(0));
         builder.put(RemoteConnectionStrategy.REMOTE_CONNECTION_MODE.getConcreteSettingForNamespace(clusterAlias).getKey(), "proxy");
         return builder.build();
     }

+ 9 - 9
x-pack/qa/multi-cluster-search-security/src/test/resources/rest-api-spec/test/multi_cluster/70_connection_mode_configuration.yml

@@ -14,7 +14,7 @@
           transient:
             cluster.remote.test_remote_cluster.mode: "proxy"
             cluster.remote.test_remote_cluster.node_connections: "5"
-            cluster.remote.test_remote_cluster.proxy_addresses: $remote_ip
+            cluster.remote.test_remote_cluster.proxy_address: $remote_ip
 
   - match: { status: 400 }
   - match: { error.root_cause.0.type: "illegal_argument_exception" }
@@ -29,7 +29,7 @@
           transient:
             cluster.remote.test_remote_cluster.mode: "proxy"
             cluster.remote.test_remote_cluster.seeds: $remote_ip
-            cluster.remote.test_remote_cluster.proxy_addresses: $remote_ip
+            cluster.remote.test_remote_cluster.proxy_address: $remote_ip
 
   - match: { status: 400 }
   - match: { error.root_cause.0.type: "illegal_argument_exception" }
@@ -64,12 +64,12 @@
         flat_settings: true
         body:
           transient:
-            cluster.remote.test_remote_cluster.proxy_addresses: $remote_ip
+            cluster.remote.test_remote_cluster.proxy_address: $remote_ip
             cluster.remote.test_remote_cluster.seeds: $remote_ip
 
   - match: { status: 400 }
   - match: { error.root_cause.0.type: "illegal_argument_exception" }
-  - match: { error.root_cause.0.reason: "Setting \"cluster.remote.test_remote_cluster.proxy_addresses\" cannot be
+  - match: { error.root_cause.0.reason: "Setting \"cluster.remote.test_remote_cluster.proxy_address\" cannot be
    used with the configured \"cluster.remote.test_remote_cluster.mode\" [required=PROXY, configured=SNIFF]" }
 
 ---
@@ -87,11 +87,11 @@
           transient:
             cluster.remote.test_remote_cluster.mode: "proxy"
             cluster.remote.test_remote_cluster.proxy_socket_connections: "3"
-            cluster.remote.test_remote_cluster.proxy_addresses: $remote_ip
+            cluster.remote.test_remote_cluster.proxy_address: $remote_ip
 
   - match: {transient.cluster\.remote\.test_remote_cluster\.mode: "proxy"}
   - match: {transient.cluster\.remote\.test_remote_cluster\.proxy_socket_connections: "3"}
-  - match: {transient.cluster\.remote\.test_remote_cluster\.proxy_addresses: $remote_ip}
+  - match: {transient.cluster\.remote\.test_remote_cluster\.proxy_address: $remote_ip}
 
   - do:
       search:
@@ -179,7 +179,7 @@
         body:
           transient:
             cluster.remote.test_remote_cluster.mode: "proxy"
-            cluster.remote.test_remote_cluster.proxy_addresses: $remote_ip
+            cluster.remote.test_remote_cluster.proxy_address: $remote_ip
 
   - match: { status: 400 }
   - match: { error.root_cause.0.type: "illegal_argument_exception" }
@@ -193,10 +193,10 @@
           transient:
             cluster.remote.test_remote_cluster.mode: "proxy"
             cluster.remote.test_remote_cluster.seeds: null
-            cluster.remote.test_remote_cluster.proxy_addresses: $remote_ip
+            cluster.remote.test_remote_cluster.proxy_address: $remote_ip
 
   - match: {transient.cluster\.remote\.test_remote_cluster\.mode: "proxy"}
-  - match: {transient.cluster\.remote\.test_remote_cluster\.proxy_addresses: $remote_ip}
+  - match: {transient.cluster\.remote\.test_remote_cluster\.proxy_address: $remote_ip}
 
   - do:
       search: