|
@@ -26,6 +26,7 @@ import org.elasticsearch.Version;
|
|
|
import org.elasticsearch.action.ActionListener;
|
|
|
import org.elasticsearch.cluster.ClusterName;
|
|
|
import org.elasticsearch.cluster.node.DiscoveryNode;
|
|
|
+import org.elasticsearch.cluster.node.DiscoveryNodeRole;
|
|
|
import org.elasticsearch.common.io.stream.StreamInput;
|
|
|
import org.elasticsearch.common.io.stream.StreamOutput;
|
|
|
import org.elasticsearch.common.io.stream.Writeable;
|
|
@@ -40,6 +41,7 @@ import java.util.Arrays;
|
|
|
import java.util.Collections;
|
|
|
import java.util.HashSet;
|
|
|
import java.util.List;
|
|
|
+import java.util.Map;
|
|
|
import java.util.Objects;
|
|
|
import java.util.Set;
|
|
|
import java.util.concurrent.atomic.AtomicInteger;
|
|
@@ -49,6 +51,7 @@ import java.util.function.Supplier;
|
|
|
import java.util.stream.Collectors;
|
|
|
import java.util.stream.Stream;
|
|
|
|
|
|
+import static org.elasticsearch.common.settings.Setting.boolSetting;
|
|
|
import static org.elasticsearch.common.settings.Setting.intSetting;
|
|
|
|
|
|
public class SimpleConnectionStrategy extends RemoteConnectionStrategy {
|
|
@@ -76,6 +79,15 @@ public class SimpleConnectionStrategy extends RemoteConnectionStrategy {
|
|
|
(ns, key) -> intSetting(key, 18, 1, new StrategyValidator<>(ns, key, ConnectionStrategy.SIMPLE),
|
|
|
Setting.Property.Dynamic, Setting.Property.NodeScope));
|
|
|
|
|
|
+ /**
|
|
|
+ * Whether to include the hostname as a server_name attribute
|
|
|
+ */
|
|
|
+ public static final Setting.AffixSetting<Boolean> INCLUDE_SERVER_NAME = Setting.affixKeySetting(
|
|
|
+ "cluster.remote.",
|
|
|
+ "simple.include_server_name",
|
|
|
+ (ns, key) -> boolSetting(key, false, new StrategyValidator<>(ns, key, ConnectionStrategy.SIMPLE),
|
|
|
+ Setting.Property.Dynamic, Setting.Property.NodeScope));
|
|
|
+
|
|
|
static final int CHANNELS_PER_CONNECTION = 1;
|
|
|
|
|
|
private static final int MAX_CONNECT_ATTEMPTS_PER_RUN = 3;
|
|
@@ -84,6 +96,7 @@ public class SimpleConnectionStrategy extends RemoteConnectionStrategy {
|
|
|
private final int maxNumConnections;
|
|
|
private final AtomicLong counter = new AtomicLong(0);
|
|
|
private final List<String> configuredAddresses;
|
|
|
+ private final boolean includeServerName;
|
|
|
private final List<Supplier<TransportAddress>> addresses;
|
|
|
private final AtomicReference<ClusterName> remoteClusterName = new AtomicReference<>();
|
|
|
private final ConnectionProfile profile;
|
|
@@ -96,21 +109,31 @@ public class SimpleConnectionStrategy extends RemoteConnectionStrategy {
|
|
|
transportService,
|
|
|
connectionManager,
|
|
|
REMOTE_SOCKET_CONNECTIONS.getConcreteSettingForNamespace(clusterAlias).get(settings),
|
|
|
- REMOTE_CLUSTER_ADDRESSES.getConcreteSettingForNamespace(clusterAlias).get(settings));
|
|
|
+ REMOTE_CLUSTER_ADDRESSES.getConcreteSettingForNamespace(clusterAlias).get(settings),
|
|
|
+ INCLUDE_SERVER_NAME.getConcreteSettingForNamespace(clusterAlias).get(settings));
|
|
|
}
|
|
|
|
|
|
SimpleConnectionStrategy(String clusterAlias, TransportService transportService, RemoteConnectionManager connectionManager,
|
|
|
int maxNumConnections, List<String> configuredAddresses) {
|
|
|
this(clusterAlias, transportService, connectionManager, maxNumConnections, configuredAddresses,
|
|
|
configuredAddresses.stream().map(address ->
|
|
|
- (Supplier<TransportAddress>) () -> resolveAddress(address)).collect(Collectors.toList()));
|
|
|
+ (Supplier<TransportAddress>) () -> resolveAddress(address)).collect(Collectors.toList()), false);
|
|
|
}
|
|
|
|
|
|
SimpleConnectionStrategy(String clusterAlias, TransportService transportService, RemoteConnectionManager connectionManager,
|
|
|
- int maxNumConnections, List<String> configuredAddresses, List<Supplier<TransportAddress>> addresses) {
|
|
|
+ int maxNumConnections, List<String> configuredAddresses, boolean includeServerName) {
|
|
|
+ this(clusterAlias, transportService, connectionManager, maxNumConnections, configuredAddresses,
|
|
|
+ configuredAddresses.stream().map(address ->
|
|
|
+ (Supplier<TransportAddress>) () -> resolveAddress(address)).collect(Collectors.toList()), includeServerName);
|
|
|
+ }
|
|
|
+
|
|
|
+ SimpleConnectionStrategy(String clusterAlias, TransportService transportService, RemoteConnectionManager connectionManager,
|
|
|
+ int maxNumConnections, List<String> configuredAddresses, List<Supplier<TransportAddress>> addresses,
|
|
|
+ boolean includeServerName) {
|
|
|
super(clusterAlias, transportService, connectionManager);
|
|
|
this.maxNumConnections = maxNumConnections;
|
|
|
this.configuredAddresses = configuredAddresses;
|
|
|
+ this.includeServerName = includeServerName;
|
|
|
assert addresses.isEmpty() == false : "Cannot use simple connection strategy with no configured addresses";
|
|
|
this.addresses = addresses;
|
|
|
// TODO: Move into the ConnectionManager
|
|
@@ -207,7 +230,14 @@ public class SimpleConnectionStrategy extends RemoteConnectionStrategy {
|
|
|
for (int i = 0; i < remaining; ++i) {
|
|
|
TransportAddress address = nextAddress(resolved);
|
|
|
String id = clusterAlias + "#" + address;
|
|
|
- DiscoveryNode node = new DiscoveryNode(id, address, Version.CURRENT.minimumCompatibilityVersion());
|
|
|
+ Map<String, String> attributes;
|
|
|
+ if (includeServerName) {
|
|
|
+ attributes = Collections.singletonMap("server_name", address.address().getHostString());
|
|
|
+ } else {
|
|
|
+ attributes = Collections.emptyMap();
|
|
|
+ }
|
|
|
+ DiscoveryNode node = new DiscoveryNode(id, address, attributes, DiscoveryNodeRole.BUILT_IN_ROLES,
|
|
|
+ Version.CURRENT.minimumCompatibilityVersion());
|
|
|
|
|
|
connectionManager.connectToNode(node, profile, clusterNameValidator, new ActionListener<>() {
|
|
|
@Override
|
|
@@ -243,7 +273,7 @@ public class SimpleConnectionStrategy extends RemoteConnectionStrategy {
|
|
|
}
|
|
|
|
|
|
private static TransportAddress resolveAddress(String address) {
|
|
|
- return new TransportAddress(parseSeedAddress(address));
|
|
|
+ return new TransportAddress(parseConfiguredAddress(address));
|
|
|
}
|
|
|
|
|
|
private boolean addressesChanged(final List<String> oldAddresses, final List<String> newAddresses) {
|