|
@@ -27,7 +27,6 @@ import org.elasticsearch.common.UUIDs;
|
|
|
import org.elasticsearch.common.collect.Tuple;
|
|
|
import org.elasticsearch.common.settings.ClusterSettings;
|
|
|
import org.elasticsearch.common.settings.Setting;
|
|
|
-import org.elasticsearch.common.settings.SettingUpgrader;
|
|
|
import org.elasticsearch.common.settings.Settings;
|
|
|
import org.elasticsearch.common.transport.TransportAddress;
|
|
|
import org.elasticsearch.common.unit.TimeValue;
|
|
@@ -41,11 +40,8 @@ import java.util.Collections;
|
|
|
import java.util.EnumSet;
|
|
|
import java.util.HashMap;
|
|
|
import java.util.List;
|
|
|
-import java.util.Locale;
|
|
|
import java.util.Map;
|
|
|
-import java.util.NavigableSet;
|
|
|
import java.util.Set;
|
|
|
-import java.util.TreeSet;
|
|
|
import java.util.function.Supplier;
|
|
|
import java.util.stream.Collectors;
|
|
|
import java.util.stream.Stream;
|
|
@@ -55,41 +51,6 @@ import java.util.stream.Stream;
|
|
|
*/
|
|
|
public abstract class RemoteClusterAware {
|
|
|
|
|
|
- static {
|
|
|
- // remove search.remote.* settings in 8.0.0
|
|
|
- // TODO https://github.com/elastic/elasticsearch/issues/38556
|
|
|
- // assert Version.CURRENT.major < 8;
|
|
|
- }
|
|
|
-
|
|
|
- public static final Setting.AffixSetting<List<String>> SEARCH_REMOTE_CLUSTERS_SEEDS =
|
|
|
- Setting.affixKeySetting(
|
|
|
- "search.remote.",
|
|
|
- "seeds",
|
|
|
- key -> Setting.listSetting(
|
|
|
- key,
|
|
|
- Collections.emptyList(),
|
|
|
- s -> {
|
|
|
- parsePort(s);
|
|
|
- return s;
|
|
|
- },
|
|
|
- Setting.Property.Deprecated,
|
|
|
- Setting.Property.Dynamic,
|
|
|
- Setting.Property.NodeScope));
|
|
|
-
|
|
|
- public static final SettingUpgrader<List<String>> SEARCH_REMOTE_CLUSTER_SEEDS_UPGRADER = new SettingUpgrader<List<String>>() {
|
|
|
-
|
|
|
- @Override
|
|
|
- public Setting<List<String>> getSetting() {
|
|
|
- return SEARCH_REMOTE_CLUSTERS_SEEDS;
|
|
|
- }
|
|
|
-
|
|
|
- @Override
|
|
|
- public String getKey(final String key) {
|
|
|
- return key.replaceFirst("^search", "cluster");
|
|
|
- }
|
|
|
-
|
|
|
- };
|
|
|
-
|
|
|
/**
|
|
|
* A list of initial seed nodes to discover eligible nodes from the remote cluster
|
|
|
*/
|
|
@@ -98,10 +59,7 @@ public abstract class RemoteClusterAware {
|
|
|
"seeds",
|
|
|
key -> Setting.listSetting(
|
|
|
key,
|
|
|
- // the default needs to be emptyList() when fallback is removed
|
|
|
- "_na_".equals(key)
|
|
|
- ? SEARCH_REMOTE_CLUSTERS_SEEDS.getConcreteSettingForNamespace(key)
|
|
|
- : SEARCH_REMOTE_CLUSTERS_SEEDS.getConcreteSetting(key.replaceAll("^cluster", "search")),
|
|
|
+ Collections.emptyList(),
|
|
|
s -> {
|
|
|
// validate seed address
|
|
|
parsePort(s);
|
|
@@ -113,35 +71,6 @@ public abstract class RemoteClusterAware {
|
|
|
public static final char REMOTE_CLUSTER_INDEX_SEPARATOR = ':';
|
|
|
public static final String LOCAL_CLUSTER_GROUP_KEY = "";
|
|
|
|
|
|
- public static final Setting.AffixSetting<String> SEARCH_REMOTE_CLUSTERS_PROXY = Setting.affixKeySetting(
|
|
|
- "search.remote.",
|
|
|
- "proxy",
|
|
|
- key -> Setting.simpleString(
|
|
|
- key,
|
|
|
- s -> {
|
|
|
- if (Strings.hasLength(s)) {
|
|
|
- parsePort(s);
|
|
|
- }
|
|
|
- },
|
|
|
- Setting.Property.Deprecated,
|
|
|
- Setting.Property.Dynamic,
|
|
|
- Setting.Property.NodeScope),
|
|
|
- REMOTE_CLUSTERS_SEEDS);
|
|
|
-
|
|
|
- public static final SettingUpgrader<String> SEARCH_REMOTE_CLUSTERS_PROXY_UPGRADER = new SettingUpgrader<String>() {
|
|
|
-
|
|
|
- @Override
|
|
|
- public Setting<String> getSetting() {
|
|
|
- return SEARCH_REMOTE_CLUSTERS_PROXY;
|
|
|
- }
|
|
|
-
|
|
|
- @Override
|
|
|
- public String getKey(final String key) {
|
|
|
- return key.replaceFirst("^search", "cluster");
|
|
|
- }
|
|
|
-
|
|
|
- };
|
|
|
-
|
|
|
/**
|
|
|
* A proxy address for the remote cluster.
|
|
|
*/
|
|
@@ -150,15 +79,10 @@ public abstract class RemoteClusterAware {
|
|
|
"proxy",
|
|
|
key -> Setting.simpleString(
|
|
|
key,
|
|
|
- // no default is needed when fallback is removed, use simple string which gives empty
|
|
|
- "_na_".equals(key)
|
|
|
- ? SEARCH_REMOTE_CLUSTERS_PROXY.getConcreteSettingForNamespace(key)
|
|
|
- : SEARCH_REMOTE_CLUSTERS_PROXY.getConcreteSetting(key.replaceAll("^cluster", "search")),
|
|
|
s -> {
|
|
|
if (Strings.hasLength(s)) {
|
|
|
parsePort(s);
|
|
|
}
|
|
|
- return s;
|
|
|
},
|
|
|
Setting.Property.Dynamic,
|
|
|
Setting.Property.NodeScope),
|
|
@@ -185,22 +109,8 @@ public abstract class RemoteClusterAware {
|
|
|
final Settings settings) {
|
|
|
final Map<String, Tuple<String, List<Tuple<String, Supplier<DiscoveryNode>>>>> remoteSeeds =
|
|
|
buildRemoteClustersDynamicConfig(settings, REMOTE_CLUSTERS_SEEDS);
|
|
|
- final Map<String, Tuple<String, List<Tuple<String, Supplier<DiscoveryNode>>>>> searchRemoteSeeds =
|
|
|
- buildRemoteClustersDynamicConfig(settings, SEARCH_REMOTE_CLUSTERS_SEEDS);
|
|
|
- // sort the intersection for predictable output order
|
|
|
- final NavigableSet<String> intersection =
|
|
|
- new TreeSet<>(Arrays.asList(
|
|
|
- searchRemoteSeeds.keySet().stream().filter(s -> remoteSeeds.keySet().contains(s)).sorted().toArray(String[]::new)));
|
|
|
- if (intersection.isEmpty() == false) {
|
|
|
- final String message = String.format(
|
|
|
- Locale.ROOT,
|
|
|
- "found duplicate remote cluster configurations for cluster alias%s [%s]",
|
|
|
- intersection.size() == 1 ? "" : "es",
|
|
|
- String.join(",", intersection));
|
|
|
- throw new IllegalArgumentException(message);
|
|
|
- }
|
|
|
- return Stream
|
|
|
- .concat(remoteSeeds.entrySet().stream(), searchRemoteSeeds.entrySet().stream())
|
|
|
+ return remoteSeeds.entrySet()
|
|
|
+ .stream()
|
|
|
.collect(Collectors.toMap(Map.Entry::getKey, Map.Entry::getValue));
|
|
|
}
|
|
|
|
|
@@ -296,11 +206,6 @@ public abstract class RemoteClusterAware {
|
|
|
RemoteClusterAware.REMOTE_CLUSTERS_SEEDS, RemoteClusterService.REMOTE_CLUSTER_COMPRESS,
|
|
|
RemoteClusterService.REMOTE_CLUSTER_PING_SCHEDULE);
|
|
|
clusterSettings.addAffixGroupUpdateConsumer(remoteClusterSettings, this::updateRemoteCluster);
|
|
|
- clusterSettings.addAffixUpdateConsumer(
|
|
|
- RemoteClusterAware.SEARCH_REMOTE_CLUSTERS_PROXY,
|
|
|
- RemoteClusterAware.SEARCH_REMOTE_CLUSTERS_SEEDS,
|
|
|
- (key, value) -> updateRemoteCluster(key, value.v2(), value.v1()),
|
|
|
- (namespace, value) -> {});
|
|
|
}
|
|
|
|
|
|
static InetSocketAddress parseSeedAddress(String remoteHost) {
|