
Health info overview (#89275)

This PR enables the nodes of a cluster to push their health info to the health node.
Co-authored-by: Andrei Dan <andrei.dan@elastic.co>
Mary Gouseti 3 years ago
commit e63d70ba35
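
At a high level, the PR wires together three pieces shown in the diffs below: each node (with the health node feature enabled) runs a LocalHealthMonitor that periodically computes its DiskHealthInfo, the new UpdateHealthInfoCacheAction ships that info to the currently elected health node, and the health node stores it in a HealthInfoCache. A simplified sketch of the reporting path, condensed from LocalHealthMonitor.Monitoring#run further down (not part of the patch itself):

    // Each monitoring round: compute the local disk health and push it to the health node if it changed.
    DiskHealthInfo currentHealth = diskCheck.getHealth(healthMetadata, clusterState);
    if (currentHealth.equals(lastReportedDiskHealthInfo.get()) == false) {
        client.execute(
            UpdateHealthInfoCacheAction.INSTANCE,
            new UpdateHealthInfoCacheAction.Request(clusterService.localNode().getId(), currentHealth),
            ActionListener.wrap(
                response -> lastReportedDiskHealthInfo.set(currentHealth), // remember what the health node acked
                e -> logger.debug("Failed to send health info, will retry on the next run", e)
            )
        );
    }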

+ 160 - 0
server/src/internalClusterTest/java/org/elasticsearch/health/UpdateHealthInfoCacheIT.java

@@ -0,0 +1,160 @@
+/*
+ * Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
+ * or more contributor license agreements. Licensed under the Elastic License
+ * 2.0 and the Server Side Public License, v 1; you may not use this file except
+ * in compliance with, at your election, the Elastic License 2.0 or the Server
+ * Side Public License, v 1.
+ */
+
+package org.elasticsearch.health;
+
+import org.elasticsearch.action.admin.cluster.settings.ClusterUpdateSettingsRequest;
+import org.elasticsearch.client.internal.Client;
+import org.elasticsearch.cluster.ClusterState;
+import org.elasticsearch.cluster.node.DiscoveryNode;
+import org.elasticsearch.common.settings.Settings;
+import org.elasticsearch.core.Nullable;
+import org.elasticsearch.core.TimeValue;
+import org.elasticsearch.health.node.DiskHealthInfo;
+import org.elasticsearch.health.node.HealthInfoCache;
+import org.elasticsearch.health.node.LocalHealthMonitor;
+import org.elasticsearch.health.node.selection.HealthNode;
+import org.elasticsearch.test.ESIntegTestCase;
+import org.elasticsearch.test.InternalTestCluster;
+
+import java.io.IOException;
+import java.util.Collection;
+import java.util.Map;
+import java.util.concurrent.TimeUnit;
+
+import static org.hamcrest.Matchers.equalTo;
+import static org.hamcrest.Matchers.notNullValue;
+
+@ESIntegTestCase.ClusterScope(scope = ESIntegTestCase.Scope.TEST, numDataNodes = 4)
+public class UpdateHealthInfoCacheIT extends ESIntegTestCase {
+
+    private static final DiskHealthInfo GREEN = new DiskHealthInfo(HealthStatus.GREEN, null);
+
+    public void testNodesReportingHealth() throws Exception {
+        try (InternalTestCluster internalCluster = internalCluster(); Client client = internalCluster.client()) {
+            decreasePollingInterval(client);
+            ClusterState state = internalCluster.client().admin().cluster().prepareState().clear().setNodes(true).get().getState();
+            String[] nodeIds = state.getNodes().getNodes().keySet().toArray(new String[0]);
+            DiscoveryNode healthNode = waitAndGetHealthNode(client);
+            assertThat(healthNode, notNullValue());
+            assertBusy(() -> {
+                Map<String, DiskHealthInfo> healthInfoCache = internalCluster.getInstance(HealthInfoCache.class, healthNode.getName())
+                    .getDiskHealthInfo();
+                assertThat(healthInfoCache.size(), equalTo(nodeIds.length));
+                for (String nodeId : nodeIds) {
+                    assertThat(healthInfoCache.get(nodeId), equalTo(GREEN));
+                }
+            });
+        } catch (IOException e) {
+            throw new RuntimeException("Failed to close internal cluster: " + e.getMessage(), e);
+        }
+    }
+
+    public void testNodeLeavingCluster() throws Exception {
+        try (InternalTestCluster internalCluster = internalCluster(); Client client = internalCluster.client()) {
+            decreasePollingInterval(client);
+            ClusterState state = internalCluster.client().admin().cluster().prepareState().clear().setNodes(true).get().getState();
+            Collection<DiscoveryNode> nodes = state.getNodes().getNodes().values();
+            DiscoveryNode healthNode = waitAndGetHealthNode(client);
+            assertThat(healthNode, notNullValue());
+            DiscoveryNode nodeToLeave = nodes.stream().filter(node -> {
+                boolean isMaster = node.getName().equals(internalCluster.getMasterName());
+                boolean isHealthNode = node.getId().equals(healthNode.getId());
+                // We have dedicated tests for master and health node
+                return isMaster == false && isHealthNode == false;
+            }).findAny().orElseThrow();
+            internalCluster.stopNode(nodeToLeave.getName());
+            assertBusy(() -> {
+                Map<String, DiskHealthInfo> healthInfoCache = internalCluster.getInstance(HealthInfoCache.class, healthNode.getName())
+                    .getDiskHealthInfo();
+                assertThat(healthInfoCache.size(), equalTo(nodes.size() - 1));
+                for (DiscoveryNode node : nodes) {
+                    if (node.getId().equals(nodeToLeave.getId())) {
+                        assertThat(healthInfoCache.containsKey(node.getId()), equalTo(false));
+                    } else {
+                        assertThat(healthInfoCache.get(node.getId()), equalTo(GREEN));
+                    }
+                }
+            });
+        } catch (IOException e) {
+            throw new RuntimeException("Failed to close internal cluster: " + e.getMessage(), e);
+        }
+    }
+
+    public void testHealthNodeFailOver() throws Exception {
+        try (InternalTestCluster internalCluster = internalCluster(); Client client = internalCluster.client()) {
+            decreasePollingInterval(client);
+            ClusterState state = internalCluster.client().admin().cluster().prepareState().clear().setNodes(true).get().getState();
+            String[] nodeIds = state.getNodes().getNodes().keySet().toArray(new String[0]);
+            DiscoveryNode healthNodeToBeShutDown = waitAndGetHealthNode(client);
+            assertThat(healthNodeToBeShutDown, notNullValue());
+            internalCluster.restartNode(healthNodeToBeShutDown.getName());
+            ensureStableCluster(nodeIds.length);
+            DiscoveryNode newHealthNode = waitAndGetHealthNode(client);
+            assertThat(newHealthNode, notNullValue());
+            logger.info("Previous health node {}, new health node {}.", healthNodeToBeShutDown, newHealthNode);
+            assertBusy(() -> {
+                Map<String, DiskHealthInfo> healthInfoCache = internalCluster.getInstance(HealthInfoCache.class, newHealthNode.getName())
+                    .getDiskHealthInfo();
+                assertThat(healthInfoCache.size(), equalTo(nodeIds.length));
+                for (String nodeId : nodeIds) {
+                    assertThat(healthInfoCache.get(nodeId), equalTo(GREEN));
+                }
+            });
+        } catch (IOException e) {
+            throw new RuntimeException("Failed to close internal cluster: " + e.getMessage(), e);
+        }
+    }
+
+    public void testMasterFailure() throws Exception {
+        try (InternalTestCluster internalCluster = internalCluster(); Client client = internalCluster.client()) {
+            decreasePollingInterval(client);
+            ClusterState state = internalCluster.client().admin().cluster().prepareState().clear().setNodes(true).get().getState();
+            String[] nodeIds = state.getNodes().getNodes().keySet().toArray(new String[0]);
+            DiscoveryNode healthNodeBeforeIncident = waitAndGetHealthNode(client);
+            assertThat(healthNodeBeforeIncident, notNullValue());
+            String masterName = internalCluster.getMasterName();
+            logger.info("Restarting elected master node {}.", masterName);
+            internalCluster.restartNode(masterName);
+            ensureStableCluster(nodeIds.length);
+            DiscoveryNode newHealthNode = waitAndGetHealthNode(client);
+            assertThat(newHealthNode, notNullValue());
+            assertBusy(() -> {
+                Map<String, DiskHealthInfo> healthInfoCache = internalCluster.getInstance(HealthInfoCache.class, newHealthNode.getName())
+                    .getDiskHealthInfo();
+                assertThat(healthInfoCache.size(), equalTo(nodeIds.length));
+                for (String nodeId : nodeIds) {
+                    assertThat(healthInfoCache.get(nodeId), equalTo(GREEN));
+                }
+            });
+        } catch (IOException e) {
+            throw new RuntimeException("Failed to close internal cluster: " + e.getMessage(), e);
+        }
+    }
+
+    @Nullable
+    private static DiscoveryNode waitAndGetHealthNode(Client client) throws InterruptedException {
+        DiscoveryNode[] healthNode = new DiscoveryNode[1];
+        waitUntil(() -> {
+            ClusterState state = client.admin().cluster().prepareState().clear().setMetadata(true).setNodes(true).get().getState();
+            healthNode[0] = HealthNode.findHealthNode(state);
+            return healthNode[0] != null;
+        }, 2, TimeUnit.SECONDS);
+        return healthNode[0];
+    }
+
+    private void decreasePollingInterval(Client client) {
+        client.admin()
+            .cluster()
+            .updateSettings(
+                new ClusterUpdateSettingsRequest().persistentSettings(
+                    Settings.builder().put(LocalHealthMonitor.POLL_INTERVAL_SETTING.getKey(), TimeValue.timeValueSeconds(10))
+                )
+            );
+    }
+}

+ 6 - 0
server/src/main/java/org/elasticsearch/action/ActionModule.java

@@ -263,6 +263,8 @@ import org.elasticsearch.common.settings.SettingsFilter;
 import org.elasticsearch.gateway.TransportNodesListGatewayStartedShards;
 import org.elasticsearch.health.GetHealthAction;
 import org.elasticsearch.health.RestGetHealthAction;
+import org.elasticsearch.health.node.UpdateHealthInfoCacheAction;
+import org.elasticsearch.health.node.selection.HealthNode;
 import org.elasticsearch.index.seqno.GlobalCheckpointSyncAction;
 import org.elasticsearch.index.seqno.RetentionLeaseActions;
 import org.elasticsearch.indices.SystemIndices;
@@ -700,6 +702,10 @@ public class ActionModule extends AbstractModule {
         actions.register(UpdateDesiredNodesAction.INSTANCE, TransportUpdateDesiredNodesAction.class);
         actions.register(DeleteDesiredNodesAction.INSTANCE, TransportDeleteDesiredNodesAction.class);
 
+        if (HealthNode.isEnabled()) {
+            actions.register(UpdateHealthInfoCacheAction.INSTANCE, UpdateHealthInfoCacheAction.TransportAction.class);
+        }
+
         return unmodifiableMap(actions.getRegistry());
     }
 

+ 18 - 2
server/src/main/java/org/elasticsearch/health/node/DiskHealthInfo.java

@@ -8,17 +8,33 @@
 
 package org.elasticsearch.health.node;
 
+import org.elasticsearch.common.io.stream.StreamInput;
+import org.elasticsearch.common.io.stream.StreamOutput;
+import org.elasticsearch.common.io.stream.Writeable;
+import org.elasticsearch.core.Nullable;
 import org.elasticsearch.health.HealthStatus;
 
+import java.io.IOException;
+
 /**
  * The health status of the disk space of this node along with the cause.
  */
-record DiskHealthInfo(HealthStatus healthStatus, Cause cause) {
+public record DiskHealthInfo(HealthStatus healthStatus, @Nullable Cause cause) implements Writeable {
     DiskHealthInfo(HealthStatus healthStatus) {
         this(healthStatus, null);
     }
 
-    enum Cause {
+    public DiskHealthInfo(StreamInput in) throws IOException {
+        this(in.readEnum(HealthStatus.class), in.readOptionalEnum(Cause.class));
+    }
+
+    @Override
+    public void writeTo(StreamOutput out) throws IOException {
+        healthStatus.writeTo(out);
+        out.writeOptionalEnum(cause);
+    }
+
+    public enum Cause {
         NODE_OVER_HIGH_THRESHOLD,
         NODE_OVER_THE_FLOOD_STAGE_THRESHOLD,
         FROZEN_NODE_OVER_FLOOD_STAGE_THRESHOLD,
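
Making DiskHealthInfo public and Writeable lets it travel inside the new transport request. A minimal round-trip sketch, assuming the usual BytesStreamOutput helper is in scope (illustrative, not part of the patch):

    DiskHealthInfo original = new DiskHealthInfo(HealthStatus.YELLOW, DiskHealthInfo.Cause.NODE_OVER_HIGH_THRESHOLD);
    try (BytesStreamOutput out = new BytesStreamOutput()) {
        original.writeTo(out);                                           // writes the status plus the optional cause
        DiskHealthInfo copy = new DiskHealthInfo(out.bytes().streamInput());
        assert copy.equals(original);                                    // record semantics give value equality for free
    }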

+ 66 - 0
server/src/main/java/org/elasticsearch/health/node/HealthInfoCache.java

@@ -0,0 +1,66 @@
+/*
+ * Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
+ * or more contributor license agreements. Licensed under the Elastic License
+ * 2.0 and the Server Side Public License, v 1; you may not use this file except
+ * in compliance with, at your election, the Elastic License 2.0 or the Server
+ * Side Public License, v 1.
+ */
+
+package org.elasticsearch.health.node;
+
+import org.apache.logging.log4j.LogManager;
+import org.apache.logging.log4j.Logger;
+import org.elasticsearch.cluster.ClusterChangedEvent;
+import org.elasticsearch.cluster.ClusterStateListener;
+import org.elasticsearch.cluster.node.DiscoveryNode;
+import org.elasticsearch.cluster.service.ClusterService;
+import org.elasticsearch.health.node.selection.HealthNode;
+
+import java.util.Map;
+import java.util.concurrent.ConcurrentHashMap;
+
+/**
+ * Keeps track of the health statuses reported by the nodes of the cluster. This cache lives on the health node.
+ */
+public class HealthInfoCache implements ClusterStateListener {
+
+    private static final Logger logger = LogManager.getLogger(HealthInfoCache.class);
+    private volatile ConcurrentHashMap<String, DiskHealthInfo> diskInfoByNode = new ConcurrentHashMap<>();
+
+    private HealthInfoCache() {}
+
+    public static HealthInfoCache create(ClusterService clusterService) {
+        HealthInfoCache healthInfoCache = new HealthInfoCache();
+        clusterService.addListener(healthInfoCache);
+        return healthInfoCache;
+    }
+
+    public void updateNodeHealth(String nodeId, DiskHealthInfo diskHealthInfo) {
+        diskInfoByNode.put(nodeId, diskHealthInfo);
+    }
+
+    @Override
+    public void clusterChanged(ClusterChangedEvent event) {
+        DiscoveryNode currentHealthNode = HealthNode.findHealthNode(event.state());
+        DiscoveryNode localNode = event.state().nodes().getLocalNode();
+        if (currentHealthNode != null && localNode.getId().equals(currentHealthNode.getId())) {
+            if (event.nodesRemoved()) {
+                for (DiscoveryNode removedNode : event.nodesDelta().removedNodes()) {
+                    diskInfoByNode.remove(removedNode.getId());
+                }
+            }
+            // Resetting the cache is not synchronized for efficiency and simplicity.
+            // Processing a delayed update after the cache has been emptied (because
+            // this node is no longer the health node) has little impact, since the
+            // cache will be reset again in the next round.
+        } else if (diskInfoByNode.isEmpty() == false) {
+            logger.debug("Node [{}][{}] is no longer the health node, emptying the cache.", localNode.getName(), localNode.getId());
+            diskInfoByNode = new ConcurrentHashMap<>();
+        }
+    }
+
+    // A shallow copy is enough because the inner data is immutable.
+    public Map<String, DiskHealthInfo> getDiskHealthInfo() {
+        return Map.copyOf(diskInfoByNode);
+    }
+}
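
Note that getDiskHealthInfo() returns an immutable snapshot (Map.copyOf), so a caller never observes later mutations; the unit test further down relies on exactly this. A small illustration with hypothetical node ids (not part of the patch):

    HealthInfoCache cache = HealthInfoCache.create(clusterService);
    cache.updateNodeHealth("node-1", new DiskHealthInfo(HealthStatus.GREEN, null));
    Map<String, DiskHealthInfo> snapshot = cache.getDiskHealthInfo();    // immutable copy of the current state
    cache.updateNodeHealth("node-1",
        new DiskHealthInfo(HealthStatus.RED, DiskHealthInfo.Cause.NODE_OVER_THE_FLOOD_STAGE_THRESHOLD));
    // The snapshot still maps node-1 to GREEN; only a fresh getDiskHealthInfo() call sees the RED value.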

+ 297 - 48
server/src/main/java/org/elasticsearch/health/node/LocalHealthMonitor.java

@@ -11,8 +11,11 @@ package org.elasticsearch.health.node;
 import org.apache.logging.log4j.LogManager;
 import org.apache.logging.log4j.Logger;
 import org.elasticsearch.Version;
+import org.elasticsearch.action.ActionListener;
 import org.elasticsearch.action.admin.cluster.node.stats.NodeStats;
 import org.elasticsearch.action.admin.indices.stats.CommonStatsFlags;
+import org.elasticsearch.action.support.master.AcknowledgedResponse;
+import org.elasticsearch.client.internal.Client;
 import org.elasticsearch.cluster.ClusterChangedEvent;
 import org.elasticsearch.cluster.ClusterState;
 import org.elasticsearch.cluster.ClusterStateListener;
@@ -24,19 +27,30 @@ import org.elasticsearch.common.settings.ClusterSettings;
 import org.elasticsearch.common.settings.Setting;
 import org.elasticsearch.common.settings.Settings;
 import org.elasticsearch.common.unit.ByteSizeValue;
+import org.elasticsearch.common.util.concurrent.EsRejectedExecutionException;
+import org.elasticsearch.common.util.concurrent.RunOnce;
+import org.elasticsearch.core.Nullable;
 import org.elasticsearch.core.TimeValue;
 import org.elasticsearch.health.HealthStatus;
 import org.elasticsearch.health.metadata.HealthMetadata;
+import org.elasticsearch.health.node.action.HealthNodeNotDiscoveredException;
+import org.elasticsearch.health.node.selection.HealthNode;
 import org.elasticsearch.health.node.selection.HealthNodeTaskExecutor;
 import org.elasticsearch.node.NodeService;
+import org.elasticsearch.threadpool.Scheduler;
 import org.elasticsearch.threadpool.ThreadPool;
+import org.elasticsearch.transport.NodeNotConnectedException;
 
-import java.util.concurrent.atomic.AtomicBoolean;
+import java.util.Objects;
+import java.util.concurrent.atomic.AtomicReference;
+
+import static org.elasticsearch.core.Strings.format;
 
 /**
  * This class monitors the health of the node regarding the load on several resources.
  * Currently, it only checks for available disk space. Furthermore, it informs the health
- * node about the local health upon change or when a new node is detected.
+ * node about the local health upon change, when a new health node is detected, or when
+ * the master node changes.
  */
 public class LocalHealthMonitor implements ClusterStateListener {
 
@@ -53,23 +67,35 @@ public class LocalHealthMonitor implements ClusterStateListener {
     private final ClusterService clusterService;
     private final ThreadPool threadPool;
     private final DiskCheck diskCheck;
+    private final Client client;
 
     private volatile TimeValue monitorInterval;
     private volatile boolean enabled;
-    // Signals that all the prerequisites have been fulfilled and the monitoring task can be scheduled.
+
+    // Signals that all the prerequisites have been fulfilled and the monitoring can be started.
     private volatile boolean prerequisitesFulfilled;
-    // Ensures that only one monitoring task will be in progress at any moment in time.
-    // It removes the need to synchronize scheduling since at the event that there are two
-    // monitoring tasks scheduled, one of them will be no-op.
-    private final AtomicBoolean inProgress = new AtomicBoolean();
-    // Keeps the latest health state that was successfully reported.
-    private volatile DiskHealthInfo lastReportedDiskHealthInfo = null;
-
-    private LocalHealthMonitor(Settings settings, ClusterService clusterService, NodeService nodeService, ThreadPool threadPool) {
+
+    // Keeps the latest health state that was successfully reported to the current health node.
+    private final AtomicReference<DiskHealthInfo> lastReportedDiskHealthInfo = new AtomicReference<>();
+    // Keeps the last seen health node. We use this variable to ensure that there wasn't a health node
+    // change between the time we send an update until the time we update the lastReportedDiskHealthInfo.
+    // change between the time we send an update and the time we update the lastReportedDiskHealthInfo.
+    // Using a volatile reference to ensure that there is a single instance of monitoring running at all times.
+    // No need for extra synchronization because all the writes are executed on the cluster applier thread.
+    private volatile Monitoring monitoring;
+
+    private LocalHealthMonitor(
+        Settings settings,
+        ClusterService clusterService,
+        NodeService nodeService,
+        ThreadPool threadPool,
+        Client client
+    ) {
         this.threadPool = threadPool;
         this.monitorInterval = POLL_INTERVAL_SETTING.get(settings);
         this.enabled = HealthNodeTaskExecutor.ENABLED_SETTING.get(settings);
         this.clusterService = clusterService;
+        this.client = client;
         this.diskCheck = new DiskCheck(nodeService);
     }
 
@@ -77,9 +103,10 @@ public class LocalHealthMonitor implements ClusterStateListener {
         Settings settings,
         ClusterService clusterService,
         NodeService nodeService,
-        ThreadPool threadPool
+        ThreadPool threadPool,
+        Client client
     ) {
-        LocalHealthMonitor localHealthMonitor = new LocalHealthMonitor(settings, clusterService, nodeService, threadPool);
+        LocalHealthMonitor localHealthMonitor = new LocalHealthMonitor(settings, clusterService, nodeService, threadPool, client);
         localHealthMonitor.registerListeners();
         return localHealthMonitor;
     }
@@ -91,63 +118,283 @@ public class LocalHealthMonitor implements ClusterStateListener {
         clusterService.addListener(this);
     }
 
+    // When the monitoring interval changes, we restart the health monitoring with the new interval.
     void setMonitorInterval(TimeValue monitorInterval) {
         this.monitorInterval = monitorInterval;
-        maybeScheduleNow();
+        stopMonitoring();
+        startMonitoringIfNecessary();
     }
 
+    // When the health node feature is enabled, we try to start monitoring if it is not
+    // already running; there is no need to restart it since there was no configuration
+    // change. When the health node feature is disabled, we stop monitoring.
     void setEnabled(boolean enabled) {
         this.enabled = enabled;
-        maybeScheduleNow();
+        if (enabled) {
+            startMonitoringIfNecessary();
+        } else {
+            stopMonitoring();
+        }
     }
 
-    /**
-     * We always check if the prerequisites are fulfilled and if the health node
-     * is enabled before we schedule a monitoring task.
-     */
-    private void maybeScheduleNextRun(TimeValue time) {
+    private void stopMonitoring() {
+        // If there is an existing schedule, cancel it
+        Scheduler.Cancellable currentMonitoring = monitoring;
+        if (currentMonitoring != null) {
+            currentMonitoring.cancel();
+        }
+    }
+
+    private void startMonitoringIfNecessary() {
         if (prerequisitesFulfilled && enabled) {
-            threadPool.scheduleUnlessShuttingDown(time, ThreadPool.Names.MANAGEMENT, this::monitorHealth);
+            if (isMonitorRunning() == false) {
+                monitoring = Monitoring.start(
+                    monitorInterval,
+                    threadPool,
+                    lastReportedDiskHealthInfo,
+                    lastSeenHealthNode,
+                    diskCheck,
+                    clusterService,
+                    client
+                );
+                logger.debug("Local health monitoring started {}", monitoring);
+            } else {
+                logger.debug("Local health monitoring already started {}, skipping", monitoring);
+            }
         }
     }
 
-    // Helper method that starts the monitoring without a delay.
-    private void maybeScheduleNow() {
-        maybeScheduleNextRun(TimeValue.ZERO);
+    private boolean isMonitorRunning() {
+        Scheduler.Cancellable scheduled = this.monitoring;
+        return scheduled != null && scheduled.isCancelled() == false;
     }
 
     @Override
     public void clusterChanged(ClusterChangedEvent event) {
-        if (prerequisitesFulfilled == false) {
-            prerequisitesFulfilled = event.state().nodesIfRecovered().getMinNodeVersion().onOrAfter(Version.V_8_5_0)
-                && HealthMetadata.getFromClusterState(event.state()) != null;
-            maybeScheduleNow();
+        DiscoveryNode currentHealthNode = HealthNode.findHealthNode(event.state());
+        DiscoveryNode currentMasterNode = event.state().nodes().getMasterNode();
+        boolean healthNodeChanged = hasHealthNodeChanged(currentHealthNode, event);
+        boolean masterNodeChanged = hasMasterNodeChanged(currentMasterNode, event);
+        if (healthNodeChanged || masterNodeChanged) {
+            // When the health node or the master node changes, the health node's cache might have been
+            // reset, so we clear the last reported health info to ensure it will be resent.
+            lastSeenHealthNode.set(currentHealthNode == null ? null : currentHealthNode.getId());
+            lastReportedDiskHealthInfo.set(null);
+            if (logger.isDebugEnabled()) {
+                String reason;
+                if (healthNodeChanged && masterNodeChanged) {
+                    reason = "the master node and the health node";
+                } else if (healthNodeChanged) {
+                    reason = "the health node";
+                } else {
+                    reason = "the master node";
+                }
+                logger.debug(
+                    "Resetting the health monitoring because {} changed, current health node is {}.",
+                    reason,
+                    currentHealthNode == null ? null : format("[%s][%s]", currentHealthNode.getName(), currentHealthNode.getId())
+                );
+            }
+        }
+        prerequisitesFulfilled = event.state().nodesIfRecovered().getMinNodeVersion().onOrAfter(Version.V_8_5_0)
+            && HealthMetadata.getFromClusterState(event.state()) != null
+            && currentHealthNode != null
+            && currentMasterNode != null;
+        if (prerequisitesFulfilled == false || healthNodeChanged || masterNodeChanged) {
+            stopMonitoring();
+        }
+        if (prerequisitesFulfilled) {
+            startMonitoringIfNecessary();
         }
     }
 
-    // Visible for testing
-    void monitorHealth() {
-        if (inProgress.compareAndSet(false, true)) {
-            ClusterState clusterState = clusterService.state();
-            HealthMetadata healthMetadata = HealthMetadata.getFromClusterState(clusterState);
-            assert healthMetadata != null : "health metadata should have been initialized.";
-            DiskHealthInfo previousHealth = this.lastReportedDiskHealthInfo;
-            DiskHealthInfo currentHealth = diskCheck.getHealth(healthMetadata, clusterState);
-            if (currentHealth.equals(previousHealth) == false) {
-                logger.debug("Health status changed from {} to {}", previousHealth, currentHealth);
-                this.lastReportedDiskHealthInfo = currentHealth;
-            }
-            inProgress.set(false);
-            // Scheduling happens after the flag inProgress is false, this ensures that
-            // if the feature is enabled after the following schedule statement, the setEnabled
-            // method will be able to schedule the next run, and it will not be a no-op.
-            // We prefer to err towards an extra scheduling than miss the enabling of this feature alltogether.
-            maybeScheduleNextRun(monitorInterval);
+    private boolean hasMasterNodeChanged(DiscoveryNode currentMasterNode, ClusterChangedEvent event) {
+        DiscoveryNode previousMasterNode = event.previousState().nodes().getMasterNode();
+        if (currentMasterNode == null || previousMasterNode == null) {
+            return currentMasterNode != previousMasterNode;
         }
+        return previousMasterNode.getEphemeralId().equals(currentMasterNode.getEphemeralId()) == false;
     }
 
+    // We compare the current health node against both the last seen health node from this node and the
+    // health node reported in the previous cluster state, to make sure we do not miss any change due to
+    // a flaky state.
+    private boolean hasHealthNodeChanged(DiscoveryNode currentHealthNode, ClusterChangedEvent event) {
+        DiscoveryNode previousHealthNode = HealthNode.findHealthNode(event.previousState());
+        return Objects.equals(lastSeenHealthNode.get(), currentHealthNode == null ? null : currentHealthNode.getId()) == false
+            || Objects.equals(previousHealthNode, currentHealthNode) == false;
+    }
+
+    @Nullable
     DiskHealthInfo getLastReportedDiskHealthInfo() {
-        return lastReportedDiskHealthInfo;
+        return lastReportedDiskHealthInfo.get();
+    }
+
+    /**
+     * This class is responsible for running the health monitoring. It evaluates the health info of this node
+     * at the configured interval. The first run happens upon initialization. If a run throws an exception, it is
+     * logged and the next run is still scheduled.
+     */
+    static class Monitoring implements Runnable, Scheduler.Cancellable {
+
+        private final TimeValue interval;
+        private final String executor;
+        private final Scheduler scheduler;
+        private final ClusterService clusterService;
+        private final DiskCheck diskCheck;
+        private final Client client;
+
+        private final AtomicReference<DiskHealthInfo> lastReportedDiskHealthInfo;
+        private final AtomicReference<String> lastSeenHealthNode;
+
+        private volatile boolean cancelled = false;
+        private volatile Scheduler.ScheduledCancellable scheduledRun;
+
+        private Monitoring(
+            TimeValue interval,
+            Scheduler scheduler,
+            String executor,
+            AtomicReference<DiskHealthInfo> lastReportedDiskHealthInfo,
+            AtomicReference<String> lastSeenHealthNode,
+            DiskCheck diskCheck,
+            ClusterService clusterService,
+            Client client
+        ) {
+            this.interval = interval;
+            this.executor = executor;
+            this.scheduler = scheduler;
+            this.lastReportedDiskHealthInfo = lastReportedDiskHealthInfo;
+            this.lastSeenHealthNode = lastSeenHealthNode;
+            this.clusterService = clusterService;
+            this.diskCheck = diskCheck;
+            this.client = client;
+        }
+
+        /**
+         * Creates a monitoring instance and schedules the first run.
+         */
+        static Monitoring start(
+            TimeValue interval,
+            Scheduler scheduler,
+            AtomicReference<DiskHealthInfo> lastReportedDiskHealthInfo,
+            AtomicReference<String> lastSeenHealthNode,
+            DiskCheck diskCheck,
+            ClusterService clusterService,
+            Client client
+        ) {
+            Monitoring monitoring = new Monitoring(
+                interval,
+                scheduler,
+                ThreadPool.Names.MANAGEMENT,
+                lastReportedDiskHealthInfo,
+                lastSeenHealthNode,
+                diskCheck,
+                clusterService,
+                client
+            );
+            monitoring.scheduledRun = scheduler.schedule(monitoring, TimeValue.ZERO, monitoring.executor);
+            return monitoring;
+        }
+
+        /**
+         * Attempts to cancel monitoring. This method has no effect if
+         * the monitoring is already cancelled. If the {@code scheduledRun}
+         * has not started when {@code cancel} is called, it will never
+         * run. If the {@code scheduledRun} is already running, then
+         * it will not be interrupted but the next run will not be scheduled.
+         *
+         * @return false, if the {@code scheduledRun} was already cancelled; true
+         * otherwise.
+         */
+        @Override
+        public boolean cancel() {
+            if (cancelled) {
+                // already cancelled
+                return false;
+            }
+            cancelled = true;
+            scheduledRun.cancel();
+            return true;
+        }
+
+        @Override
+        public boolean isCancelled() {
+            return cancelled;
+        }
+
+        /**
+         * This method evaluates the health info of this node and if there is a change it sends an update request to the health node.
+         */
+        @Override
+        public void run() {
+            if (cancelled) {
+                return;
+            }
+            boolean nextRunScheduled = false;
+            Runnable scheduleNextRun = new RunOnce(this::scheduleNextRunIfNecessary);
+            try {
+                ClusterState clusterState = clusterService.state();
+                HealthMetadata healthMetadata = HealthMetadata.getFromClusterState(clusterState);
+                if (healthMetadata != null) {
+                    DiskHealthInfo previousHealth = this.lastReportedDiskHealthInfo.get();
+                    DiskHealthInfo currentHealth = diskCheck.getHealth(healthMetadata, clusterState);
+                    if (currentHealth.equals(previousHealth) == false) {
+                        String nodeId = clusterService.localNode().getId();
+                        String healthNodeId = lastSeenHealthNode.get();
+                        ActionListener<AcknowledgedResponse> listener = ActionListener.wrap(response -> {
+                            // Update the last reported value only if the health node hasn't changed.
+                            if (Objects.equals(healthNodeId, lastSeenHealthNode.get())
+                                && lastReportedDiskHealthInfo.compareAndSet(previousHealth, currentHealth)) {
+                                logger.debug(
+                                    "Health info [{}] successfully sent, last reported value: {}.",
+                                    currentHealth,
+                                    lastReportedDiskHealthInfo.get()
+                                );
+                            }
+                        }, e -> {
+                            if (e.getCause() instanceof NodeNotConnectedException
+                                || e.getCause() instanceof HealthNodeNotDiscoveredException) {
+                                logger.debug("Failed to connect to the health node [{}], will try again.", e.getCause().getMessage());
+                            } else {
+                                logger.debug(
+                                    () -> format("Failed to send health info [%s] to health node, will try again.", currentHealth),
+                                    e
+                                );
+                            }
+                        });
+                        client.execute(
+                            UpdateHealthInfoCacheAction.INSTANCE,
+                            new UpdateHealthInfoCacheAction.Request(nodeId, currentHealth),
+                            ActionListener.runAfter(listener, scheduleNextRun)
+                        );
+                        nextRunScheduled = true;
+                    }
+                }
+            } catch (Exception e) {
+                logger.warn(() -> format("Failed to run scheduled health monitoring on thread pool [%s]", executor), e);
+            } finally {
+                // If the next run isn't scheduled because, for example, the health info hasn't changed, we schedule it here.
+                if (nextRunScheduled == false) {
+                    scheduleNextRun.run();
+                }
+            }
+        }
+
+        private void scheduleNextRunIfNecessary() {
+            if (cancelled) {
+                return;
+            }
+            try {
+                scheduledRun = scheduler.schedule(this, interval, executor);
+            } catch (final EsRejectedExecutionException e) {
+                logger.debug(() -> format("Scheduled health monitoring was rejected on thread pool [%s]", executor), e);
+            }
+        }
+
+        @Override
+        public String toString() {
+            return "Monitoring{interval=" + interval + ", cancelled=" + cancelled + "}";
+        }
     }
 
     /**
@@ -173,7 +420,7 @@ public class LocalHealthMonitor implements ClusterStateListener {
             if (node.isDedicatedFrozenNode()) {
                 long frozenFloodStageThreshold = diskMetadata.getFreeBytesFrozenFloodStageWatermark(totalBytes).getBytes();
                 if (usage.getFreeBytes() < frozenFloodStageThreshold) {
-                    logger.debug("flood stage disk watermark [{}] exceeded on {}", frozenFloodStageThreshold, usage);
+                    logger.debug("Flood stage disk watermark [{}] exceeded on {}", frozenFloodStageThreshold, usage);
                     return new DiskHealthInfo(HealthStatus.RED, DiskHealthInfo.Cause.FROZEN_NODE_OVER_FLOOD_STAGE_THRESHOLD);
                 }
                 return new DiskHealthInfo(HealthStatus.GREEN);
@@ -181,11 +428,13 @@ public class LocalHealthMonitor implements ClusterStateListener {
 
             long floodStageThreshold = diskMetadata.getFreeBytesFloodStageWatermark(totalBytes).getBytes();
             if (usage.getFreeBytes() < floodStageThreshold) {
+                logger.debug("Flood stage disk watermark [{}] exceeded on {}", floodStageThreshold, usage);
                 return new DiskHealthInfo(HealthStatus.RED, DiskHealthInfo.Cause.NODE_OVER_THE_FLOOD_STAGE_THRESHOLD);
             }
 
             long highThreshold = diskMetadata.getFreeBytesHighWatermark(totalBytes).getBytes();
             if (usage.getFreeBytes() < highThreshold && hasRelocatingShards(clusterState, node.getId()) == false) {
+                logger.debug("High disk watermark [{}] exceeded on {}", highThreshold, usage);
                 return new DiskHealthInfo(HealthStatus.YELLOW, DiskHealthInfo.Cause.NODE_OVER_HIGH_THRESHOLD);
             }
             return new DiskHealthInfo(HealthStatus.GREEN);
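
One subtlety in Monitoring#run above: the next run must be scheduled exactly once per cycle, whether the update request is sent (and completes asynchronously) or is skipped because nothing changed. Wrapping the rescheduling in a RunOnce makes both paths safe to combine; a condensed restatement of the pattern, where healthChanged, action, request and listener stand in for the corresponding variables in the diff:

    boolean nextRunScheduled = false;
    Runnable scheduleNextRun = new RunOnce(this::scheduleNextRunIfNecessary);
    try {
        if (healthChanged) {
            // Async path: the next run is scheduled only after the health node has answered (or the call failed).
            client.execute(action, request, ActionListener.runAfter(listener, scheduleNextRun));
            nextRunScheduled = true;
        }
    } finally {
        if (nextRunScheduled == false) {
            scheduleNextRun.run(); // nothing was sent this round, so reschedule right away
        }
    }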

+ 128 - 0
server/src/main/java/org/elasticsearch/health/node/UpdateHealthInfoCacheAction.java

@@ -0,0 +1,128 @@
+/*
+ * Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
+ * or more contributor license agreements. Licensed under the Elastic License
+ * 2.0 and the Server Side Public License, v 1; you may not use this file except
+ * in compliance with, at your election, the Elastic License 2.0 or the Server
+ * Side Public License, v 1.
+ */
+
+package org.elasticsearch.health.node;
+
+import org.elasticsearch.action.ActionListener;
+import org.elasticsearch.action.ActionRequest;
+import org.elasticsearch.action.ActionRequestValidationException;
+import org.elasticsearch.action.ActionType;
+import org.elasticsearch.action.support.ActionFilters;
+import org.elasticsearch.action.support.master.AcknowledgedResponse;
+import org.elasticsearch.cluster.ClusterState;
+import org.elasticsearch.cluster.service.ClusterService;
+import org.elasticsearch.common.inject.Inject;
+import org.elasticsearch.common.io.stream.StreamInput;
+import org.elasticsearch.common.io.stream.StreamOutput;
+import org.elasticsearch.health.node.action.TransportHealthNodeAction;
+import org.elasticsearch.tasks.Task;
+import org.elasticsearch.threadpool.ThreadPool;
+import org.elasticsearch.transport.TransportService;
+
+import java.io.IOException;
+import java.util.Objects;
+
+/**
+ * This action allows a node to send its health info to the currently selected health node.
+ * When the health node receives the health info, it updates its internal cache entry for
+ * that node.
+ */
+public class UpdateHealthInfoCacheAction extends ActionType<AcknowledgedResponse> {
+
+    public static class Request extends ActionRequest {
+        private final String nodeId;
+        private final DiskHealthInfo diskHealthInfo;
+
+        public Request(String nodeId, DiskHealthInfo diskHealthInfo) {
+            this.nodeId = nodeId;
+            this.diskHealthInfo = diskHealthInfo;
+        }
+
+        public Request(StreamInput in) throws IOException {
+            super(in);
+            this.nodeId = in.readString();
+            this.diskHealthInfo = new DiskHealthInfo(in);
+        }
+
+        public String getNodeId() {
+            return nodeId;
+        }
+
+        public DiskHealthInfo getDiskHealthInfo() {
+            return diskHealthInfo;
+        }
+
+        @Override
+        public ActionRequestValidationException validate() {
+            return null;
+        }
+
+        @Override
+        public void writeTo(StreamOutput out) throws IOException {
+            super.writeTo(out);
+            out.writeString(nodeId);
+            diskHealthInfo.writeTo(out);
+        }
+
+        @Override
+        public boolean equals(Object o) {
+            if (this == o) return true;
+            if (o == null || getClass() != o.getClass()) return false;
+            Request request = (Request) o;
+            return Objects.equals(nodeId, request.nodeId) && Objects.equals(diskHealthInfo, request.diskHealthInfo);
+        }
+
+        @Override
+        public int hashCode() {
+            return Objects.hash(nodeId, diskHealthInfo);
+        }
+    }
+
+    public static final UpdateHealthInfoCacheAction INSTANCE = new UpdateHealthInfoCacheAction();
+    public static final String NAME = "cluster:monitor/update/health/info";
+
+    private UpdateHealthInfoCacheAction() {
+        super(NAME, AcknowledgedResponse::readFrom);
+    }
+
+    public static class TransportAction extends TransportHealthNodeAction<Request, AcknowledgedResponse> {
+        private final HealthInfoCache nodeHealthOverview;
+
+        @Inject
+        public TransportAction(
+            TransportService transportService,
+            ClusterService clusterService,
+            ThreadPool threadPool,
+            ActionFilters actionFilters,
+            HealthInfoCache nodeHealthOverview
+        ) {
+            super(
+                UpdateHealthInfoCacheAction.NAME,
+                transportService,
+                clusterService,
+                threadPool,
+                actionFilters,
+                UpdateHealthInfoCacheAction.Request::new,
+                AcknowledgedResponse::readFrom,
+                ThreadPool.Names.MANAGEMENT
+            );
+            this.nodeHealthOverview = nodeHealthOverview;
+        }
+
+        @Override
+        protected void healthOperation(
+            Task task,
+            Request request,
+            ClusterState clusterState,
+            ActionListener<AcknowledgedResponse> listener
+        ) {
+            nodeHealthOverview.updateNodeHealth(request.getNodeId(), request.getDiskHealthInfo());
+            listener.onResponse(AcknowledgedResponse.of(true));
+        }
+    }
+}
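
The Request carries only the reporting node's id and its DiskHealthInfo, and implements equals/hashCode, so wire serialization can be sanity-checked directly. A minimal round-trip sketch with hypothetical values (not part of the patch):

    UpdateHealthInfoCacheAction.Request request =
        new UpdateHealthInfoCacheAction.Request("node-id", new DiskHealthInfo(HealthStatus.GREEN, null));
    try (BytesStreamOutput out = new BytesStreamOutput()) {
        request.writeTo(out);                        // also writes the ActionRequest base fields (parent task id)
        assert request.equals(new UpdateHealthInfoCacheAction.Request(out.bytes().streamInput()));
    }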

+ 2 - 2
server/src/main/java/org/elasticsearch/health/node/action/HealthNodeNotDiscoveredException.java

@@ -20,8 +20,8 @@ import java.io.IOException;
  */
 public class HealthNodeNotDiscoveredException extends ElasticsearchException {
 
-    public HealthNodeNotDiscoveredException(String message) {
-        super(message);
+    public HealthNodeNotDiscoveredException() {
+        super("No health node was discovered");
     }
 
     public HealthNodeNotDiscoveredException(StreamInput in) throws IOException {

+ 1 - 1
server/src/main/java/org/elasticsearch/health/node/action/TransportHealthNodeAction.java

@@ -80,7 +80,7 @@ public abstract class TransportHealthNodeAction<Request extends ActionRequest, R
             DiscoveryNode healthNode = HealthNode.findHealthNode(clusterState);
             DiscoveryNode localNode = clusterState.nodes().getLocalNode();
             if (healthNode == null) {
-                listener.onFailure(new HealthNodeNotDiscoveredException("Health node was null"));
+                listener.onFailure(new HealthNodeNotDiscoveredException());
             } else if (localNode.getId().equals(healthNode.getId())) {
                 threadPool.executor(executor).execute(() -> {
                     try {
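
For context, TransportHealthNodeAction resolves the health node from the cluster state and either executes the operation locally or relays the request; when no health node is discovered the request fails fast and LocalHealthMonitor simply retries on its next run. A rough sketch of that dispatch; the forwarding branch is not shown in this diff and is assumed from the base class, with task, request, actionName and responseHandler as stand-in names:

    DiscoveryNode healthNode = HealthNode.findHealthNode(clusterState);
    if (healthNode == null) {
        listener.onFailure(new HealthNodeNotDiscoveredException());      // treated as retriable by the monitor
    } else if (clusterState.nodes().getLocalNode().getId().equals(healthNode.getId())) {
        healthOperation(task, request, clusterState, listener);          // this node is the health node
    } else {
        // Assumed behaviour of the base class: relay the request to the health node over the transport layer.
        transportService.sendRequest(healthNode, actionName, request, responseHandler);
    }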

+ 1 - 1
server/src/main/java/org/elasticsearch/health/node/selection/HealthNodeTaskExecutor.java

@@ -108,7 +108,7 @@ public final class HealthNodeTaskExecutor extends PersistentTasksExecutor<Health
         HealthNode healthNode = (HealthNode) task;
         currentTask.set(healthNode);
         DiscoveryNode node = clusterService.localNode();
-        logger.info("Node [{{}{}}] is selected as the current health node.", node.getName(), node.getId());
+        logger.info("Node [{{}}{{}}] is selected as the current health node.", node.getName(), node.getId());
     }
 
     @Override

+ 4 - 1
server/src/main/java/org/elasticsearch/node/Node.java

@@ -104,6 +104,7 @@ import org.elasticsearch.gateway.PersistedClusterStateService;
 import org.elasticsearch.health.HealthIndicatorService;
 import org.elasticsearch.health.HealthService;
 import org.elasticsearch.health.metadata.HealthMetadataService;
+import org.elasticsearch.health.node.HealthInfoCache;
 import org.elasticsearch.health.node.LocalHealthMonitor;
 import org.elasticsearch.health.node.selection.HealthNode;
 import org.elasticsearch.health.node.selection.HealthNodeTaskExecutor;
@@ -960,8 +961,9 @@ public class Node implements Closeable {
                 ? HealthMetadataService.create(clusterService, settings)
                 : null;
             LocalHealthMonitor localHealthMonitor = HealthNode.isEnabled()
-                ? LocalHealthMonitor.create(settings, clusterService, nodeService, threadPool)
+                ? LocalHealthMonitor.create(settings, clusterService, nodeService, threadPool, client)
                 : null;
+            HealthInfoCache nodeHealthOverview = HealthNode.isEnabled() ? HealthInfoCache.create(clusterService) : null;
 
             modules.add(b -> {
                 b.bind(Node.class).toInstance(this);
@@ -1050,6 +1052,7 @@ public class Node implements Closeable {
                     b.bind(HealthNodeTaskExecutor.class).toInstance(healthNodeTaskExecutor);
                     b.bind(HealthMetadataService.class).toInstance(healthMetadataService);
                     b.bind(LocalHealthMonitor.class).toInstance(localHealthMonitor);
+                    b.bind(HealthInfoCache.class).toInstance(nodeHealthOverview);
                 }
                 b.bind(Tracer.class).toInstance(tracer);
                 b.bind(FileSettingsService.class).toInstance(fileSettingsService);

+ 91 - 0
server/src/test/java/org/elasticsearch/health/node/HealthInfoCacheTests.java

@@ -0,0 +1,91 @@
+/*
+ * Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
+ * or more contributor license agreements. Licensed under the Elastic License
+ * 2.0 and the Server Side Public License, v 1; you may not use this file except
+ * in compliance with, at your election, the Elastic License 2.0 or the Server
+ * Side Public License, v 1.
+ */
+
+package org.elasticsearch.health.node;
+
+import org.elasticsearch.Version;
+import org.elasticsearch.action.support.replication.ClusterStateCreationUtils;
+import org.elasticsearch.cluster.ClusterChangedEvent;
+import org.elasticsearch.cluster.ClusterState;
+import org.elasticsearch.cluster.node.DiscoveryNode;
+import org.elasticsearch.cluster.node.DiscoveryNodeRole;
+import org.elasticsearch.cluster.service.ClusterService;
+import org.elasticsearch.health.HealthStatus;
+import org.elasticsearch.test.ESTestCase;
+
+import java.util.Collections;
+import java.util.Map;
+import java.util.Set;
+
+import static org.hamcrest.Matchers.equalTo;
+import static org.hamcrest.Matchers.nullValue;
+import static org.mockito.Mockito.mock;
+
+public class HealthInfoCacheTests extends ESTestCase {
+
+    private static final DiskHealthInfo GREEN = new DiskHealthInfo(HealthStatus.GREEN, null);
+    private static final DiskHealthInfo RED = new DiskHealthInfo(
+        HealthStatus.RED,
+        DiskHealthInfo.Cause.FROZEN_NODE_OVER_FLOOD_STAGE_THRESHOLD
+    );
+    private final ClusterService clusterService = mock(ClusterService.class);
+    private final DiscoveryNode node1 = new DiscoveryNode(
+        "node_1",
+        buildNewFakeTransportAddress(),
+        Collections.emptyMap(),
+        Set.of(DiscoveryNodeRole.MASTER_ROLE, DiscoveryNodeRole.DATA_ROLE),
+        Version.CURRENT
+    );
+    private final DiscoveryNode node2 = new DiscoveryNode(
+        "node_2",
+        buildNewFakeTransportAddress(),
+        Collections.emptyMap(),
+        Set.of(DiscoveryNodeRole.MASTER_ROLE, DiscoveryNodeRole.DATA_ROLE),
+        Version.CURRENT
+    );
+    private final DiscoveryNode[] allNodes = new DiscoveryNode[] { node1, node2 };
+
+    public void testAddHealthInfo() {
+        HealthInfoCache healthInfoCache = HealthInfoCache.create(clusterService);
+        healthInfoCache.updateNodeHealth(node1.getId(), GREEN);
+        healthInfoCache.updateNodeHealth(node2.getId(), RED);
+
+        Map<String, DiskHealthInfo> diskHealthInfo = healthInfoCache.getDiskHealthInfo();
+        healthInfoCache.updateNodeHealth(node1.getId(), RED);
+
+        assertThat(diskHealthInfo.get(node1.getId()), equalTo(GREEN));
+        assertThat(diskHealthInfo.get(node2.getId()), equalTo(RED));
+    }
+
+    public void testRemoveNodeFromTheCluster() {
+        HealthInfoCache healthInfoCache = HealthInfoCache.create(clusterService);
+        healthInfoCache.updateNodeHealth(node1.getId(), GREEN);
+        healthInfoCache.updateNodeHealth(node2.getId(), RED);
+
+        ClusterState previous = ClusterStateCreationUtils.state(node1, node1, node1, allNodes);
+        ClusterState current = ClusterStateCreationUtils.state(node1, node1, node1, new DiscoveryNode[] { node1 });
+        healthInfoCache.clusterChanged(new ClusterChangedEvent("test", current, previous));
+
+        Map<String, DiskHealthInfo> diskHealthInfo = healthInfoCache.getDiskHealthInfo();
+        assertThat(diskHealthInfo.get(node1.getId()), equalTo(GREEN));
+        assertThat(diskHealthInfo.get(node2.getId()), nullValue());
+    }
+
+    public void testNotAHealthNode() {
+        HealthInfoCache healthInfoCache = HealthInfoCache.create(clusterService);
+        healthInfoCache.updateNodeHealth(node1.getId(), GREEN);
+        healthInfoCache.updateNodeHealth(node2.getId(), RED);
+
+        ClusterState previous = ClusterStateCreationUtils.state(node1, node1, node1, allNodes);
+        ClusterState current = ClusterStateCreationUtils.state(node1, node1, node2, allNodes);
+        healthInfoCache.clusterChanged(new ClusterChangedEvent("test", current, previous));
+
+        Map<String, DiskHealthInfo> diskHealthInfo = healthInfoCache.getDiskHealthInfo();
+        assertThat(diskHealthInfo.isEmpty(), equalTo(true));
+    }
+}

+ 114 - 20
server/src/test/java/org/elasticsearch/health/node/LocalHealthMonitorTests.java

@@ -9,8 +9,12 @@
 package org.elasticsearch.health.node;
 
 import org.elasticsearch.Version;
+import org.elasticsearch.action.ActionListener;
 import org.elasticsearch.action.admin.cluster.node.stats.NodeStats;
 import org.elasticsearch.action.admin.indices.stats.CommonStatsFlags;
+import org.elasticsearch.action.support.master.AcknowledgedResponse;
+import org.elasticsearch.action.support.replication.ClusterStateCreationUtils;
+import org.elasticsearch.client.internal.Client;
 import org.elasticsearch.cluster.ClusterChangedEvent;
 import org.elasticsearch.cluster.ClusterState;
 import org.elasticsearch.cluster.node.DiscoveryNode;
@@ -36,18 +40,20 @@ import org.junit.BeforeClass;
 
 import java.util.Collections;
 import java.util.Set;
-import java.util.concurrent.TimeUnit;
-import java.util.function.Supplier;
+import java.util.concurrent.atomic.AtomicInteger;
+import java.util.concurrent.atomic.AtomicReference;
 
 import static org.hamcrest.Matchers.equalTo;
-import static org.hamcrest.Matchers.not;
 import static org.hamcrest.Matchers.nullValue;
+import static org.mockito.ArgumentMatchers.any;
 import static org.mockito.ArgumentMatchers.eq;
+import static org.mockito.Mockito.doAnswer;
 import static org.mockito.Mockito.mock;
 import static org.mockito.Mockito.when;
 
 public class LocalHealthMonitorTests extends ESTestCase {
 
+    private static final DiskHealthInfo GREEN = new DiskHealthInfo(HealthStatus.GREEN, null);
     private static ThreadPool threadPool;
     private NodeService nodeService;
     private ClusterService clusterService;
@@ -55,6 +61,7 @@ public class LocalHealthMonitorTests extends ESTestCase {
     private DiscoveryNode frozenNode;
     private HealthMetadata healthMetadata;
     private ClusterState clusterState;
+    private Client client;
 
     @BeforeClass
     public static void setUpThreadPool() {
@@ -94,9 +101,8 @@ public class LocalHealthMonitorTests extends ESTestCase {
             Set.of(DiscoveryNodeRole.DATA_FROZEN_NODE_ROLE),
             Version.CURRENT
         );
-        clusterState = ClusterState.EMPTY_STATE.copyAndUpdate(
-            b -> b.nodes(DiscoveryNodes.builder().add(node).add(frozenNode).localNodeId(node.getId()).build())
-        ).copyAndUpdate(b -> b.putCustom(HealthMetadata.TYPE, healthMetadata));
+        clusterState = ClusterStateCreationUtils.state(node, node, node, new DiscoveryNode[] { node, frozenNode })
+            .copyAndUpdate(b -> b.putCustom(HealthMetadata.TYPE, healthMetadata));
 
         // Set-up cluster service
         clusterService = mock(ClusterService.class);
@@ -104,48 +110,136 @@ public class LocalHealthMonitorTests extends ESTestCase {
             new ClusterSettings(Settings.EMPTY, ClusterSettings.BUILT_IN_CLUSTER_SETTINGS)
         );
         when(clusterService.state()).thenReturn(clusterState);
+        when(clusterService.localNode()).thenReturn(node);
 
         // Set-up node service with a node with a healthy disk space usage
         nodeService = mock(NodeService.class);
+
+        client = mock(Client.class);
     }
 
-    public void testUpdateNodeHealthStatus() {
+    @SuppressWarnings("unchecked")
+    public void testUpdateHealthInfo() throws Exception {
+        doAnswer(invocation -> {
+            DiskHealthInfo diskHealthInfo = ((UpdateHealthInfoCacheAction.Request) invocation.getArgument(1)).getDiskHealthInfo();
+            ActionListener<AcknowledgedResponse> listener = (ActionListener<AcknowledgedResponse>) invocation.getArguments()[2];
+            assertThat(diskHealthInfo, equalTo(GREEN));
+            listener.onResponse(null);
+            return null;
+        }).when(client).execute(any(), any(), any());
         simulateHealthDiskSpace();
-        LocalHealthMonitor localHealthMonitor = LocalHealthMonitor.create(Settings.EMPTY, clusterService, nodeService, threadPool);
+        LocalHealthMonitor localHealthMonitor = LocalHealthMonitor.create(Settings.EMPTY, clusterService, nodeService, threadPool, client);
+        // We override the poll interval like this to bypass the minimum value enforced by the setting, which is too high for this test
+        localHealthMonitor.setMonitorInterval(TimeValue.timeValueMillis(10));
         assertThat(localHealthMonitor.getLastReportedDiskHealthInfo(), nullValue());
-        localHealthMonitor.monitorHealth();
-        assertThat(localHealthMonitor.getLastReportedDiskHealthInfo(), equalTo(new DiskHealthInfo(HealthStatus.GREEN, null)));
+        localHealthMonitor.clusterChanged(new ClusterChangedEvent("initialize", clusterState, ClusterState.EMPTY_STATE));
+        assertBusy(() -> assertThat(localHealthMonitor.getLastReportedDiskHealthInfo(), equalTo(GREEN)));
+    }
+
+    @SuppressWarnings("unchecked")
+    public void testDoNotUpdateHealthInfoOnFailure() throws Exception {
+        AtomicReference<Boolean> clientCalled = new AtomicReference<>(false);
+        doAnswer(invocation -> {
+            ActionListener<AcknowledgedResponse> listener = (ActionListener<AcknowledgedResponse>) invocation.getArguments()[2];
+            listener.onFailure(new RuntimeException("simulated"));
+            clientCalled.set(true);
+            return null;
+        }).when(client).execute(any(), any(), any());
+
+        simulateHealthDiskSpace();
+        LocalHealthMonitor localHealthMonitor = LocalHealthMonitor.create(Settings.EMPTY, clusterService, nodeService, threadPool, client);
+        localHealthMonitor.clusterChanged(new ClusterChangedEvent("initialize", clusterState, ClusterState.EMPTY_STATE));
+        assertBusy(() -> assertThat(clientCalled.get(), equalTo(true)));
+        assertThat(localHealthMonitor.getLastReportedDiskHealthInfo(), nullValue());
+    }
+
+    @SuppressWarnings("unchecked")
+    public void testSendHealthInfoToNewNode() throws Exception {
+        ClusterState previous = ClusterStateCreationUtils.state(node, node, frozenNode, new DiscoveryNode[] { node, frozenNode })
+            .copyAndUpdate(b -> b.putCustom(HealthMetadata.TYPE, healthMetadata));
+        ClusterState current = ClusterStateCreationUtils.state(node, node, node, new DiscoveryNode[] { node, frozenNode })
+            .copyAndUpdate(b -> b.putCustom(HealthMetadata.TYPE, healthMetadata));
+        simulateHealthDiskSpace();
+
+        AtomicInteger counter = new AtomicInteger(0);
+        doAnswer(invocation -> {
+            DiskHealthInfo diskHealthInfo = ((UpdateHealthInfoCacheAction.Request) invocation.getArgument(1)).getDiskHealthInfo();
+            ActionListener<AcknowledgedResponse> listener = (ActionListener<AcknowledgedResponse>) invocation.getArguments()[2];
+            assertThat(diskHealthInfo, equalTo(GREEN));
+            counter.incrementAndGet();
+            listener.onResponse(null);
+            return null;
+        }).when(client).execute(any(), any(), any());
+
+        when(clusterService.state()).thenReturn(previous);
+        LocalHealthMonitor localHealthMonitor = LocalHealthMonitor.create(Settings.EMPTY, clusterService, nodeService, threadPool, client);
+        localHealthMonitor.clusterChanged(new ClusterChangedEvent("start-up", previous, ClusterState.EMPTY_STATE));
+        assertBusy(() -> assertThat(localHealthMonitor.getLastReportedDiskHealthInfo(), equalTo(GREEN)));
+        localHealthMonitor.clusterChanged(new ClusterChangedEvent("health-node-switch", current, previous));
+        assertBusy(() -> assertThat(counter.get(), equalTo(2)));
     }
 
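+    // Verifies that the disk health is sent again after the elected master changes, even though the health node stays the same.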
+    @SuppressWarnings("unchecked")
+    public void testResendHealthInfoOnMasterChange() throws Exception {
+        ClusterState previous = ClusterStateCreationUtils.state(node, node, node, new DiscoveryNode[] { node, frozenNode })
+            .copyAndUpdate(b -> b.putCustom(HealthMetadata.TYPE, healthMetadata));
+        ClusterState current = ClusterStateCreationUtils.state(node, frozenNode, node, new DiscoveryNode[] { node, frozenNode })
+            .copyAndUpdate(b -> b.putCustom(HealthMetadata.TYPE, healthMetadata));
+        simulateHealthDiskSpace();
+
+        AtomicInteger counter = new AtomicInteger(0);
+        doAnswer(invocation -> {
+            DiskHealthInfo diskHealthInfo = ((UpdateHealthInfoCacheAction.Request) invocation.getArgument(1)).getDiskHealthInfo();
+            ActionListener<AcknowledgedResponse> listener = (ActionListener<AcknowledgedResponse>) invocation.getArguments()[2];
+            assertThat(diskHealthInfo, equalTo(GREEN));
+            counter.incrementAndGet();
+            listener.onResponse(null);
+            return null;
+        }).when(client).execute(any(), any(), any());
+
+        when(clusterService.state()).thenReturn(previous);
+        LocalHealthMonitor localHealthMonitor = LocalHealthMonitor.create(Settings.EMPTY, clusterService, nodeService, threadPool, client);
+        localHealthMonitor.clusterChanged(new ClusterChangedEvent("start-up", previous, ClusterState.EMPTY_STATE));
+        assertBusy(() -> assertThat(localHealthMonitor.getLastReportedDiskHealthInfo(), equalTo(GREEN)));
+        localHealthMonitor.clusterChanged(new ClusterChangedEvent("master-change", current, previous));
+        assertBusy(() -> assertThat(counter.get(), equalTo(2)));
+    }
+
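+    // Verifies that no health updates are sent while local monitoring is disabled and that reporting resumes once it is re-enabled.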
+    @SuppressWarnings("unchecked")
     public void testEnablingAndDisabling() throws Exception {
+        AtomicInteger clientCalledCount = new AtomicInteger();
+        doAnswer(invocation -> {
+            ActionListener<AcknowledgedResponse> listener = (ActionListener<AcknowledgedResponse>) invocation.getArguments()[2];
+            listener.onResponse(null);
+            clientCalledCount.incrementAndGet();
+            return null;
+        }).when(client).execute(any(), any(), any());
         simulateHealthDiskSpace();
-        DiskHealthInfo healthyNode = new DiskHealthInfo(HealthStatus.GREEN, null);
         when(clusterService.state()).thenReturn(null);
-        LocalHealthMonitor localHealthMonitor = LocalHealthMonitor.create(Settings.EMPTY, clusterService, nodeService, threadPool);
+        LocalHealthMonitor localHealthMonitor = LocalHealthMonitor.create(Settings.EMPTY, clusterService, nodeService, threadPool, client);
 
         // Ensure that there are no issues if the cluster state hasn't been initialized yet
         localHealthMonitor.setEnabled(true);
         assertThat(localHealthMonitor.getLastReportedDiskHealthInfo(), nullValue());
+        assertThat(clientCalledCount.get(), equalTo(0));
 
         when(clusterService.state()).thenReturn(clusterState);
         localHealthMonitor.clusterChanged(new ClusterChangedEvent("test", clusterState, ClusterState.EMPTY_STATE));
-        assertBusy(() -> assertThat(localHealthMonitor.getLastReportedDiskHealthInfo(), equalTo(healthyNode)));
+        assertBusy(() -> assertThat(localHealthMonitor.getLastReportedDiskHealthInfo(), equalTo(GREEN)));
+        assertThat(clientCalledCount.get(), equalTo(1));
 
         // Disable the local monitoring
         localHealthMonitor.setEnabled(false);
         localHealthMonitor.setMonitorInterval(TimeValue.timeValueMillis(1));
         simulateDiskOutOfSpace();
-        assertRemainsUnchanged(localHealthMonitor::getLastReportedDiskHealthInfo, healthyNode);
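+        // While disabled, the deteriorated disk health must not trigger any additional client calls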
+        assertThat(clientCalledCount.get(), equalTo(1));
+        localHealthMonitor.setMonitorInterval(TimeValue.timeValueSeconds(30));
 
         localHealthMonitor.setEnabled(true);
         DiskHealthInfo nextHealthStatus = new DiskHealthInfo(HealthStatus.RED, DiskHealthInfo.Cause.NODE_OVER_THE_FLOOD_STAGE_THRESHOLD);
         assertBusy(() -> assertThat(localHealthMonitor.getLastReportedDiskHealthInfo(), equalTo(nextHealthStatus)));
     }
 
-    private void assertRemainsUnchanged(Supplier<DiskHealthInfo> supplier, DiskHealthInfo expected) {
-        expectThrows(AssertionError.class, () -> assertBusy(() -> assertThat(supplier.get(), not(expected)), 1, TimeUnit.SECONDS));
-    }
-
     public void testNoDiskData() {
         when(
             nodeService.stats(
@@ -175,7 +269,7 @@ public class LocalHealthMonitorTests extends ESTestCase {
         simulateHealthDiskSpace();
         LocalHealthMonitor.DiskCheck diskMonitor = new LocalHealthMonitor.DiskCheck(nodeService);
         DiskHealthInfo diskHealth = diskMonitor.getHealth(healthMetadata, clusterState);
-        assertThat(diskHealth, equalTo(new DiskHealthInfo(HealthStatus.GREEN, null)));
+        assertThat(diskHealth, equalTo(GREEN));
     }
 
     public void testYellowDiskStatus() {
@@ -199,7 +293,7 @@ public class LocalHealthMonitorTests extends ESTestCase {
         );
         LocalHealthMonitor.DiskCheck diskMonitor = new LocalHealthMonitor.DiskCheck(nodeService);
         DiskHealthInfo diskHealth = diskMonitor.getHealth(healthMetadata, clusterStateFrozenLocalNode);
-        assertThat(diskHealth, equalTo(new DiskHealthInfo(HealthStatus.GREEN, null)));
+        assertThat(diskHealth, equalTo(GREEN));
     }
 
     public void testFrozenRedDiskStatus() {

+ 142 - 0
server/src/test/java/org/elasticsearch/health/node/UpdateHealthInfoCacheActionTests.java

@@ -0,0 +1,142 @@
+/*
+ * Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
+ * or more contributor license agreements. Licensed under the Elastic License
+ * 2.0 and the Server Side Public License, v 1; you may not use this file except
+ * in compliance with, at your election, the Elastic License 2.0 or the Server
+ * Side Public License, v 1.
+ */
+
+package org.elasticsearch.health.node;
+
+import org.elasticsearch.Version;
+import org.elasticsearch.action.support.ActionFilters;
+import org.elasticsearch.action.support.ActionTestUtils;
+import org.elasticsearch.action.support.PlainActionFuture;
+import org.elasticsearch.action.support.master.AcknowledgedResponse;
+import org.elasticsearch.action.support.replication.ClusterStateCreationUtils;
+import org.elasticsearch.cluster.node.DiscoveryNode;
+import org.elasticsearch.cluster.node.DiscoveryNodeRole;
+import org.elasticsearch.cluster.service.ClusterService;
+import org.elasticsearch.health.HealthStatus;
+import org.elasticsearch.test.ESTestCase;
+import org.elasticsearch.test.EqualsHashCodeTestUtils;
+import org.elasticsearch.test.transport.CapturingTransport;
+import org.elasticsearch.threadpool.TestThreadPool;
+import org.elasticsearch.threadpool.ThreadPool;
+import org.elasticsearch.transport.TransportService;
+import org.junit.After;
+import org.junit.AfterClass;
+import org.junit.Before;
+import org.junit.BeforeClass;
+
+import java.util.Collections;
+import java.util.Set;
+import java.util.concurrent.ExecutionException;
+import java.util.concurrent.TimeUnit;
+
+import static org.elasticsearch.test.ClusterServiceUtils.createClusterService;
+import static org.elasticsearch.test.ClusterServiceUtils.setState;
+import static org.hamcrest.Matchers.equalTo;
+
+public class UpdateHealthInfoCacheActionTests extends ESTestCase {
+    private static ThreadPool threadPool;
+
+    private ClusterService clusterService;
+    private TransportService transportService;
+    private DiscoveryNode localNode;
+    private DiscoveryNode[] allNodes;
+
+    @BeforeClass
+    public static void beforeClass() {
+        threadPool = new TestThreadPool("UpdateHealthInfoCacheAction");
+    }
+
+    @Before
+    @Override
+    public void setUp() throws Exception {
+        super.setUp();
+        clusterService = createClusterService(threadPool);
+        CapturingTransport transport = new CapturingTransport();
+        transportService = transport.createTransportService(
+            clusterService.getSettings(),
+            threadPool,
+            TransportService.NOOP_TRANSPORT_INTERCEPTOR,
+            x -> clusterService.localNode(),
+            null,
+            Collections.emptySet()
+        );
+        transportService.start();
+        transportService.acceptIncomingRequests();
+        localNode = new DiscoveryNode(
+            "local_node",
+            buildNewFakeTransportAddress(),
+            Collections.emptyMap(),
+            Set.of(DiscoveryNodeRole.MASTER_ROLE, DiscoveryNodeRole.DATA_ROLE),
+            Version.CURRENT
+        );
+        allNodes = new DiscoveryNode[] { localNode };
+    }
+
+    @After
+    @Override
+    public void tearDown() throws Exception {
+        super.tearDown();
+        clusterService.close();
+        transportService.close();
+    }
+
+    @AfterClass
+    public static void afterClass() {
+        ThreadPool.terminate(threadPool, 30, TimeUnit.SECONDS);
+        threadPool = null;
+    }
+
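+    // Runs the transport action directly and verifies that the reported disk health ends up in the health node's cache.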
+    public void testAction() throws ExecutionException, InterruptedException {
+        DiskHealthInfo diskHealthInfo = new DiskHealthInfo(HealthStatus.GREEN, null);
+        UpdateHealthInfoCacheAction.Request request = new UpdateHealthInfoCacheAction.Request(localNode.getId(), diskHealthInfo);
+        PlainActionFuture<AcknowledgedResponse> listener = new PlainActionFuture<>();
+        setState(clusterService, ClusterStateCreationUtils.state(localNode, localNode, localNode, allNodes));
+        HealthInfoCache healthInfoCache = HealthInfoCache.create(clusterService);
+        final AcknowledgedResponse expectedResponse = AcknowledgedResponse.of(true);
+        ActionTestUtils.execute(
+            new UpdateHealthInfoCacheAction.TransportAction(
+                transportService,
+                clusterService,
+                threadPool,
+                new ActionFilters(Set.of()),
+                healthInfoCache
+            ),
+            null,
+            request,
+            listener
+        );
+        AcknowledgedResponse actualResponse = listener.get();
+        assertThat(actualResponse, equalTo(expectedResponse));
+        assertThat(healthInfoCache.getDiskHealthInfo().get(localNode.getId()), equalTo(diskHealthInfo));
+    }
+
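+    // Round-trips the request through serialization and checks equals/hashCode against mutated copies.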
+    public void testRequestSerialization() {
+        DiskHealthInfo diskHealthInfo = randomBoolean()
+            ? new DiskHealthInfo(randomFrom(HealthStatus.values()))
+            : new DiskHealthInfo(randomFrom(HealthStatus.values()), randomFrom(DiskHealthInfo.Cause.values()));
+        UpdateHealthInfoCacheAction.Request request = new UpdateHealthInfoCacheAction.Request(randomAlphaOfLength(10), diskHealthInfo);
+        EqualsHashCodeTestUtils.checkEqualsAndHashCode(
+            request,
+            serializedRequest -> copyWriteable(serializedRequest, writableRegistry(), UpdateHealthInfoCacheAction.Request::new),
+            this::mutateRequest
+        );
+    }
+
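+    // Returns a copy of the request that differs in either the node id or the disk health info.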
+    private UpdateHealthInfoCacheAction.Request mutateRequest(UpdateHealthInfoCacheAction.Request request) {
+        String nodeId = request.getNodeId();
+        DiskHealthInfo diskHealthInfo = request.getDiskHealthInfo();
+        switch (randomIntBetween(1, 2)) {
+            case 1 -> nodeId = randomAlphaOfLength(10);
+            case 2 -> diskHealthInfo = new DiskHealthInfo(
+                randomValueOtherThan(diskHealthInfo.healthStatus(), () -> randomFrom(HealthStatus.values())),
+                randomBoolean() ? null : randomFrom(DiskHealthInfo.Cause.values())
+            );
+            default -> throw new IllegalStateException();
+        }
+        return new UpdateHealthInfoCacheAction.Request(nodeId, diskHealthInfo);
+    }
+}