@@ -22,6 +22,7 @@ package org.elasticsearch.cluster.coordination;
 import org.apache.logging.log4j.LogManager;
 import org.apache.logging.log4j.Logger;
 import org.apache.logging.log4j.message.ParameterizedMessage;
+import org.elasticsearch.ElasticsearchException;
 import org.elasticsearch.cluster.node.DiscoveryNode;
 import org.elasticsearch.cluster.node.DiscoveryNodes;
 import org.elasticsearch.common.Nullable;
@@ -33,6 +34,7 @@ import org.elasticsearch.common.settings.Settings;
 import org.elasticsearch.common.unit.TimeValue;
 import org.elasticsearch.threadpool.ThreadPool.Names;
 import org.elasticsearch.transport.ConnectTransportException;
+import org.elasticsearch.transport.NodeDisconnectedException;
 import org.elasticsearch.transport.TransportConnectionListener;
 import org.elasticsearch.transport.TransportException;
 import org.elasticsearch.transport.TransportRequest;
@@ -48,6 +50,7 @@ import java.util.Objects;
 import java.util.concurrent.atomic.AtomicBoolean;
 import java.util.concurrent.atomic.AtomicLong;
 import java.util.concurrent.atomic.AtomicReference;
+import java.util.function.Consumer;
 
 /**
  * The LeaderChecker is responsible for allowing followers to check that the currently elected leader is still connected and healthy. We are
@@ -75,20 +78,17 @@ public class LeaderChecker {
     public static final Setting<Integer> LEADER_CHECK_RETRY_COUNT_SETTING =
         Setting.intSetting("cluster.fault_detection.leader_check.retry_count", 3, 1, Setting.Property.NodeScope);
 
-    private final Settings settings;
-
     private final TimeValue leaderCheckInterval;
     private final TimeValue leaderCheckTimeout;
     private final int leaderCheckRetryCount;
     private final TransportService transportService;
-    private final Runnable onLeaderFailure;
+    private final Consumer<Exception> onLeaderFailure;
 
     private AtomicReference<CheckScheduler> currentChecker = new AtomicReference<>();
 
     private volatile DiscoveryNodes discoveryNodes;
 
-    public LeaderChecker(final Settings settings, final TransportService transportService, final Runnable onLeaderFailure) {
-        this.settings = settings;
+    public LeaderChecker(final Settings settings, final TransportService transportService, final Consumer<Exception> onLeaderFailure) {
         leaderCheckInterval = LEADER_CHECK_INTERVAL_SETTING.get(settings);
         leaderCheckTimeout = LEADER_CHECK_TIMEOUT_SETTING.get(settings);
         leaderCheckRetryCount = LEADER_CHECK_RETRY_COUNT_SETTING.get(settings);
@@ -234,16 +234,19 @@ public class LeaderChecker {
                        }
 
                        if (exp instanceof ConnectTransportException || exp.getCause() instanceof ConnectTransportException) {
-                            logger.debug(new ParameterizedMessage("leader [{}] disconnected, failing immediately", leader), exp);
-                            leaderFailed();
+                            logger.debug(new ParameterizedMessage(
+                                "leader [{}] disconnected during check", leader), exp);
+                            leaderFailed(new ConnectTransportException(leader, "disconnected during check", exp));
                            return;
                        }
 
                        long failureCount = failureCountSinceLastSuccess.incrementAndGet();
                        if (failureCount >= leaderCheckRetryCount) {
-                            logger.debug(new ParameterizedMessage("{} consecutive failures (limit [{}] is {}) so leader [{}] has failed",
-                                failureCount, LEADER_CHECK_RETRY_COUNT_SETTING.getKey(), leaderCheckRetryCount, leader), exp);
-                            leaderFailed();
+                            logger.debug(new ParameterizedMessage(
+                                "leader [{}] has failed {} consecutive checks (limit [{}] is {}); last failure was:",
+                                leader, failureCount, LEADER_CHECK_RETRY_COUNT_SETTING.getKey(), leaderCheckRetryCount), exp);
+                            leaderFailed(new ElasticsearchException(
+                                "node [" + leader + "] failed [" + failureCount + "] consecutive checks", exp));
                            return;
                        }
 
@@ -259,9 +262,19 @@ public class LeaderChecker {
                });
        }
 
-        void leaderFailed() {
+        void leaderFailed(Exception e) {
            if (isClosed.compareAndSet(false, true)) {
-                transportService.getThreadPool().generic().execute(onLeaderFailure);
+                transportService.getThreadPool().generic().execute(new Runnable() {
+                    @Override
+                    public void run() {
+                        onLeaderFailure.accept(e);
+                    }
+
+                    @Override
+                    public String toString() {
+                        return "notification of leader failure: " + e.getMessage();
+                    }
+                });
            } else {
                logger.trace("already closed, not failing leader");
            }
@@ -269,7 +282,8 @@ public class LeaderChecker {
 
        void handleDisconnectedNode(DiscoveryNode discoveryNode) {
            if (discoveryNode.equals(leader)) {
-                leaderFailed();
+                logger.debug("leader [{}] disconnected", leader);
+                leaderFailed(new NodeDisconnectedException(discoveryNode, "disconnected"));
            }
        }
 
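
For reference, a minimal sketch of how a caller could wire up the new Consumer<Exception>-based callback introduced by this change, in place of the old Runnable. Only the LeaderChecker constructor signature and the listed import paths come from this diff; the wrapper class, method name, and the handler body are illustrative assumptions, not part of the change.

import java.util.function.Consumer;

import org.elasticsearch.cluster.coordination.LeaderChecker;
import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.transport.TransportService;

// Hypothetical wiring example: shows a Consumer<Exception> passed where a
// bare Runnable was previously expected. The reaction inside the lambda is
// an assumption for illustration only.
class LeaderCheckerWiringExample {

    static LeaderChecker buildLeaderChecker(Settings settings, TransportService transportService) {
        Consumer<Exception> onLeaderFailure = e -> {
            // The callback now receives the cause of the leader failure, so
            // the caller can record or propagate it instead of losing it.
            System.err.println("leader failed: " + e.getMessage());
        };
        return new LeaderChecker(settings, transportService, onLeaderFailure);
    }
}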