Переглянути джерело

Add Health API Periodic Logging (#96772)

Logs the health status of the cluster and of each health indicator as observed by the Health API
Matt Culbreth 2 роки тому
батько
коміт
da81686125

+ 5 - 0
docs/changelog/96772.yaml

@@ -0,0 +1,5 @@
+pr: 96772
+summary: Health API Periodic Logging
+area: Health
+type: enhancement
+issues: []

+ 3 - 0
docs/reference/settings/health-diagnostic-settings.asciidoc

@@ -45,3 +45,6 @@ comprise its local health such as its disk usage.
 `health.ilm.max_retries_per_step`::
 (<<cluster-update-settings,Dynamic>>) The minimum amount of times an index has retried by an {ilm-init} step before it is considered stagnant. Defaults to `100`
 
+`health.periodic_logger.poll_interval`::
+(<<cluster-update-settings,Dynamic>>, <<time-units, time unit value>>) How often {es} logs the health status of the cluster and of each health indicator as observed by the Health API.
+Defaults to `60s` (60 seconds).

+ 2 - 0
server/src/main/java/org/elasticsearch/common/settings/ClusterSettings.java

@@ -72,6 +72,7 @@ import org.elasticsearch.env.Environment;
 import org.elasticsearch.env.NodeEnvironment;
 import org.elasticsearch.gateway.GatewayService;
 import org.elasticsearch.gateway.PersistedClusterStateService;
+import org.elasticsearch.health.HealthPeriodicLogger;
 import org.elasticsearch.health.node.LocalHealthMonitor;
 import org.elasticsearch.health.node.action.TransportHealthNodeAction;
 import org.elasticsearch.health.node.selection.HealthNodeTaskExecutor;
@@ -567,6 +568,7 @@ public final class ClusterSettings extends AbstractScopedSettings {
         TcpTransport.isUntrustedRemoteClusterEnabled() ? RemoteClusterPortSettings.TCP_NO_DELAY : null,
         TcpTransport.isUntrustedRemoteClusterEnabled() ? RemoteClusterPortSettings.TCP_REUSE_ADDRESS : null,
         TcpTransport.isUntrustedRemoteClusterEnabled() ? RemoteClusterPortSettings.TCP_SEND_BUFFER_SIZE : null,
+        HealthPeriodicLogger.HEALTH_PERIODIC_LOGGER_POLL_INTERVAL_SETTING,
         DataStreamLifecycle.isEnabled() ? DataStreamLifecycle.CLUSTER_LIFECYCLE_DEFAULT_ROLLOVER_SETTING : null,
         IndicesClusterStateService.SHARD_LOCK_RETRY_INTERVAL_SETTING,
         IndicesClusterStateService.SHARD_LOCK_RETRY_TIMEOUT_SETTING,

+ 269 - 0
server/src/main/java/org/elasticsearch/health/HealthPeriodicLogger.java

@@ -0,0 +1,269 @@
+/*
+ * Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
+ * or more contributor license agreements. Licensed under the Elastic License
+ * 2.0 and the Server Side Public License, v 1; you may not use this file except
+ * in compliance with, at your election, the Elastic License 2.0 or the Server
+ * Side Public License, v 1.
+ */
+
+package org.elasticsearch.health;
+
+import org.apache.logging.log4j.LogManager;
+import org.apache.logging.log4j.Logger;
+import org.apache.lucene.util.SetOnce;
+import org.elasticsearch.action.ActionListener;
+import org.elasticsearch.client.internal.Client;
+import org.elasticsearch.cluster.ClusterChangedEvent;
+import org.elasticsearch.cluster.ClusterStateListener;
+import org.elasticsearch.cluster.node.DiscoveryNode;
+import org.elasticsearch.cluster.service.ClusterService;
+import org.elasticsearch.common.component.Lifecycle;
+import org.elasticsearch.common.logging.ESLogMessage;
+import org.elasticsearch.common.scheduler.SchedulerEngine;
+import org.elasticsearch.common.scheduler.TimeValueSchedule;
+import org.elasticsearch.common.settings.Setting;
+import org.elasticsearch.common.settings.Settings;
+import org.elasticsearch.common.util.concurrent.RunOnce;
+import org.elasticsearch.core.TimeValue;
+import org.elasticsearch.gateway.GatewayService;
+import org.elasticsearch.health.node.selection.HealthNode;
+
+import java.io.Closeable;
+import java.time.Clock;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Locale;
+import java.util.Map;
+import java.util.concurrent.atomic.AtomicBoolean;
+
+/**
+ * This class periodically logs the results of the Health API to the standard Elasticsearch server log file.
+ */
+public class HealthPeriodicLogger implements ClusterStateListener, Closeable, SchedulerEngine.Listener {
+    public static final String HEALTH_FIELD_PREFIX = "elasticsearch.health";
+
+    public static final String HEALTH_PERIODIC_LOGGER_POLL_INTERVAL = "health.periodic_logger.poll_interval";
+    public static final Setting<TimeValue> HEALTH_PERIODIC_LOGGER_POLL_INTERVAL_SETTING = Setting.timeSetting(
+        HEALTH_PERIODIC_LOGGER_POLL_INTERVAL,
+        TimeValue.timeValueSeconds(60),
+        TimeValue.timeValueSeconds(15),
+        Setting.Property.Dynamic,
+        Setting.Property.NodeScope
+    );
+
+    /**
+     * Name constant for the job HealthService schedules
+     */
+    protected static final String HEALTH_PERIODIC_LOGGER_JOB_NAME = "health_periodic_logger";
+
+    private final Settings settings;
+
+    private final ClusterService clusterService;
+    private final Client client;
+
+    private final HealthService healthService;
+    private final Clock clock;
+
+    // default visibility for testing purposes
+    volatile boolean isHealthNode = false;
+
+    private final AtomicBoolean currentlyRunning = new AtomicBoolean(false);
+
+    private final SetOnce<SchedulerEngine> scheduler = new SetOnce<>();
+    private volatile TimeValue pollInterval;
+
+    private static final Logger logger = LogManager.getLogger(HealthPeriodicLogger.class);
+
+    /**
+     * Creates a new HealthPeriodicLogger.
+     * This creates a scheduled job using the SchedulerEngine framework and runs it on the current health node.
+     *
+     * @param settings the cluster settings, used to get the interval setting.
+     * @param clusterService the cluster service, used to know when the health node changes.
+     * @param client the client used to call the Health Service.
+     * @param healthService the Health Service, where the actual Health API logic lives.
+     */
+    public HealthPeriodicLogger(Settings settings, ClusterService clusterService, Client client, HealthService healthService) {
+        this.settings = settings;
+        this.clusterService = clusterService;
+        this.client = client;
+        this.healthService = healthService;
+        this.clock = Clock.systemUTC();
+        this.pollInterval = HEALTH_PERIODIC_LOGGER_POLL_INTERVAL_SETTING.get(settings);
+    }
+
+    /**
+     * Initializer method to avoid the publication of a self reference in the constructor.
+     */
+    public void init() {
+        clusterService.addListener(this);
+        clusterService.getClusterSettings()
+            .addSettingsUpdateConsumer(HEALTH_PERIODIC_LOGGER_POLL_INTERVAL_SETTING, this::updatePollInterval);
+    }
+
+    @Override
+    public void clusterChanged(ClusterChangedEvent event) {
+        // wait for the cluster state to be recovered
+        if (event.state().blocks().hasGlobalBlock(GatewayService.STATE_NOT_RECOVERED_BLOCK)) {
+            return;
+        }
+
+        DiscoveryNode healthNode = HealthNode.findHealthNode(event.state());
+        if (healthNode == null) {
+            this.isHealthNode = false;
+            this.maybeCancelJob();
+            return;
+        }
+        final boolean isCurrentlyHealthNode = healthNode.getId().equals(this.clusterService.localNode().getId());
+        if (this.isHealthNode != isCurrentlyHealthNode) {
+            this.isHealthNode = isCurrentlyHealthNode;
+            if (this.isHealthNode) {
+                // we weren't the health node, and now we are
+                maybeScheduleJob();
+            } else {
+                // we were the health node, and now we aren't
+                maybeCancelJob();
+            }
+        }
+    }
+
+    @Override
+    public void close() {
+        SchedulerEngine engine = scheduler.get();
+        if (engine != null) {
+            engine.stop();
+        }
+    }
+
+    @Override
+    public void triggered(SchedulerEngine.Event event) {
+        if (event.getJobName().equals(HEALTH_PERIODIC_LOGGER_JOB_NAME)) {
+            this.tryToLogHealth();
+        }
+    }
+
+    // default visibility for testing purposes
+    void tryToLogHealth() {
+        if (this.currentlyRunning.compareAndExchange(false, true) == false) {
+            RunOnce release = new RunOnce(() -> currentlyRunning.set(false));
+            try {
+                ActionListener<List<HealthIndicatorResult>> listenerWithRelease = ActionListener.runAfter(resultsListener, release);
+                this.healthService.getHealth(this.client, null, false, 0, listenerWithRelease);
+            } catch (Exception e) {
+                logger.warn(() -> "The health periodic logger encountered an error.", e);
+                // In case of an exception before the listener was wired, we can release the flag here, and we feel safe
+                // that it will not release it again because this can only be run once.
+                release.run();
+            }
+        }
+    }
+
+    // default visibility for testing purposes
+    SchedulerEngine getScheduler() {
+        return this.scheduler.get();
+    }
+
+    /**
+     * Create a Map of the results, which is then turned into JSON for logging.
+     *
+     * The structure looks like:
+     * {"elasticsearch.health.overall.status": "green", "elasticsearch.health.[other indicators].status": "green"}
+     * Only the indicator status values are included, along with the computed top-level status.
+     *
+     * @param indicatorResults the results of the Health API call that will be used as the output.
+     */
+    // default visibility for testing purposes
+    static Map<String, Object> convertToLoggedFields(List<HealthIndicatorResult> indicatorResults) {
+        if (indicatorResults == null || indicatorResults.isEmpty()) {
+            return Map.of();
+        }
+
+        final Map<String, Object> result = new HashMap<>();
+
+        // overall status
+        final HealthStatus status = HealthStatus.merge(indicatorResults.stream().map(HealthIndicatorResult::status));
+        result.put(String.format(Locale.ROOT, "%s.overall.status", HEALTH_FIELD_PREFIX), status.xContentValue());
+
+        // top-level status for each indicator
+        indicatorResults.forEach((indicatorResult) -> {
+            result.put(
+                String.format(Locale.ROOT, "%s.%s.status", HEALTH_FIELD_PREFIX, indicatorResult.name()),
+                indicatorResult.status().xContentValue()
+            );
+        });
+
+        return result;
+    }
+
+    /**
+     * Handle the result of the Health Service getHealth call
+     */
+    // default visibility for testing purposes
+    final ActionListener<List<HealthIndicatorResult>> resultsListener = new ActionListener<List<HealthIndicatorResult>>() {
+        @Override
+        public void onResponse(List<HealthIndicatorResult> healthIndicatorResults) {
+            try {
+                Map<String, Object> resultsMap = convertToLoggedFields(healthIndicatorResults);
+
+                // if we have a valid response, log in JSON format
+                if (resultsMap.isEmpty() == false) {
+                    ESLogMessage msg = new ESLogMessage().withFields(resultsMap);
+                    logger.info(msg);
+                }
+            } catch (Exception e) {
+                logger.warn("Health Periodic Logger error:{}", e.toString());
+            }
+        }
+
+        @Override
+        public void onFailure(Exception e) {
+            logger.warn("Health Periodic Logger error:{}", e.toString());
+        }
+    };
+
+    /**
+     * Create the SchedulerEngine.Job if this node is the health node
+     */
+    private void maybeScheduleJob() {
+        if (this.isHealthNode == false) {
+            return;
+        }
+
+        // don't schedule the job if the node is shutting down
+        if (isClusterServiceStoppedOrClosed()) {
+            logger.trace(
+                "Skipping scheduling a health periodic logger job due to the cluster lifecycle state being: [{}] ",
+                clusterService.lifecycleState()
+            );
+            return;
+        }
+
+        if (scheduler.get() == null) {
+            scheduler.set(new SchedulerEngine(settings, clock));
+            scheduler.get().register(this);
+        }
+
+        assert scheduler.get() != null : "scheduler should be available";
+        final SchedulerEngine.Job scheduledJob = new SchedulerEngine.Job(
+            HEALTH_PERIODIC_LOGGER_JOB_NAME,
+            new TimeValueSchedule(pollInterval)
+        );
+        scheduler.get().add(scheduledJob);
+    }
+
+    private void maybeCancelJob() {
+        if (scheduler.get() != null) {
+            scheduler.get().remove(HEALTH_PERIODIC_LOGGER_JOB_NAME);
+        }
+    }
+
+    private void updatePollInterval(TimeValue newInterval) {
+        this.pollInterval = newInterval;
+        maybeScheduleJob();
+    }
+
+    private boolean isClusterServiceStoppedOrClosed() {
+        final Lifecycle.State state = clusterService.lifecycleState();
+        return state == Lifecycle.State.STOPPED || state == Lifecycle.State.CLOSED;
+    }
+}

+ 14 - 0
server/src/main/java/org/elasticsearch/node/Node.java

@@ -108,6 +108,7 @@ import org.elasticsearch.gateway.GatewayService;
 import org.elasticsearch.gateway.MetaStateService;
 import org.elasticsearch.gateway.PersistedClusterStateService;
 import org.elasticsearch.health.HealthIndicatorService;
+import org.elasticsearch.health.HealthPeriodicLogger;
 import org.elasticsearch.health.HealthService;
 import org.elasticsearch.health.metadata.HealthMetadataService;
 import org.elasticsearch.health.node.DiskHealthIndicatorService;
@@ -1029,6 +1030,8 @@ public class Node implements Closeable {
                 threadPool,
                 systemIndices
             );
+            HealthPeriodicLogger healthPeriodicLogger = createHealthPeriodicLogger(clusterService, settings, client, healthService);
+            healthPeriodicLogger.init();
             HealthMetadataService healthMetadataService = HealthMetadataService.create(clusterService, settings);
             LocalHealthMonitor localHealthMonitor = LocalHealthMonitor.create(settings, clusterService, nodeService, threadPool, client);
             HealthInfoCache nodeHealthOverview = HealthInfoCache.create(clusterService);
@@ -1136,6 +1139,7 @@ public class Node implements Closeable {
                 b.bind(Tracer.class).toInstance(tracer);
                 b.bind(FileSettingsService.class).toInstance(fileSettingsService);
                 b.bind(WriteLoadForecaster.class).toInstance(writeLoadForecaster);
+                b.bind(HealthPeriodicLogger.class).toInstance(healthPeriodicLogger);
             });
 
             if (ReadinessService.enabled(environment)) {
@@ -1285,6 +1289,15 @@ public class Node implements Closeable {
         );
     }
 
+    private HealthPeriodicLogger createHealthPeriodicLogger(
+        ClusterService clusterService,
+        Settings settings,
+        NodeClient client,
+        HealthService healthService
+    ) {
+        return new HealthPeriodicLogger(settings, clusterService, client, healthService);
+    }
+
     private RecoveryPlannerService getRecoveryPlannerService(
         ThreadPool threadPool,
         ClusterService clusterService,
@@ -1659,6 +1672,7 @@ public class Node implements Closeable {
             toClose.add(injector.getInstance(ReadinessService.class));
         }
         toClose.add(injector.getInstance(FileSettingsService.class));
+        toClose.add(injector.getInstance(HealthPeriodicLogger.class));
 
         for (LifecycleComponent plugin : pluginLifecycleComponents) {
             toClose.add(() -> stopWatch.stop().start("plugin(" + plugin.getClass().getName() + ")"));

+ 423 - 0
server/src/test/java/org/elasticsearch/health/HealthPeriodicLoggerTests.java

@@ -0,0 +1,423 @@
+/*
+ * Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
+ * or more contributor license agreements. Licensed under the Elastic License
+ * 2.0 and the Server Side Public License, v 1; you may not use this file except
+ * in compliance with, at your election, the Elastic License 2.0 or the Server
+ * Side Public License, v 1.
+ */
+
+package org.elasticsearch.health;
+
+import org.apache.logging.log4j.Level;
+import org.apache.logging.log4j.LogManager;
+import org.apache.logging.log4j.Logger;
+import org.elasticsearch.ResourceNotFoundException;
+import org.elasticsearch.action.ActionListener;
+import org.elasticsearch.action.support.replication.ClusterStateCreationUtils;
+import org.elasticsearch.client.internal.node.NodeClient;
+import org.elasticsearch.cluster.ClusterChangedEvent;
+import org.elasticsearch.cluster.ClusterState;
+import org.elasticsearch.cluster.node.DiscoveryNode;
+import org.elasticsearch.cluster.node.DiscoveryNodeRole;
+import org.elasticsearch.cluster.node.DiscoveryNodeUtils;
+import org.elasticsearch.cluster.service.ClusterService;
+import org.elasticsearch.common.logging.Loggers;
+import org.elasticsearch.common.scheduler.SchedulerEngine;
+import org.elasticsearch.common.settings.ClusterSettings;
+import org.elasticsearch.common.settings.Setting;
+import org.elasticsearch.common.settings.Settings;
+import org.elasticsearch.test.ESTestCase;
+import org.elasticsearch.test.MockLogAppender;
+import org.elasticsearch.threadpool.TestThreadPool;
+import org.elasticsearch.threadpool.ThreadPool;
+import org.junit.After;
+import org.junit.Before;
+
+import java.util.ArrayList;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Locale;
+import java.util.Map;
+import java.util.Set;
+import java.util.concurrent.atomic.AtomicBoolean;
+import java.util.concurrent.atomic.AtomicInteger;
+
+import static org.elasticsearch.health.HealthStatus.GREEN;
+import static org.elasticsearch.health.HealthStatus.YELLOW;
+import static org.elasticsearch.test.ClusterServiceUtils.createClusterService;
+import static org.hamcrest.Matchers.equalTo;
+import static org.mockito.ArgumentMatchers.any;
+import static org.mockito.ArgumentMatchers.anyBoolean;
+import static org.mockito.ArgumentMatchers.anyInt;
+import static org.mockito.ArgumentMatchers.isNull;
+import static org.mockito.Mockito.doAnswer;
+import static org.mockito.Mockito.doThrow;
+import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.spy;
+
+public class HealthPeriodicLoggerTests extends ESTestCase {
+    private ThreadPool threadPool;
+
+    private NodeClient client;
+    private ClusterService clusterService;
+
+    private HealthPeriodicLogger testHealthPeriodicLogger;
+    private ClusterService testClusterService;
+
+    private NodeClient getTestClient() {
+        return mock(NodeClient.class);
+    }
+
+    private HealthService getMockedHealthService() {
+        return mock(HealthService.class);
+    }
+
+    @Before
+    public void setupServices() {
+        threadPool = new TestThreadPool(getTestName());
+
+        Set<Setting<?>> builtInClusterSettings = new HashSet<>(ClusterSettings.BUILT_IN_CLUSTER_SETTINGS);
+        builtInClusterSettings.add(HealthPeriodicLogger.HEALTH_PERIODIC_LOGGER_POLL_INTERVAL_SETTING);
+        ClusterSettings clusterSettings = new ClusterSettings(Settings.EMPTY, builtInClusterSettings);
+        this.clusterService = createClusterService(this.threadPool, clusterSettings);
+
+        this.client = getTestClient();
+
+    }
+
+    @After
+    public void cleanup() {
+        clusterService.close();
+        if (testClusterService != null) {
+            testClusterService.close();
+        }
+        if (testHealthPeriodicLogger != null) {
+            testHealthPeriodicLogger.close();
+        }
+        threadPool.shutdownNow();
+        client.close();
+    }
+
+    private List<HealthIndicatorResult> getTestIndicatorResults() {
+        var networkLatency = new HealthIndicatorResult("network_latency", GREEN, null, null, null, null);
+        var slowTasks = new HealthIndicatorResult("slow_task_assignment", YELLOW, null, null, null, null);
+        var shardsAvailable = new HealthIndicatorResult("shards_availability", GREEN, null, null, null, null);
+
+        return List.of(networkLatency, slowTasks, shardsAvailable);
+    }
+
+    private String makeHealthStatusString(String key) {
+        return String.format(Locale.ROOT, "%s.%s.status", HealthPeriodicLogger.HEALTH_FIELD_PREFIX, key);
+    }
+
+    public void testConvertToLoggedFields() {
+        var results = getTestIndicatorResults();
+        var overallStatus = HealthStatus.merge(results.stream().map(HealthIndicatorResult::status));
+
+        Map<String, Object> loggerResults = HealthPeriodicLogger.convertToLoggedFields(results);
+
+        assertThat(loggerResults.size(), equalTo(results.size() + 1));
+
+        // test indicator status
+        assertThat(loggerResults.get(makeHealthStatusString("network_latency")), equalTo("green"));
+        assertThat(loggerResults.get(makeHealthStatusString("slow_task_assignment")), equalTo("yellow"));
+        assertThat(loggerResults.get(makeHealthStatusString("shards_availability")), equalTo("green"));
+
+        // test calculated overall status
+        assertThat(loggerResults.get(makeHealthStatusString("overall")), equalTo(overallStatus.xContentValue()));
+
+        // test empty results
+        {
+            List<HealthIndicatorResult> empty = new ArrayList<>();
+            Map<String, Object> emptyResults = HealthPeriodicLogger.convertToLoggedFields(empty);
+
+            assertThat(emptyResults.size(), equalTo(0));
+        }
+    }
+
+    public void testHealthNodeIsSelected() {
+        HealthService testHealthService = this.getMockedHealthService();
+
+        // create a cluster topology
+        final DiscoveryNode node1 = DiscoveryNodeUtils.builder("node_1").roles(Set.of(DiscoveryNodeRole.MASTER_ROLE)).build();
+        final DiscoveryNode node2 = DiscoveryNodeUtils.builder("node_2")
+            .roles(Set.of(DiscoveryNodeRole.MASTER_ROLE, DiscoveryNodeRole.DATA_ROLE))
+            .build();
+        ClusterState current = ClusterStateCreationUtils.state(node2, node1, node2, new DiscoveryNode[] { node1, node2 });
+
+        testClusterService = createClusterService(current, this.threadPool);
+        testHealthPeriodicLogger = new HealthPeriodicLogger(Settings.EMPTY, testClusterService, client, testHealthService);
+        testHealthPeriodicLogger.init();
+
+        // test that it knows that it's not initially the health node
+        assertFalse(testHealthPeriodicLogger.isHealthNode);
+
+        // trigger a cluster change and recheck
+        testHealthPeriodicLogger.clusterChanged(new ClusterChangedEvent("test", current, ClusterState.EMPTY_STATE));
+        assertTrue(testHealthPeriodicLogger.isHealthNode);
+    }
+
+    public void testJobScheduling() {
+        HealthService testHealthService = this.getMockedHealthService();
+
+        // create a cluster topology
+        final DiscoveryNode node1 = DiscoveryNodeUtils.builder("node_1").roles(Set.of(DiscoveryNodeRole.MASTER_ROLE)).build();
+        final DiscoveryNode node2 = DiscoveryNodeUtils.builder("node_2")
+            .roles(Set.of(DiscoveryNodeRole.MASTER_ROLE, DiscoveryNodeRole.DATA_ROLE))
+            .build();
+        ClusterState current = ClusterStateCreationUtils.state(node2, node1, node2, new DiscoveryNode[] { node1, node2 });
+
+        testClusterService = createClusterService(current, this.threadPool);
+        testHealthPeriodicLogger = new HealthPeriodicLogger(Settings.EMPTY, testClusterService, client, testHealthService);
+        testHealthPeriodicLogger.init();
+
+        testHealthPeriodicLogger.clusterChanged(new ClusterChangedEvent("test", current, ClusterState.EMPTY_STATE));
+        assertTrue(testHealthPeriodicLogger.isHealthNode);
+
+        SchedulerEngine scheduler = testHealthPeriodicLogger.getScheduler();
+        assertTrue(scheduler.scheduledJobIds().contains(HealthPeriodicLogger.HEALTH_PERIODIC_LOGGER_JOB_NAME));
+
+        ClusterState noHealthNode = ClusterStateCreationUtils.state(node2, node1, new DiscoveryNode[] { node1, node2 });
+        testHealthPeriodicLogger.clusterChanged(new ClusterChangedEvent("test", noHealthNode, current));
+        assertFalse(testHealthPeriodicLogger.isHealthNode);
+        assertFalse(scheduler.scheduledJobIds().contains(HealthPeriodicLogger.HEALTH_PERIODIC_LOGGER_JOB_NAME));
+    }
+
+    public void testTriggeredJobCallsTryToLogHealth() throws Exception {
+        AtomicBoolean listenerCalled = new AtomicBoolean(false);
+        AtomicBoolean failureCalled = new AtomicBoolean(false);
+        ActionListener<List<HealthIndicatorResult>> testListener = new ActionListener<>() {
+            @Override
+            public void onResponse(List<HealthIndicatorResult> indicatorResults) {
+                listenerCalled.set(true);
+            }
+
+            @Override
+            public void onFailure(Exception e) {
+                failureCalled.set(true);
+            }
+        };
+        HealthService testHealthService = this.getMockedHealthService();
+
+        testHealthPeriodicLogger = new HealthPeriodicLogger(Settings.EMPTY, clusterService, client, testHealthService);
+        testHealthPeriodicLogger.init();
+
+        HealthPeriodicLogger spyHealthPeriodicLogger = spy(testHealthPeriodicLogger);
+        spyHealthPeriodicLogger.isHealthNode = true;
+        doAnswer(invocation -> {
+            testListener.onResponse(getTestIndicatorResults());
+            return null;
+        }).when(spyHealthPeriodicLogger).tryToLogHealth();
+
+        SchedulerEngine.Event event = new SchedulerEngine.Event(HealthPeriodicLogger.HEALTH_PERIODIC_LOGGER_JOB_NAME, 0, 0);
+        spyHealthPeriodicLogger.triggered(event);
+
+        assertBusy(() -> assertFalse(failureCalled.get()));
+        assertBusy(() -> assertTrue(listenerCalled.get()));
+    }
+
+    public void testResultFailureHandling() throws Exception {
+        AtomicInteger getHealthCalled = new AtomicInteger(0);
+
+        HealthService testHealthService = this.getMockedHealthService();
+
+        testHealthPeriodicLogger = new HealthPeriodicLogger(Settings.EMPTY, clusterService, client, testHealthService);
+        testHealthPeriodicLogger.init();
+
+        HealthPeriodicLogger spyHealthPeriodicLogger = spy(testHealthPeriodicLogger);
+        spyHealthPeriodicLogger.isHealthNode = true;
+
+        SchedulerEngine.Event event = new SchedulerEngine.Event(HealthPeriodicLogger.HEALTH_PERIODIC_LOGGER_JOB_NAME, 0, 0);
+
+        // run it and call the listener's onFailure
+        {
+            doAnswer(invocation -> {
+                ActionListener<List<HealthIndicatorResult>> listener = invocation.getArgument(4);
+                listener.onFailure(new Exception("fake failure"));
+                getHealthCalled.incrementAndGet();
+                return null;
+            }).when(testHealthService).getHealth(any(), isNull(), anyBoolean(), anyInt(), any());
+            spyHealthPeriodicLogger.triggered(event);
+            assertBusy(() -> assertThat(getHealthCalled.get(), equalTo(1)));
+        }
+
+        // run it again and verify that the concurrency control is reset and the getHealth is called
+        {
+            doAnswer(invocation -> {
+                ActionListener<List<HealthIndicatorResult>> listener = invocation.getArgument(4);
+                listener.onResponse(getTestIndicatorResults());
+                getHealthCalled.incrementAndGet();
+                return null;
+            }).when(testHealthService).getHealth(any(), isNull(), anyBoolean(), anyInt(), any());
+            spyHealthPeriodicLogger.triggered(event);
+            assertBusy(() -> assertThat(getHealthCalled.get(), equalTo(2)));
+        }
+    }
+
+    public void testTryToLogHealthConcurrencyControlWithResults() throws Exception {
+        AtomicInteger getHealthCalled = new AtomicInteger(0);
+
+        HealthService testHealthService = this.getMockedHealthService();
+        doAnswer(invocation -> {
+            // get and call the results listener provided to getHealth
+            ActionListener<List<HealthIndicatorResult>> listener = invocation.getArgument(4);
+            listener.onResponse(getTestIndicatorResults());
+            getHealthCalled.incrementAndGet();
+            return null;
+        }).when(testHealthService).getHealth(any(), isNull(), anyBoolean(), anyInt(), any());
+
+        testHealthPeriodicLogger = new HealthPeriodicLogger(Settings.EMPTY, clusterService, client, testHealthService);
+        testHealthPeriodicLogger.init();
+
+        HealthPeriodicLogger spyHealthPeriodicLogger = spy(testHealthPeriodicLogger);
+        spyHealthPeriodicLogger.isHealthNode = true;
+
+        SchedulerEngine.Event event = new SchedulerEngine.Event(HealthPeriodicLogger.HEALTH_PERIODIC_LOGGER_JOB_NAME, 0, 0);
+
+        // run it once, verify getHealth is called
+        {
+            spyHealthPeriodicLogger.triggered(event);
+            assertBusy(() -> assertThat(getHealthCalled.get(), equalTo(1)));
+        }
+
+        // run it again, verify getHealth is called, because we are calling the results listener
+        {
+            spyHealthPeriodicLogger.triggered(event);
+            assertBusy(() -> assertThat(getHealthCalled.get(), equalTo(2)));
+        }
+    }
+
+    public void testTryToLogHealthConcurrencyControl() throws Exception {
+        AtomicInteger getHealthCalled = new AtomicInteger(0);
+
+        HealthService testHealthService = this.getMockedHealthService();
+        doAnswer(invocation -> {
+            // get but do not call the provided listener
+            ActionListener<List<HealthIndicatorResult>> listener = invocation.getArgument(4);
+            assertNotNull(listener);
+
+            // note that we received the getHealth call
+            getHealthCalled.incrementAndGet();
+            return null;
+        }).when(testHealthService).getHealth(any(), isNull(), anyBoolean(), anyInt(), any());
+
+        testHealthPeriodicLogger = new HealthPeriodicLogger(Settings.EMPTY, clusterService, client, testHealthService);
+        testHealthPeriodicLogger.init();
+
+        HealthPeriodicLogger spyHealthPeriodicLogger = spy(testHealthPeriodicLogger);
+        spyHealthPeriodicLogger.isHealthNode = true;
+
+        SchedulerEngine.Event event = new SchedulerEngine.Event(HealthPeriodicLogger.HEALTH_PERIODIC_LOGGER_JOB_NAME, 0, 0);
+
+        // call it once, and verify that getHealth is called
+        {
+            spyHealthPeriodicLogger.triggered(event);
+            assertBusy(() -> assertThat(getHealthCalled.get(), equalTo(1)));
+        }
+
+        // trigger it again, and verify that getHealth is not called
+        // it's not called because the results listener was never called by getHealth
+        // this is simulating a double invocation of getHealth, due perhaps to a lengthy getHealth call
+        {
+            spyHealthPeriodicLogger.triggered(event);
+            assertBusy(() -> assertThat(getHealthCalled.get(), equalTo(1)));
+        }
+    }
+
+    public void testTryToLogHealthConcurrencyControlWithException() throws Exception {
+        AtomicInteger getHealthCalled = new AtomicInteger(0);
+
+        HealthService testHealthService = this.getMockedHealthService();
+
+        testHealthPeriodicLogger = new HealthPeriodicLogger(Settings.EMPTY, clusterService, client, testHealthService);
+        testHealthPeriodicLogger.init();
+
+        HealthPeriodicLogger spyHealthPeriodicLogger = spy(testHealthPeriodicLogger);
+        spyHealthPeriodicLogger.isHealthNode = true;
+
+        SchedulerEngine.Event event = new SchedulerEngine.Event(HealthPeriodicLogger.HEALTH_PERIODIC_LOGGER_JOB_NAME, 0, 0);
+
+        // run it once and trigger an exception during the getHealth call
+        {
+            doThrow(new ResourceNotFoundException("No preflight indicators")).when(testHealthService)
+                .getHealth(any(), isNull(), anyBoolean(), anyInt(), any());
+            spyHealthPeriodicLogger.triggered(event);
+            assertBusy(() -> assertThat(getHealthCalled.get(), equalTo(0)));
+        }
+
+        // run it again and have getHealth work. This tests that the RunOnce still sets the currentlyRunning variable.
+        {
+            doAnswer(invocation -> {
+                ActionListener<List<HealthIndicatorResult>> listener = invocation.getArgument(4);
+                listener.onResponse(getTestIndicatorResults());
+                getHealthCalled.incrementAndGet();
+                return null;
+            }).when(testHealthService).getHealth(any(), isNull(), anyBoolean(), anyInt(), any());
+            spyHealthPeriodicLogger.triggered(event);
+            assertBusy(() -> assertThat(getHealthCalled.get(), equalTo(1)));
+        }
+    }
+
+    public void testLoggingHappens() {
+        MockLogAppender mockAppender = new MockLogAppender();
+        mockAppender.start();
+        mockAppender.addExpectation(
+            new MockLogAppender.SeenEventExpectation(
+                "overall",
+                HealthPeriodicLogger.class.getCanonicalName(),
+                Level.INFO,
+                String.format(Locale.ROOT, "%s=\"yellow\"", makeHealthStatusString("overall"))
+            )
+        );
+        mockAppender.addExpectation(
+            new MockLogAppender.SeenEventExpectation(
+                "network_latency",
+                HealthPeriodicLogger.class.getCanonicalName(),
+                Level.INFO,
+                String.format(Locale.ROOT, "%s=\"green\"", makeHealthStatusString("network_latency"))
+            )
+        );
+        mockAppender.addExpectation(
+            new MockLogAppender.SeenEventExpectation(
+                "slow_task_assignment",
+                HealthPeriodicLogger.class.getCanonicalName(),
+                Level.INFO,
+                String.format(Locale.ROOT, "%s=\"yellow\"", makeHealthStatusString("slow_task_assignment"))
+            )
+        );
+        mockAppender.addExpectation(
+            new MockLogAppender.UnseenEventExpectation(
+                "ilm",
+                HealthPeriodicLogger.class.getCanonicalName(),
+                Level.INFO,
+                String.format(Locale.ROOT, "%s=\"red\"", makeHealthStatusString("ilm"))
+            )
+        );
+        Logger periodicLoggerLogger = LogManager.getLogger(HealthPeriodicLogger.class);
+        Loggers.addAppender(periodicLoggerLogger, mockAppender);
+
+        HealthService testHealthService = this.getMockedHealthService();
+
+        testHealthPeriodicLogger = new HealthPeriodicLogger(Settings.EMPTY, clusterService, client, testHealthService);
+        testHealthPeriodicLogger.init();
+
+        HealthPeriodicLogger spyHealthPeriodicLogger = spy(testHealthPeriodicLogger);
+        spyHealthPeriodicLogger.isHealthNode = true;
+        doAnswer(invocation -> {
+            spyHealthPeriodicLogger.resultsListener.onResponse(getTestIndicatorResults());
+            return null;
+        }).when(spyHealthPeriodicLogger).tryToLogHealth();
+
+        SchedulerEngine.Event event = new SchedulerEngine.Event(HealthPeriodicLogger.HEALTH_PERIODIC_LOGGER_JOB_NAME, 0, 0);
+        spyHealthPeriodicLogger.triggered(event);
+
+        try {
+            mockAppender.assertAllExpectationsMatched();
+        } finally {
+            Loggers.removeAppender(periodicLoggerLogger, mockAppender);
+            mockAppender.stop();
+        }
+
+    }
+
+}