1
0
Эх сурвалжийг харах

Polling for cluster diagnostics information (#89014)

This commit causes non-master-eligible nodes to poll a random master-eligible node every 10 seconds
whenever the elected master goes null for diagnostic information in support of the health API's master
stability check.
Keith Massey 3 жил өмнө
parent
commit
ee33383156

+ 5 - 0
docs/changelog/89014.yaml

@@ -0,0 +1,5 @@
+pr: 89014
+summary: Polling for cluster diagnostics information
+area: Health
+type: enhancement
+issues: []

+ 126 - 0
server/src/internalClusterTest/java/org/elasticsearch/cluster/coordination/CoordinationDiagnosticsServiceIT.java

@@ -14,17 +14,25 @@ import org.elasticsearch.cluster.service.ClusterService;
 import org.elasticsearch.common.settings.Settings;
 import org.elasticsearch.core.TimeValue;
 import org.elasticsearch.node.Node;
+import org.elasticsearch.plugins.Plugin;
 import org.elasticsearch.test.ESIntegTestCase;
 import org.elasticsearch.test.InternalTestCluster;
 import org.elasticsearch.test.disruption.BlockClusterStateProcessing;
+import org.elasticsearch.test.disruption.NetworkDisruption;
+import org.elasticsearch.test.transport.MockTransportService;
 import org.elasticsearch.threadpool.Scheduler;
+import org.elasticsearch.threadpool.ThreadPool;
 import org.junit.Before;
 
 import java.io.IOException;
+import java.util.Collection;
+import java.util.Collections;
 import java.util.List;
 import java.util.Set;
 import java.util.concurrent.ConcurrentHashMap;
 import java.util.concurrent.ConcurrentMap;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicReference;
 import java.util.stream.Collectors;
 
 import static org.hamcrest.Matchers.containsString;
@@ -41,6 +49,11 @@ public class CoordinationDiagnosticsServiceIT extends ESIntegTestCase {
         internalCluster().setBootstrapMasterNodeIndex(0);
     }
 
+    @Override
+    protected Collection<Class<? extends Plugin>> nodePlugins() {
+        return Collections.singletonList(MockTransportService.TestPlugin.class);
+    }
+
     public void testBlockClusterStateProcessingOnOneNode() throws Exception {
         /*
          * This test picks a node that is not elected master, and then blocks cluster state processing on it. The reason is so that we
@@ -102,6 +115,119 @@ public class CoordinationDiagnosticsServiceIT extends ESIntegTestCase {
         disruption.stopDisrupting();
     }
 
+    public void testBeginPollingRemoteStableMasterHealthIndicatorService() throws Exception {
+        /*
+         * This test picks a node that is not elected master, and then blocks cluster state processing on it. The reason is so that we
+         * can call CoordinationDiagnosticsService#beginPollingRemoteMasterStabilityDiagnostic without a cluster changed event
+         * resulting in the values we pass in being overwritten.
+         */
+        final List<String> nodeNames = internalCluster().startNodes(3);
+        ensureStableCluster(3);
+
+        final String master = internalCluster().getMasterName();
+        assertThat(nodeNames, hasItem(master));
+        String blockedNode = nodeNames.stream().filter(n -> n.equals(master) == false).findAny().get();
+        assertNotNull(blockedNode);
+
+        DiscoveryNodes discoveryNodes = internalCluster().getInstance(ClusterService.class, master).state().nodes();
+        Set<DiscoveryNode> nodesWithoutBlockedNode = discoveryNodes.getNodes()
+            .values()
+            .stream()
+            .filter(n -> n.getName().equals(blockedNode) == false)
+            .collect(Collectors.toSet());
+
+        BlockClusterStateProcessing disruption = new BlockClusterStateProcessing(blockedNode, random());
+        internalCluster().setDisruptionScheme(disruption);
+        // stop processing cluster state changes
+        disruption.startDisrupting();
+
+        CoordinationDiagnosticsService diagnosticsOnBlockedNode = internalCluster().getInstance(
+            CoordinationDiagnosticsService.class,
+            blockedNode
+        );
+        AtomicReference<CoordinationDiagnosticsService.RemoteMasterHealthResult> result = new AtomicReference<>();
+        AtomicReference<Scheduler.Cancellable> cancellable = new AtomicReference<>();
+        diagnosticsOnBlockedNode.remoteCoordinationDiagnosisResult = result;
+        diagnosticsOnBlockedNode.remoteCoordinationDiagnosisTask = cancellable;
+
+        diagnosticsOnBlockedNode.remoteRequestInitialDelay = TimeValue.ZERO;
+        diagnosticsOnBlockedNode.beginPollingRemoteMasterStabilityDiagnostic(result::set, cancellable);
+
+        // while the node is blocked from processing cluster state changes it should reach out to the other 2
+        // master eligible nodes and get a successful response
+        assertBusy(() -> {
+            assertNotNull(result.get());
+            assertNotNull(cancellable.get());
+            assertNotNull(result.get().result());
+            assertNull(result.get().remoteException());
+        });
+
+        disruption.stopDisrupting();
+    }
+
+    public void testNoQuorumSeenFromNonMasterNodes() throws Exception {
+        /*
+         * In this test we have three master-eligible nodes. We make it so that the two non-active ones cannot communicate, and then we
+         * stop the active master node. Now there is no quorum so a new master cannot be elected. We set the master lookup threshold very
+         * low on the data nodes, so when we run the master stability check on each of the master nodes, it will see that there has been no
+         * master recently because there is no quorum, so it returns a RED status. In this test we then check the value of
+         * remoteCoordinationDiagnosisResult on each of the non-master-eligible nodes to make sure that they have reached out to one of
+         * the master-eligible nodes to get the expected result.
+         */
+        final List<String> masterNodes = internalCluster().startMasterOnlyNodes(
+            3,
+            Settings.builder()
+                .put(LeaderChecker.LEADER_CHECK_TIMEOUT_SETTING.getKey(), "1s")
+                .put(Coordinator.PUBLISH_TIMEOUT_SETTING.getKey(), "1s")
+                .put(CoordinationDiagnosticsService.NO_MASTER_TRANSITIONS_THRESHOLD_SETTING.getKey(), 1)
+                .put(ThreadPool.ESTIMATED_TIME_INTERVAL_SETTING.getKey(), TimeValue.ZERO)
+                .put(CoordinationDiagnosticsService.NODE_HAS_MASTER_LOOKUP_TIMEFRAME_SETTING.getKey(), new TimeValue(1, TimeUnit.SECONDS))
+                .build()
+        );
+        final List<String> dataNodes = internalCluster().startDataOnlyNodes(
+            2,
+            Settings.builder()
+                .put(LeaderChecker.LEADER_CHECK_TIMEOUT_SETTING.getKey(), "1s")
+                .put(Coordinator.PUBLISH_TIMEOUT_SETTING.getKey(), "1s")
+                .put(CoordinationDiagnosticsService.NO_MASTER_TRANSITIONS_THRESHOLD_SETTING.getKey(), 1)
+                .put(ThreadPool.ESTIMATED_TIME_INTERVAL_SETTING.getKey(), TimeValue.ZERO)
+                .put(CoordinationDiagnosticsService.NODE_HAS_MASTER_LOOKUP_TIMEFRAME_SETTING.getKey(), new TimeValue(1, TimeUnit.SECONDS))
+                .build()
+        );
+        internalCluster().getInstances(CoordinationDiagnosticsService.class)
+            .forEach(coordinationDiagnosticsService -> coordinationDiagnosticsService.remoteRequestInitialDelay = TimeValue.ZERO);
+        ensureStableCluster(5);
+        String firstMasterNode = internalCluster().getMasterName();
+        List<String> nonActiveMasterNodes = masterNodes.stream().filter(nodeName -> firstMasterNode.equals(nodeName) == false).toList();
+        NetworkDisruption networkDisconnect = new NetworkDisruption(
+            new NetworkDisruption.TwoPartitions(
+                Set.of(nonActiveMasterNodes.get(0), dataNodes.get(0)),
+                Set.of(nonActiveMasterNodes.get(1), dataNodes.get(1))
+            ),
+            NetworkDisruption.UNRESPONSIVE
+        );
+        internalCluster().clearDisruptionScheme();
+        setDisruptionScheme(networkDisconnect);
+        networkDisconnect.startDisrupting();
+        internalCluster().stopNode(firstMasterNode);
+
+        assertBusy(() -> {
+            dataNodes.forEach(dataNode -> {
+                CoordinationDiagnosticsService diagnosticsOnBlockedNode = internalCluster().getInstance(
+                    CoordinationDiagnosticsService.class,
+                    dataNode
+                );
+                assertNotNull(diagnosticsOnBlockedNode.remoteCoordinationDiagnosisResult);
+                assertNotNull(diagnosticsOnBlockedNode.remoteCoordinationDiagnosisResult.get());
+                CoordinationDiagnosticsService.CoordinationDiagnosticsResult result =
+                    diagnosticsOnBlockedNode.remoteCoordinationDiagnosisResult.get().result();
+                assertNotNull(result);
+                assertThat(result.status(), equalTo(CoordinationDiagnosticsService.CoordinationDiagnosticsStatus.RED));
+                assertThat(result.summary(), containsString("unable to form a quorum"));
+            });
+        });
+    }
+
     public void testNoMasterElected() throws Exception {
         /*
          * This test starts up a 3-node cluster where all nodes are master eligible. It then shuts down two of the nodes and restarts one

+ 255 - 43
server/src/main/java/org/elasticsearch/cluster/coordination/CoordinationDiagnosticsService.java

@@ -13,8 +13,12 @@ import org.apache.logging.log4j.Logger;
 import org.elasticsearch.Version;
 import org.elasticsearch.action.ActionListener;
 import org.elasticsearch.action.ActionListenerResponseHandler;
+import org.elasticsearch.action.ActionRequest;
+import org.elasticsearch.action.ActionResponse;
+import org.elasticsearch.action.ActionType;
 import org.elasticsearch.action.StepListener;
 import org.elasticsearch.action.admin.cluster.coordination.ClusterFormationInfoAction;
+import org.elasticsearch.action.admin.cluster.coordination.CoordinationDiagnosticsAction;
 import org.elasticsearch.cluster.ClusterChangedEvent;
 import org.elasticsearch.cluster.ClusterStateListener;
 import org.elasticsearch.cluster.node.DiscoveryNode;
@@ -49,7 +53,9 @@ import java.util.Set;
 import java.util.concurrent.ConcurrentHashMap;
 import java.util.concurrent.ConcurrentMap;
 import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicReference;
 import java.util.function.BiConsumer;
+import java.util.function.BiFunction;
 import java.util.function.Consumer;
 import java.util.stream.Collectors;
 
@@ -99,6 +105,18 @@ public class CoordinationDiagnosticsService implements ClusterStateListener {
     // Non-private for testing
     volatile ConcurrentMap<DiscoveryNode, ClusterFormationStateOrException> clusterFormationResponses = null;
 
+    /*
+     * This is a reference to the task that is periodically reaching out to a master eligible node to get its CoordinationDiagnosticsResult
+     * for diagnosis. It is null when no polling is occurring.
+     * The field is accessed (reads/writes) from multiple threads, and is also reassigned on multiple threads.
+     */
+    volatile AtomicReference<Scheduler.Cancellable> remoteCoordinationDiagnosisTask = null;
+    /*
+     * This field holds the result of the task in the remoteCoordinationDiagnosisTask field above. The field is accessed
+     * (reads/writes) from multiple threads, but is only ever reassigned on a single thread (the cluster change event thread).
+     */
+    volatile AtomicReference<RemoteMasterHealthResult> remoteCoordinationDiagnosisResult = null;
+
     /**
      * This is the amount of time that we wait before scheduling a remote request to gather diagnostic information. It is not
      * user-configurable, but is non-final so that integration tests don't have to waste 10 seconds.
@@ -636,6 +654,13 @@ public class CoordinationDiagnosticsService implements ClusterStateListener {
         } else {
             cancelPollingClusterFormationInfo();
         }
+        if (clusterService.localNode().isMasterNode() == false) {
+            if (currentMaster == null) {
+                beginPollingRemoteMasterStabilityDiagnostic();
+            } else {
+                cancelPollingRemoteMasterStabilityDiagnostic();
+            }
+        }
     }
 
     /**
@@ -649,7 +674,7 @@ public class CoordinationDiagnosticsService implements ClusterStateListener {
         Map<DiscoveryNode, Scheduler.Cancellable> cancellables = new ConcurrentHashMap<>();
         /*
          * Assignment of clusterFormationInfoTasks must be done before the call to beginPollingClusterFormationInfo because it is used
-         * asynchronously by rescheduleFetchConsumer, called from beginPollingClusterFormationInfo.
+         * asynchronously by rescheduleClusterFormationFetchConsumer, called from beginPollingClusterFormationInfo.
          */
         clusterFormationInfoTasks = cancellables;
         clusterFormationResponses = responses;
@@ -676,7 +701,9 @@ public class CoordinationDiagnosticsService implements ClusterStateListener {
                     masterEligibleNode,
                     fetchClusterFormationInfo(
                         masterEligibleNode,
-                        responseConsumer.andThen(rescheduleFetchConsumer(masterEligibleNode, responseConsumer, cancellables))
+                        responseConsumer.andThen(
+                            rescheduleClusterFormationFetchConsumer(masterEligibleNode, responseConsumer, cancellables)
+                        )
                     )
                 );
             } catch (EsRejectedExecutionException e) {
@@ -690,14 +717,14 @@ public class CoordinationDiagnosticsService implements ClusterStateListener {
     }
 
     /**
-     * This wraps the responseConsumer in a Consumer that will run rescheduleFetchConsumer() after responseConsumer has
-     * completed, adding the resulting Cancellable to cancellableConsumer.
+     * This wraps the responseConsumer in a Consumer that will run rescheduleClusterFormationFetchConsumer() after responseConsumer has
+     * completed, adding the resulting Cancellable to cancellables.
      * @param masterEligibleNode The node being polled
      * @param responseConsumer The response consumer to be wrapped
      * @param cancellables The Map of Cancellables, one for each node being polled
      * @return
      */
-    private Consumer<CoordinationDiagnosticsService.ClusterFormationStateOrException> rescheduleFetchConsumer(
+    private Consumer<CoordinationDiagnosticsService.ClusterFormationStateOrException> rescheduleClusterFormationFetchConsumer(
         DiscoveryNode masterEligibleNode,
         Consumer<CoordinationDiagnosticsService.ClusterFormationStateOrException> responseConsumer,
         Map<DiscoveryNode, Scheduler.Cancellable> cancellables
@@ -718,15 +745,18 @@ public class CoordinationDiagnosticsService implements ClusterStateListener {
                 if (cancellables.equals(clusterFormationInfoTasks)) {
                     /*
                      * As mentioned in the comment in cancelPollingClusterFormationInfo(), there is a slim possibility here that we will
-                     * add a task here for a poll that has already been cancelled. But when it completes and runs rescheduleFetchConsumer()
-                     * we will then see that clusterFormationInfoTasks does not equal cancellables, so it will not be run again.
+                     * add a task here for a poll that has already been cancelled. But when it completes and runs
+                     * rescheduleClusterFormationFetchConsumer() we will then see that clusterFormationInfoTasks does not equal
+                     * cancellables, so it will not be run again.
                      */
                     try {
                         cancellables.put(
                             masterEligibleNode,
                             fetchClusterFormationInfo(
                                 masterEligibleNode,
-                                responseConsumer.andThen(rescheduleFetchConsumer(masterEligibleNode, responseConsumer, cancellables))
+                                responseConsumer.andThen(
+                                    rescheduleClusterFormationFetchConsumer(masterEligibleNode, responseConsumer, cancellables)
+                                )
                             )
                         );
                     } catch (EsRejectedExecutionException e) {
@@ -771,58 +801,237 @@ public class CoordinationDiagnosticsService implements ClusterStateListener {
     private Scheduler.Cancellable fetchClusterFormationInfo(
         DiscoveryNode node,
         Consumer<ClusterFormationStateOrException> responseConsumer
+    ) {
+        return sendTransportRequest(
+            node,
+            responseConsumer,
+            ClusterFormationInfoAction.INSTANCE,
+            new ClusterFormationInfoAction.Request(),
+            (response, e) -> {
+                assert response != null || e != null : "a response or an exception must be provided";
+                if (response != null) {
+                    return new ClusterFormationStateOrException(response.getClusterFormationState());
+                } else {
+                    return new ClusterFormationStateOrException(e);
+                }
+            }
+        );
+    }
+
+    void beginPollingRemoteMasterStabilityDiagnostic() {
+        assert ThreadPool.assertCurrentThreadPool(ClusterApplierService.CLUSTER_UPDATE_THREAD_NAME);
+        AtomicReference<Scheduler.Cancellable> cancellableReference = new AtomicReference<>();
+        AtomicReference<RemoteMasterHealthResult> resultReference = new AtomicReference<>();
+        remoteCoordinationDiagnosisTask = cancellableReference;
+        remoteCoordinationDiagnosisResult = resultReference;
+        beginPollingRemoteMasterStabilityDiagnostic(resultReference::set, cancellableReference);
+    }
+
+    /**
+     * This method returns quickly, but in the background schedules to query a remote master node's cluster diagnostics in 10 seconds, and
+     * repeats doing that until cancelPollingRemoteMasterStabilityDiagnostic() is called. This method
+     * exists (rather than being just part of the beginPollingRemoteMasterStabilityDiagnostic() above) in order to facilitate
+     * unit testing.
+     * @param responseConsumer A consumer for any results produced for a node by this method
+     * @param cancellableReference The Cancellable reference to assign the current Cancellable for this polling attempt
+     */
+    // Non-private for testing
+    void beginPollingRemoteMasterStabilityDiagnostic(
+        Consumer<RemoteMasterHealthResult> responseConsumer,
+        AtomicReference<Scheduler.Cancellable> cancellableReference
+    ) {
+        DiscoveryNode masterEligibleNode = getMasterEligibleNodes().stream().findAny().orElse(null);
+        try {
+            cancellableReference.set(
+                fetchCoordinationDiagnostics(
+                    masterEligibleNode,
+                    responseConsumer.andThen(rescheduleDiagnosticsFetchConsumer(responseConsumer, cancellableReference))
+                )
+            );
+        } catch (EsRejectedExecutionException e) {
+            if (e.isExecutorShutdown()) {
+                logger.trace("Not rescheduling request for cluster coordination info because this node is being shutdown", e);
+            } else {
+                throw e;
+            }
+        }
+    }
+
+    /**
+     * This wraps the responseConsumer in a Consumer that will run rescheduleDiagnosticsFetchConsumer() after responseConsumer has
+     * completed, adding the resulting Cancellable to cancellableReference.
+     * @param responseConsumer The response consumer to be wrapped
+     * @param cancellableReference The Cancellable reference to assign the current Cancellable for this polling attempt
+     * @return A wrapped Consumer that will run fetchCoordinationDiagnostics()
+     */
+    private Consumer<RemoteMasterHealthResult> rescheduleDiagnosticsFetchConsumer(
+        Consumer<RemoteMasterHealthResult> responseConsumer,
+        AtomicReference<Scheduler.Cancellable> cancellableReference
+    ) {
+        return response -> {
+            /*
+             * If the cancellableReference for this poll attempt is equal to remoteCoordinationDiagnosisTask, then that means that
+             * this poll attempt is the current one. If they are not equal, that means that
+             * cancelPollingRemoteMasterStabilityDiagnostic() has been called on this poll attempt but this thread is not yet
+             * aware. So we cancel the Cancellable in cancellableReference if it is not null. Note that
+             * remoteCoordinationDiagnosisTask can be null.
+             */
+            if (cancellableReference.equals(remoteCoordinationDiagnosisTask)) {
+                /*
+                 * Because this is not synchronized with the cancelPollingRemoteMasterStabilityDiagnostic() method, there is a
+                 * slim chance that we will add a task here for a poll that has already been cancelled. But when it completes and runs
+                 * rescheduleDiagnosticsFetchConsumer() we will then see that remoteCoordinationDiagnosisTask does not equal
+                 * cancellableReference, so it will not be run again.
+                 */
+                try {
+                    DiscoveryNode masterEligibleNode = getMasterEligibleNodes().stream().findAny().orElse(null);
+                    cancellableReference.set(
+                        fetchCoordinationDiagnostics(
+                            masterEligibleNode,
+                            responseConsumer.andThen(rescheduleDiagnosticsFetchConsumer(responseConsumer, cancellableReference))
+                        )
+                    );
+                } catch (EsRejectedExecutionException e) {
+                    if (e.isExecutorShutdown()) {
+                        logger.trace("Not rescheduling request for cluster coordination info because this node is being shutdown", e);
+                    } else {
+                        throw e;
+                    }
+                }
+            } else {
+                Scheduler.Cancellable cancellable = cancellableReference.get();
+                if (cancellable != null) {
+                    cancellable.cancel();
+                }
+            }
+        };
+    }
+
+    /**
+     * This method returns quickly, but in the background schedules to query the remote masterEligibleNode's cluster diagnostics in 10
+     * seconds unless cancel() is called on the Cancellable that this method returns.
+     * @param masterEligibleNode The masterEligibleNode to poll for cluster diagnostics. This masterEligibleNode can be null in the case
+     *                           when there are not yet any master-eligible nodes known to this masterEligibleNode's PeerFinder.
+     * @param responseConsumer The consumer of the cluster diagnostics for the masterEligibleNode, or the exception encountered while
+     *                         contacting it
+     * @return A Cancellable for the task that is scheduled to fetch cluster diagnostics
+     */
+    private Scheduler.Cancellable fetchCoordinationDiagnostics(
+        @Nullable DiscoveryNode masterEligibleNode,
+        Consumer<RemoteMasterHealthResult> responseConsumer
+    ) {
+        return sendTransportRequest(
+            masterEligibleNode,
+            responseConsumer,
+            CoordinationDiagnosticsAction.INSTANCE,
+            new CoordinationDiagnosticsAction.Request(true),
+            (response, e) -> {
+                assert response != null || e != null : "a response or an exception must be provided";
+                if (response != null) {
+                    return new RemoteMasterHealthResult(masterEligibleNode, response.getCoordinationDiagnosticsResult(), null);
+                } else {
+                    return new RemoteMasterHealthResult(masterEligibleNode, null, e);
+                }
+            }
+        );
+    }
+
+    /**
+     * This method connects to masterEligibleNode and sends it a transport request for a response of type R. The response or exception
+     * are transformed into a common type T with responseToResultFunction or exceptionToResultFunction, and then consumed by
+     * responseConsumer. This method is meant to be used when there is potentially no elected master node, so it first calls
+     * connectToNode before sending the request.
+     * @param masterEligibleNode        The master eligible node to be queried, or null if we do not yet know of a master eligible node.
+     *                                  If this is null, the responseConsumer will be given a null response
+     * @param responseConsumer          The consumer of the transformed response
+     * @param transportActionType       The ActionType for the transport action
+     * @param transportActionRequest    The ActionRequest to be sent
+     * @param responseTransformationFunction A function that converts a response or exception to the response type expected by the
+     *                                       responseConsumer
+     * @return A Cancellable for the task that is scheduled to fetch the remote information
+     */
+    private <R extends ActionResponse, T> Scheduler.Cancellable sendTransportRequest(
+        @Nullable DiscoveryNode masterEligibleNode,
+        Consumer<T> responseConsumer,
+        ActionType<R> transportActionType,
+        ActionRequest transportActionRequest,
+        BiFunction<R, Exception, T> responseTransformationFunction
     ) {
         StepListener<Releasable> connectionListener = new StepListener<>();
-        StepListener<ClusterFormationInfoAction.Response> fetchClusterInfoListener = new StepListener<>();
+        StepListener<R> fetchRemoteResultListener = new StepListener<>();
         long startTime = System.nanoTime();
         connectionListener.whenComplete(releasable -> {
-            logger.trace("Opened connection to {}, making cluster coordination info request", node);
-            // If we don't get a response in 10 seconds that is a failure worth capturing on its own:
-            final TimeValue transportTimeout = TimeValue.timeValueSeconds(10);
-            transportService.sendRequest(
-                node,
-                ClusterFormationInfoAction.NAME,
-                new ClusterFormationInfoAction.Request(),
-                TransportRequestOptions.timeout(transportTimeout),
-                new ActionListenerResponseHandler<>(
-                    ActionListener.runBefore(fetchClusterInfoListener, () -> Releasables.close(releasable)),
-                    ClusterFormationInfoAction.Response::new
-                )
-            );
+            if (masterEligibleNode == null) {
+                responseConsumer.accept(null);
+            } else {
+                logger.trace("Opened connection to {}, making transport request", masterEligibleNode);
+                // If we don't get a response in 10 seconds that is a failure worth capturing on its own:
+                final TimeValue transportTimeout = TimeValue.timeValueSeconds(10);
+                transportService.sendRequest(
+                    masterEligibleNode,
+                    transportActionType.name(),
+                    transportActionRequest,
+                    TransportRequestOptions.timeout(transportTimeout),
+                    new ActionListenerResponseHandler<>(
+                        ActionListener.runBefore(fetchRemoteResultListener, () -> Releasables.close(releasable)),
+                        transportActionType.getResponseReader()
+                    )
+                );
+            }
         }, e -> {
-            logger.warn("Exception connecting to master node", e);
-            responseConsumer.accept(new ClusterFormationStateOrException(e));
+            logger.warn("Exception connecting to master masterEligibleNode", e);
+            responseConsumer.accept(responseTransformationFunction.apply(null, e));
         });
 
-        fetchClusterInfoListener.whenComplete(response -> {
+        fetchRemoteResultListener.whenComplete(response -> {
             long endTime = System.nanoTime();
-            logger.trace("Received cluster coordination info from {} in {}", node, TimeValue.timeValueNanos(endTime - startTime));
-            responseConsumer.accept(new ClusterFormationStateOrException(response.getClusterFormationState()));
+            logger.trace("Received remote response from {} in {}", masterEligibleNode, TimeValue.timeValueNanos(endTime - startTime));
+            responseConsumer.accept(responseTransformationFunction.apply(response, null));
         }, e -> {
-            logger.warn("Exception in cluster coordination info request to master node", e);
-            responseConsumer.accept(new ClusterFormationStateOrException(e));
+            logger.warn("Exception in remote request to master masterEligibleNode", e);
+            responseConsumer.accept(responseTransformationFunction.apply(null, e));
         });
 
         return transportService.getThreadPool().schedule(() -> {
-            Version minSupportedVersion = Version.V_8_4_0;
-            if (node.getVersion().onOrAfter(minSupportedVersion) == false) { // This was introduced in 8.4.0
-                logger.trace(
-                    "Cannot get cluster coordination info for {} because it is at version {} and {} is required",
-                    node,
-                    node.getVersion(),
-                    minSupportedVersion
-                );
+            if (masterEligibleNode == null) {
+                /*
+                 * This node's PeerFinder hasn't yet discovered the master-eligible nodes. By notifying the responseConsumer with a null
+                 * value we effectively do nothing, and allow this request to be recheduled.
+                 */
+                responseConsumer.accept(null);
             } else {
-                transportService.connectToNode(
-                    // Note: This connection must be explicitly closed in the connectionListener
-                    node,
-                    ConnectionProfile.buildDefaultConnectionProfile(clusterService.getSettings()),
-                    connectionListener
-                );
+                Version minSupportedVersion = Version.V_8_4_0;
+                if (masterEligibleNode.getVersion().onOrAfter(minSupportedVersion) == false) {
+                    logger.trace(
+                        "Cannot get remote result from {} because it is at version {} and {} is required",
+                        masterEligibleNode,
+                        masterEligibleNode.getVersion(),
+                        minSupportedVersion
+                    );
+                } else {
+                    transportService.connectToNode(
+                        // Note: This connection must be explicitly closed in the connectionListener
+                        masterEligibleNode,
+                        ConnectionProfile.buildDefaultConnectionProfile(clusterService.getSettings()),
+                        connectionListener
+                    );
+                }
             }
         }, remoteRequestInitialDelay, ThreadPool.Names.SAME);
     }
 
+    void cancelPollingRemoteMasterStabilityDiagnostic() {
+        assert ThreadPool.assertCurrentThreadPool(ClusterApplierService.CLUSTER_UPDATE_THREAD_NAME);
+        if (remoteCoordinationDiagnosisTask != null) {
+            Scheduler.Cancellable task = remoteCoordinationDiagnosisTask.get();
+            if (task != null) {
+                task.cancel();
+            }
+            remoteCoordinationDiagnosisResult = null;
+            remoteCoordinationDiagnosisTask = null;
+        }
+    }
+
     // Non-private for testing
     record ClusterFormationStateOrException(
         ClusterFormationFailureHelper.ClusterFormationState clusterFormationState,
@@ -975,4 +1184,7 @@ public class CoordinationDiagnosticsService implements ClusterStateListener {
         }
 
     }
+
+    // Non-private for testing:
+    record RemoteMasterHealthResult(DiscoveryNode node, CoordinationDiagnosticsResult result, Exception remoteException) {}
 }

+ 85 - 0
server/src/test/java/org/elasticsearch/cluster/coordination/CoordinationDiagnosticsServiceTests.java

@@ -38,10 +38,12 @@ import java.util.Collection;
 import java.util.Collections;
 import java.util.List;
 import java.util.Map;
+import java.util.Set;
 import java.util.UUID;
 import java.util.concurrent.ConcurrentHashMap;
 import java.util.concurrent.ConcurrentMap;
 import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicReference;
 import java.util.stream.Collectors;
 
 import static org.elasticsearch.cluster.coordination.AbstractCoordinatorTestCase.Cluster.EXTREME_DELAY_VARIABILITY;
@@ -979,6 +981,89 @@ public class CoordinationDiagnosticsServiceTests extends AbstractCoordinatorTest
         }
     }
 
+    public void testBeginPollingRemoteMasterStabilityDiagnostic() throws Exception {
+        MasterHistoryService masterHistoryService = createMasterHistoryService();
+        var clusterService = mock(ClusterService.class);
+        when(clusterService.getSettings()).thenReturn(Settings.EMPTY);
+        when(clusterService.state()).thenReturn(nullMasterClusterState);
+        DiscoveryNode localNode = new DiscoveryNode(
+            "node4",
+            randomNodeId(),
+            buildNewFakeTransportAddress(),
+            Collections.emptyMap(),
+            Set.of(DiscoveryNodeRole.DATA_ROLE),
+            Version.CURRENT
+        );
+        when(clusterService.localNode()).thenReturn(localNode);
+        Coordinator coordinator = mock(Coordinator.class);
+        when(coordinator.getFoundPeers()).thenReturn(List.of(node1, node2, localNode));
+        DeterministicTaskQueue deterministicTaskQueue = new DeterministicTaskQueue();
+        ThreadPool threadPool = deterministicTaskQueue.getThreadPool();
+
+        TransportService transportService = mock(TransportService.class);
+        when(transportService.getThreadPool()).thenReturn(threadPool);
+        CoordinationDiagnosticsService coordinationDiagnosticsService = new CoordinationDiagnosticsService(
+            clusterService,
+            transportService,
+            coordinator,
+            masterHistoryService
+        );
+
+        coordinationDiagnosticsService.beginPollingRemoteMasterStabilityDiagnostic();
+        assertNotNull(coordinationDiagnosticsService.remoteCoordinationDiagnosisTask);
+        assertNotNull(coordinationDiagnosticsService.remoteCoordinationDiagnosisTask.get());
+        coordinationDiagnosticsService.cancelPollingRemoteMasterStabilityDiagnostic();
+        assertThat(coordinationDiagnosticsService.remoteCoordinationDiagnosisTask, Matchers.nullValue());
+        coordinationDiagnosticsService.clusterChanged(
+            new ClusterChangedEvent(TEST_SOURCE, nullMasterClusterState, node1MasterClusterState)
+        );
+        assertNotNull(coordinationDiagnosticsService.remoteCoordinationDiagnosisTask);
+        assertNotNull(coordinationDiagnosticsService.remoteCoordinationDiagnosisTask.get());
+        coordinationDiagnosticsService.clusterChanged(
+            new ClusterChangedEvent(TEST_SOURCE, node1MasterClusterState, nullMasterClusterState)
+        );
+        assertThat(coordinationDiagnosticsService.remoteCoordinationDiagnosisTask, Matchers.nullValue());
+        /*
+         * Note that in this test we will never find any values in remoteCoordinationDiagnosisResult because transportService is mocked out.
+         * There is not a reasonable way to plug in a transportService to this simple unit test, so testing that is left to an
+         * integration test.
+         */
+    }
+
+    public void testBeginPollingRemoteMasterStabilityDiagnosticCancel() {
+        /*
+         * This test sets up a 5-node cluster (3 master eligible). We call beginPollingRemoteMasterStabilityDiagnostic() on each
+         * non-master-eligible node. But we immediately call cancel, which is what will happen in practice most often since usually the
+         * master becomes null and then is immediately non-null when a new master is elected. This means that polling will not be started
+         *  since there is a 10-second delay, and we expect no results.
+         */
+        try (Cluster cluster = new Cluster(3, true, Settings.EMPTY)) {
+            createAndAddNonMasterNode(cluster);
+            createAndAddNonMasterNode(cluster);
+            cluster.runRandomly();
+            cluster.stabilise();
+            List<DiscoveryNode> masterNodes = cluster.clusterNodes.stream()
+                .map(Cluster.ClusterNode::getLocalNode)
+                .filter(DiscoveryNode::isMasterNode)
+                .toList();
+            cluster.clusterNodes.stream().filter(node -> node.getLocalNode().isMasterNode() == false).forEach(node -> {
+                List<CoordinationDiagnosticsService.RemoteMasterHealthResult> healthResults = new ArrayList<>();
+                AtomicReference<Scheduler.Cancellable> cancellableReference = new AtomicReference<>();
+                node.coordinationDiagnosticsService.beginPollingRemoteMasterStabilityDiagnostic(healthResults::add, cancellableReference);
+                cancellableReference.get().cancel();
+                cluster.runRandomly(false, true, EXTREME_DELAY_VARIABILITY);
+                cluster.stabilise();
+
+                /*
+                 * The cluster has now run normally for some period of time, but cancel() was called before polling began, so we expect
+                 * no results:
+                 */
+                assertThat(healthResults.size(), equalTo(0));
+            });
+
+        }
+    }
+
     public void testResultSerialization() {
         CoordinationDiagnosticsService.CoordinationDiagnosticsStatus status = getRandomStatus();
         CoordinationDiagnosticsService.CoordinationDiagnosticsDetails details = getRandomDetails();

+ 14 - 7
test/framework/src/main/java/org/elasticsearch/cluster/coordination/AbstractCoordinatorTestCase.java

@@ -16,6 +16,7 @@ import org.elasticsearch.ElasticsearchException;
 import org.elasticsearch.Version;
 import org.elasticsearch.action.ActionListener;
 import org.elasticsearch.action.admin.cluster.coordination.ClusterFormationInfoAction;
+import org.elasticsearch.action.admin.cluster.coordination.CoordinationDiagnosticsAction;
 import org.elasticsearch.action.admin.cluster.coordination.MasterHistoryAction;
 import org.elasticsearch.action.admin.cluster.node.hotthreads.NodesHotThreadsAction;
 import org.elasticsearch.action.admin.cluster.node.hotthreads.TransportNodesHotThreadsAction;
@@ -1251,6 +1252,12 @@ public class AbstractCoordinatorTestCase extends ESTestCase {
                     getElectionStrategy(),
                     nodeHealthService
                 );
+                coordinationDiagnosticsService = new CoordinationDiagnosticsService(
+                    clusterService,
+                    transportService,
+                    coordinator,
+                    masterHistoryService
+                );
                 client.initialize(
                     Map.of(
                         NodesHotThreadsAction.INSTANCE,
@@ -1258,7 +1265,13 @@ public class AbstractCoordinatorTestCase extends ESTestCase {
                         MasterHistoryAction.INSTANCE,
                         new MasterHistoryAction.TransportAction(transportService, new ActionFilters(Set.of()), masterHistoryService),
                         ClusterFormationInfoAction.INSTANCE,
-                        new ClusterFormationInfoAction.TransportAction(transportService, new ActionFilters(Set.of()), coordinator)
+                        new ClusterFormationInfoAction.TransportAction(transportService, new ActionFilters(Set.of()), coordinator),
+                        CoordinationDiagnosticsAction.INSTANCE,
+                        new CoordinationDiagnosticsAction.TransportAction(
+                            transportService,
+                            new ActionFilters(Set.of()),
+                            coordinationDiagnosticsService
+                        )
                     ),
                     transportService.getTaskManager(),
                     localNode::getId,
@@ -1266,12 +1279,6 @@ public class AbstractCoordinatorTestCase extends ESTestCase {
                     null,
                     getNamedWriteableRegistry()
                 );
-                coordinationDiagnosticsService = new CoordinationDiagnosticsService(
-                    clusterService,
-                    transportService,
-                    coordinator,
-                    masterHistoryService
-                );
                 stableMasterHealthIndicatorService = new StableMasterHealthIndicatorService(coordinationDiagnosticsService);
                 masterService.setClusterStatePublisher(coordinator);
                 final GatewayService gatewayService = new GatewayService(