|
@@ -10,16 +10,29 @@ package org.elasticsearch.cluster.coordination;
|
|
|
|
|
|
import org.apache.logging.log4j.LogManager;
|
|
|
import org.apache.logging.log4j.Logger;
|
|
|
+import org.elasticsearch.Version;
|
|
|
+import org.elasticsearch.action.ActionListener;
|
|
|
+import org.elasticsearch.action.ActionListenerResponseHandler;
|
|
|
+import org.elasticsearch.action.StepListener;
|
|
|
+import org.elasticsearch.action.admin.cluster.coordination.ClusterFormationInfoAction;
|
|
|
import org.elasticsearch.cluster.ClusterChangedEvent;
|
|
|
import org.elasticsearch.cluster.ClusterStateListener;
|
|
|
import org.elasticsearch.cluster.node.DiscoveryNode;
|
|
|
+import org.elasticsearch.cluster.service.ClusterApplierService;
|
|
|
import org.elasticsearch.cluster.service.ClusterService;
|
|
|
import org.elasticsearch.common.io.stream.StreamInput;
|
|
|
import org.elasticsearch.common.io.stream.StreamOutput;
|
|
|
import org.elasticsearch.common.io.stream.Writeable;
|
|
|
import org.elasticsearch.common.settings.Setting;
|
|
|
import org.elasticsearch.core.Nullable;
|
|
|
+import org.elasticsearch.core.Releasable;
|
|
|
+import org.elasticsearch.core.Releasables;
|
|
|
import org.elasticsearch.core.TimeValue;
|
|
|
+import org.elasticsearch.threadpool.Scheduler;
|
|
|
+import org.elasticsearch.threadpool.ThreadPool;
|
|
|
+import org.elasticsearch.transport.ConnectionProfile;
|
|
|
+import org.elasticsearch.transport.TransportRequestOptions;
|
|
|
+import org.elasticsearch.transport.TransportService;
|
|
|
|
|
|
import java.io.IOException;
|
|
|
import java.io.PrintWriter;
|
|
@@ -30,7 +43,12 @@ import java.util.List;
|
|
|
import java.util.Locale;
|
|
|
import java.util.Objects;
|
|
|
import java.util.Set;
|
|
|
+import java.util.concurrent.ConcurrentHashMap;
|
|
|
+import java.util.concurrent.ConcurrentMap;
|
|
|
+import java.util.concurrent.CopyOnWriteArrayList;
|
|
|
import java.util.concurrent.TimeUnit;
|
|
|
+import java.util.function.BiConsumer;
|
|
|
+import java.util.function.Consumer;
|
|
|
import java.util.stream.Collectors;
|
|
|
|
|
|
/**
|
|
@@ -47,6 +65,7 @@ import java.util.stream.Collectors;
|
|
|
*/
|
|
|
public class CoordinationDiagnosticsService implements ClusterStateListener {
|
|
|
private final ClusterService clusterService;
|
|
|
+ private final TransportService transportService;
|
|
|
private final Coordinator coordinator;
|
|
|
private final MasterHistoryService masterHistoryService;
|
|
|
/**
|
|
@@ -63,6 +82,19 @@ public class CoordinationDiagnosticsService implements ClusterStateListener {
|
|
|
*/
|
|
|
private final int unacceptableIdentityChanges;
|
|
|
|
|
|
+ /*
|
|
|
+ * This is a list of tasks that are periodically reaching out to other master eligible nodes to get their ClusterFormationStates for
|
|
|
+ * diagnosis.
|
|
|
+ * The field is accessed (reads/writes) from multiple threads, but the reference itself is only ever changed on the cluster change
|
|
|
+ * event thread.
|
|
|
+ */
|
|
|
+ private volatile List<Scheduler.Cancellable> clusterFormationInfoTasks = null;
|
|
|
+ /*
|
|
|
+ * This field holds the results of the tasks in the clusterFormationInfoTasks field above. The field is accessed (reads/writes) from
|
|
|
+ * multiple threads, but the reference itself is only ever changed on the cluster change event thread.
|
|
|
+ */
|
|
|
+ private volatile ConcurrentMap<DiscoveryNode, ClusterFormationStateOrException> clusterFormationResponses = null;
|
|
|
+
|
|
|
private static final Logger logger = LogManager.getLogger(CoordinationDiagnosticsService.class);
|
|
|
|
|
|
/**
|
|
@@ -98,10 +130,12 @@ public class CoordinationDiagnosticsService implements ClusterStateListener {
|
|
|
|
|
|
public CoordinationDiagnosticsService(
|
|
|
ClusterService clusterService,
|
|
|
+ TransportService transportService,
|
|
|
Coordinator coordinator,
|
|
|
MasterHistoryService masterHistoryService
|
|
|
) {
|
|
|
this.clusterService = clusterService;
|
|
|
+ this.transportService = transportService;
|
|
|
this.coordinator = coordinator;
|
|
|
this.masterHistoryService = masterHistoryService;
|
|
|
this.nodeHasMasterLookupTimeframe = NODE_HAS_MASTER_LOOKUP_TIMEFRAME_SETTING.get(clusterService.getSettings());
|
|
@@ -410,6 +444,168 @@ public class CoordinationDiagnosticsService implements ClusterStateListener {
|
|
|
}
|
|
|
}
|
|
|
}
|
|
|
+ if (currentMaster == null && clusterService.localNode().isMasterNode()) {
|
|
|
+ /*
|
|
|
+ * This begins polling all master-eligible nodes for cluster formation information. However there's a 10-second delay before it
|
|
|
+ * starts, so in the normal situation where during a master transition it flips from master1 -> null -> master2, the
|
|
|
+ * polling tasks will be canceled before any requests are actually made.
|
|
|
+ */
|
|
|
+ beginPollingClusterFormationInfo();
|
|
|
+ } else {
|
|
|
+ cancelPollingClusterFormationInfo();
|
|
|
+ }
|
|
|
+ }
|
|
|
+
|
|
|
+ /**
|
|
|
+ * This method begins polling all known master-eligible nodes for cluster formation information. After a 10-second initial delay, it
|
|
|
+ * polls each node every 10 seconds until cancelPollingClusterFormationInfo() is called.
|
|
|
+ */
|
|
|
+ private void beginPollingClusterFormationInfo() {
|
|
|
+ assert ThreadPool.assertCurrentThreadPool(ClusterApplierService.CLUSTER_UPDATE_THREAD_NAME);
|
|
|
+ cancelPollingClusterFormationInfo();
|
|
|
+ ConcurrentMap<DiscoveryNode, ClusterFormationStateOrException> responses = new ConcurrentHashMap<>();
|
|
|
+ List<Scheduler.Cancellable> cancellables = new CopyOnWriteArrayList<>();
|
|
|
+ beginPollingClusterFormationInfo(getMasterEligibleNodes(), responses::put, cancellables::add);
|
|
|
+ clusterFormationResponses = responses;
|
|
|
+ clusterFormationInfoTasks = cancellables;
|
|
|
+ }
|
|
|
+
|
|
|
+ /**
|
|
|
+ * This method returns quickly, but in the background schedules to query the remote node's cluster formation state in 10 seconds, and
|
|
|
+ * repeats doing that until cancel() is called on all of the Cancellable that this method inserts into cancellables. This method
|
|
|
+ * exists (rather than being just part of the beginPollingClusterFormationInfo() above) in order to facilitate unit testing.
|
|
|
+ * @param nodeResponseConsumer A consumer for any results produced for a node by this method
|
|
|
+ * @param cancellableConsumer A consumer for any Cancellable tasks produced by this method
|
|
|
+ */
|
|
|
+ // Non-private for testing
|
|
|
+ void beginPollingClusterFormationInfo(
|
|
|
+ Collection<DiscoveryNode> masterEligibleNodes,
|
|
|
+ BiConsumer<DiscoveryNode, ClusterFormationStateOrException> nodeResponseConsumer,
|
|
|
+ Consumer<Scheduler.Cancellable> cancellableConsumer
|
|
|
+ ) {
|
|
|
+ masterEligibleNodes.forEach(masterEligibleNode -> {
|
|
|
+ Consumer<ClusterFormationStateOrException> responseConsumer = result -> nodeResponseConsumer.accept(masterEligibleNode, result);
|
|
|
+ cancellableConsumer.accept(
|
|
|
+ fetchClusterFormationInfo(
|
|
|
+ masterEligibleNode,
|
|
|
+ responseConsumer.andThen(rescheduleFetchConsumer(masterEligibleNode, responseConsumer, cancellableConsumer))
|
|
|
+ )
|
|
|
+ );
|
|
|
+ });
|
|
|
+ }
|
|
|
+
|
|
|
+ /**
|
|
|
+ * This wraps the responseConsumer in a Consumer that will run rescheduleFetchConsumer() after responseConsumer has
|
|
|
+ * completed, adding the resulting Cancellable to cancellableConsumer.
|
|
|
+ * @param masterEligibleNode The node being polled
|
|
|
+ * @param responseConsumer The response consumer to be wrapped
|
|
|
+ * @param cancellableConsumer The list of Cancellables
|
|
|
+ * @return
|
|
|
+ */
|
|
|
+ private Consumer<CoordinationDiagnosticsService.ClusterFormationStateOrException> rescheduleFetchConsumer(
|
|
|
+ DiscoveryNode masterEligibleNode,
|
|
|
+ Consumer<CoordinationDiagnosticsService.ClusterFormationStateOrException> responseConsumer,
|
|
|
+ Consumer<Scheduler.Cancellable> cancellableConsumer
|
|
|
+ ) {
|
|
|
+ return response -> {
|
|
|
+ cancellableConsumer.accept(
|
|
|
+ fetchClusterFormationInfo(
|
|
|
+ masterEligibleNode,
|
|
|
+ responseConsumer.andThen(rescheduleFetchConsumer(masterEligibleNode, responseConsumer, cancellableConsumer))
|
|
|
+ )
|
|
|
+ );
|
|
|
+ };
|
|
|
+ }
|
|
|
+
|
|
|
+ private void cancelPollingClusterFormationInfo() {
|
|
|
+ assert ThreadPool.assertCurrentThreadPool(ClusterApplierService.CLUSTER_UPDATE_THREAD_NAME);
|
|
|
+ if (clusterFormationResponses != null) {
|
|
|
+ clusterFormationInfoTasks.forEach(Scheduler.Cancellable::cancel);
|
|
|
+ clusterFormationResponses = null;
|
|
|
+ clusterFormationInfoTasks = null;
|
|
|
+ }
|
|
|
+ }
|
|
|
+
|
|
|
+ /**
|
|
|
+ * This method returns quickly, but in the background schedules to query the remote node's cluster formation state in 10 seconds
|
|
|
+ * unless cancel() is called on the Cancellable that this method returns.
|
|
|
+ * @param node The node to poll for cluster formation information
|
|
|
+ * @param responseConsumer The consumer of the cluster formation info for the node, or the exception encountered while contacting it
|
|
|
+ * @return A Cancellable for the task that is scheduled to fetch cluster formation information
|
|
|
+ */
|
|
|
+ private Scheduler.Cancellable fetchClusterFormationInfo(
|
|
|
+ DiscoveryNode node,
|
|
|
+ Consumer<ClusterFormationStateOrException> responseConsumer
|
|
|
+ ) {
|
|
|
+ StepListener<Releasable> connectionListener = new StepListener<>();
|
|
|
+ StepListener<ClusterFormationInfoAction.Response> fetchClusterInfoListener = new StepListener<>();
|
|
|
+ long startTime = System.nanoTime();
|
|
|
+ connectionListener.whenComplete(releasable -> {
|
|
|
+ logger.trace("Opened connection to {}, making cluster coordination info request", node);
|
|
|
+ // If we don't get a response in 10 seconds that is a failure worth capturing on its own:
|
|
|
+ final TimeValue transportTimeout = TimeValue.timeValueSeconds(10);
|
|
|
+ transportService.sendRequest(
|
|
|
+ node,
|
|
|
+ ClusterFormationInfoAction.NAME,
|
|
|
+ new ClusterFormationInfoAction.Request(),
|
|
|
+ TransportRequestOptions.timeout(transportTimeout),
|
|
|
+ new ActionListenerResponseHandler<>(
|
|
|
+ ActionListener.runBefore(fetchClusterInfoListener, () -> Releasables.close(releasable)),
|
|
|
+ ClusterFormationInfoAction.Response::new
|
|
|
+ )
|
|
|
+ );
|
|
|
+ }, e -> {
|
|
|
+ logger.warn("Exception connecting to master node", e);
|
|
|
+ responseConsumer.accept(new ClusterFormationStateOrException(e));
|
|
|
+ });
|
|
|
+
|
|
|
+ fetchClusterInfoListener.whenComplete(response -> {
|
|
|
+ long endTime = System.nanoTime();
|
|
|
+ logger.trace("Received cluster coordination info from {} in {}", node, TimeValue.timeValueNanos(endTime - startTime));
|
|
|
+ responseConsumer.accept(new ClusterFormationStateOrException(response.getClusterFormationState()));
|
|
|
+ }, e -> {
|
|
|
+ logger.warn("Exception in cluster coordination info request to master node", e);
|
|
|
+ responseConsumer.accept(new ClusterFormationStateOrException(e));
|
|
|
+ });
|
|
|
+
|
|
|
+ return transportService.getThreadPool().schedule(() -> {
|
|
|
+ Version minSupportedVersion = Version.V_8_4_0;
|
|
|
+ if (node.getVersion().onOrAfter(minSupportedVersion) == false) { // This was introduced in 8.4.0
|
|
|
+ logger.trace(
|
|
|
+ "Cannot get cluster coordination info for {} because it is at version {} and {} is required",
|
|
|
+ node,
|
|
|
+ node.getVersion(),
|
|
|
+ minSupportedVersion
|
|
|
+ );
|
|
|
+ } else {
|
|
|
+ transportService.connectToNode(
|
|
|
+ // Note: This connection must be explicitly closed in the connectionListener
|
|
|
+ node,
|
|
|
+ ConnectionProfile.buildDefaultConnectionProfile(clusterService.getSettings()),
|
|
|
+ connectionListener
|
|
|
+ );
|
|
|
+ }
|
|
|
+ }, new TimeValue(10, TimeUnit.SECONDS), ThreadPool.Names.SAME);
|
|
|
+ }
|
|
|
+
|
|
|
+ // Non-private for testing
|
|
|
+ record ClusterFormationStateOrException(
|
|
|
+ ClusterFormationFailureHelper.ClusterFormationState clusterFormationState,
|
|
|
+ Exception exception
|
|
|
+ ) {
|
|
|
+ ClusterFormationStateOrException {
|
|
|
+ if (clusterFormationState != null && exception != null) {
|
|
|
+ throw new IllegalArgumentException("Cluster formation state and exception cannot both be non-null");
|
|
|
+ }
|
|
|
+ }
|
|
|
+
|
|
|
+ ClusterFormationStateOrException(ClusterFormationFailureHelper.ClusterFormationState clusterFormationState) {
|
|
|
+ this(clusterFormationState, null);
|
|
|
+ }
|
|
|
+
|
|
|
+ ClusterFormationStateOrException(Exception exception) {
|
|
|
+ this(null, exception);
|
|
|
+ }
|
|
|
}
|
|
|
|
|
|
public record CoordinationDiagnosticsResult(
|