|
@@ -71,6 +71,7 @@ import org.elasticsearch.transport.TransportService;
|
|
|
|
|
|
import java.io.IOException;
|
|
|
import java.util.ArrayList;
|
|
|
+import java.util.Arrays;
|
|
|
import java.util.HashSet;
|
|
|
import java.util.List;
|
|
|
import java.util.Set;
|
|
@@ -78,6 +79,7 @@ import java.util.concurrent.atomic.AtomicBoolean;
|
|
|
import java.util.concurrent.atomic.AtomicInteger;
|
|
|
import java.util.concurrent.atomic.AtomicLong;
|
|
|
import java.util.concurrent.atomic.AtomicReference;
|
|
|
+import java.util.stream.Collectors;
|
|
|
|
|
|
import static org.elasticsearch.common.unit.TimeValue.timeValueSeconds;
|
|
|
|
|
@@ -100,14 +102,12 @@ public class ZenDiscovery extends AbstractLifecycleComponent<Discovery> implemen
|
|
|
Setting.intSetting("discovery.zen.max_pings_from_another_master", 3, 1, Property.NodeScope);
|
|
|
public final static Setting<Boolean> SEND_LEAVE_REQUEST_SETTING =
|
|
|
Setting.boolSetting("discovery.zen.send_leave_request", true, Property.NodeScope);
|
|
|
- public final static Setting<Boolean> MASTER_ELECTION_FILTER_CLIENT_SETTING =
|
|
|
- Setting.boolSetting("discovery.zen.master_election.filter_client", true, Property.NodeScope);
|
|
|
public final static Setting<TimeValue> MASTER_ELECTION_WAIT_FOR_JOINS_TIMEOUT_SETTING =
|
|
|
Setting.timeSetting("discovery.zen.master_election.wait_for_joins_timeout",
|
|
|
settings -> TimeValue.timeValueMillis(JOIN_TIMEOUT_SETTING.get(settings).millis() / 2).toString(), TimeValue.timeValueMillis(0),
|
|
|
Property.NodeScope);
|
|
|
- public final static Setting<Boolean> MASTER_ELECTION_FILTER_DATA_SETTING =
|
|
|
- Setting.boolSetting("discovery.zen.master_election.filter_data", false, Property.NodeScope);
|
|
|
+ public final static Setting<Boolean> MASTER_ELECTION_IGNORE_NON_MASTER_PINGS_SETTING =
|
|
|
+ Setting.boolSetting("discovery.zen.master_election.ignore_non_master_pings", false, Property.NodeScope);
|
|
|
|
|
|
public static final String DISCOVERY_REJOIN_ACTION_NAME = "internal:discovery/zen/rejoin";
|
|
|
|
|
@@ -138,8 +138,7 @@ public class ZenDiscovery extends AbstractLifecycleComponent<Discovery> implemen
|
|
|
|
|
|
private final ElectMasterService electMaster;
|
|
|
|
|
|
- private final boolean masterElectionFilterClientNodes;
|
|
|
- private final boolean masterElectionFilterDataNodes;
|
|
|
+ private final boolean masterElectionIgnoreNonMasters;
|
|
|
private final TimeValue masterElectionWaitForJoinsTimeout;
|
|
|
|
|
|
private final JoinThreadControl joinThreadControl;
|
|
@@ -169,11 +168,11 @@ public class ZenDiscovery extends AbstractLifecycleComponent<Discovery> implemen
|
|
|
this.maxPingsFromAnotherMaster = MAX_PINGS_FROM_ANOTHER_MASTER_SETTING.get(settings);
|
|
|
this.sendLeaveRequest = SEND_LEAVE_REQUEST_SETTING.get(settings);
|
|
|
|
|
|
- this.masterElectionFilterClientNodes = MASTER_ELECTION_FILTER_CLIENT_SETTING.get(settings);
|
|
|
- this.masterElectionFilterDataNodes = MASTER_ELECTION_FILTER_DATA_SETTING.get(settings);
|
|
|
+ this.masterElectionIgnoreNonMasters = MASTER_ELECTION_IGNORE_NON_MASTER_PINGS_SETTING.get(settings);
|
|
|
this.masterElectionWaitForJoinsTimeout = MASTER_ELECTION_WAIT_FOR_JOINS_TIMEOUT_SETTING.get(settings);
|
|
|
|
|
|
- logger.debug("using ping_timeout [{}], join.timeout [{}], master_election.filter_client [{}], master_election.filter_data [{}]", this.pingTimeout, joinTimeout, masterElectionFilterClientNodes, masterElectionFilterDataNodes);
|
|
|
+ logger.debug("using ping_timeout [{}], join.timeout [{}], master_election.ignore_non_master [{}]",
|
|
|
+ this.pingTimeout, joinTimeout, masterElectionIgnoreNonMasters);
|
|
|
|
|
|
clusterSettings.addSettingsUpdateConsumer(ElectMasterService.DISCOVERY_ZEN_MINIMUM_MASTER_NODES_SETTING, this::handleMinimumMasterNodesChanged, (value) -> {
|
|
|
final ClusterState clusterState = clusterService.state();
|
|
@@ -846,30 +845,8 @@ public class ZenDiscovery extends AbstractLifecycleComponent<Discovery> implemen
|
|
|
}
|
|
|
|
|
|
// filter responses
|
|
|
- List<ZenPing.PingResponse> pingResponses = new ArrayList<>();
|
|
|
- for (ZenPing.PingResponse pingResponse : fullPingResponses) {
|
|
|
- DiscoveryNode node = pingResponse.node();
|
|
|
- if (masterElectionFilterClientNodes && (node.clientNode() || (!node.masterNode() && !node.dataNode()))) {
|
|
|
- // filter out the client node, which is a client node, or also one that is not data and not master (effectively, client)
|
|
|
- } else if (masterElectionFilterDataNodes && (!node.masterNode() && node.dataNode())) {
|
|
|
- // filter out data node that is not also master
|
|
|
- } else {
|
|
|
- pingResponses.add(pingResponse);
|
|
|
- }
|
|
|
- }
|
|
|
-
|
|
|
- if (logger.isDebugEnabled()) {
|
|
|
- StringBuilder sb = new StringBuilder();
|
|
|
- if (pingResponses.isEmpty()) {
|
|
|
- sb.append(" {none}");
|
|
|
- } else {
|
|
|
- for (ZenPing.PingResponse pingResponse : pingResponses) {
|
|
|
- sb.append("\n\t--> ").append(pingResponse);
|
|
|
- }
|
|
|
- }
|
|
|
- logger.debug("filtered ping responses: (filter_client[{}], filter_data[{}]){}", masterElectionFilterClientNodes,
|
|
|
- masterElectionFilterDataNodes, sb);
|
|
|
- }
|
|
|
+ final List<ZenPing.PingResponse> pingResponses;
|
|
|
+ pingResponses = filterPingResponses(fullPingResponses, masterElectionIgnoreNonMasters, logger);
|
|
|
|
|
|
final DiscoveryNode localNode = clusterService.localNode();
|
|
|
List<DiscoveryNode> pingMasters = new ArrayList<>();
|
|
@@ -925,6 +902,28 @@ public class ZenDiscovery extends AbstractLifecycleComponent<Discovery> implemen
|
|
|
}
|
|
|
}
|
|
|
|
|
|
+ static List<ZenPing.PingResponse> filterPingResponses(ZenPing.PingResponse[] fullPingResponses, boolean masterElectionIgnoreNonMasters, ESLogger logger) {
|
|
|
+ List<ZenPing.PingResponse> pingResponses;
|
|
|
+ if (masterElectionIgnoreNonMasters) {
|
|
|
+ pingResponses = Arrays.stream(fullPingResponses).filter(ping -> ping.node().isMasterNode()).collect(Collectors.toList());
|
|
|
+ } else {
|
|
|
+ pingResponses = Arrays.asList(fullPingResponses);
|
|
|
+ }
|
|
|
+
|
|
|
+ if (logger.isDebugEnabled()) {
|
|
|
+ StringBuilder sb = new StringBuilder();
|
|
|
+ if (pingResponses.isEmpty()) {
|
|
|
+ sb.append(" {none}");
|
|
|
+ } else {
|
|
|
+ for (ZenPing.PingResponse pingResponse : pingResponses) {
|
|
|
+ sb.append("\n\t--> ").append(pingResponse);
|
|
|
+ }
|
|
|
+ }
|
|
|
+ logger.debug("filtered ping responses: (ignore_non_masters [{}]){}", masterElectionIgnoreNonMasters, sb);
|
|
|
+ }
|
|
|
+ return pingResponses;
|
|
|
+ }
|
|
|
+
|
|
|
protected ClusterState rejoin(ClusterState clusterState, String reason) {
|
|
|
|
|
|
// *** called from within an cluster state update task *** //
|