|
@@ -21,6 +21,7 @@ import org.elasticsearch.cluster.node.DiscoveryNode;
|
|
|
import org.elasticsearch.cluster.node.DiscoveryNodeRole;
|
|
|
import org.elasticsearch.cluster.node.DiscoveryNodeUtils;
|
|
|
import org.elasticsearch.cluster.service.ClusterService;
|
|
|
+import org.elasticsearch.common.component.Lifecycle;
|
|
|
import org.elasticsearch.common.logging.ESLogMessage;
|
|
|
import org.elasticsearch.common.logging.Loggers;
|
|
|
import org.elasticsearch.common.scheduler.SchedulerEngine;
|
|
@@ -42,8 +43,11 @@ import java.util.List;
|
|
|
import java.util.Locale;
|
|
|
import java.util.Map;
|
|
|
import java.util.Set;
|
|
|
+import java.util.concurrent.CountDownLatch;
|
|
|
+import java.util.concurrent.TimeUnit;
|
|
|
import java.util.concurrent.atomic.AtomicBoolean;
|
|
|
import java.util.concurrent.atomic.AtomicInteger;
|
|
|
+import java.util.concurrent.atomic.AtomicReference;
|
|
|
import java.util.function.BiConsumer;
|
|
|
import java.util.function.Consumer;
|
|
|
|
|
@@ -60,7 +64,6 @@ import static org.mockito.Mockito.doAnswer;
|
|
|
import static org.mockito.Mockito.doReturn;
|
|
|
import static org.mockito.Mockito.doThrow;
|
|
|
import static org.mockito.Mockito.mock;
|
|
|
-import static org.mockito.Mockito.spy;
|
|
|
|
|
|
public class HealthPeriodicLoggerTests extends ESTestCase {
|
|
|
private ThreadPool threadPool;
|
|
@@ -105,7 +108,13 @@ public class HealthPeriodicLoggerTests extends ESTestCase {
|
|
|
public void cleanup() {
|
|
|
clusterService.close();
|
|
|
if (testHealthPeriodicLogger != null) {
|
|
|
- testHealthPeriodicLogger.close();
|
|
|
+ if (testHealthPeriodicLogger.lifecycleState() == Lifecycle.State.STARTED) {
|
|
|
+ testHealthPeriodicLogger.stop();
|
|
|
+ }
|
|
|
+ if (testHealthPeriodicLogger.lifecycleState() == Lifecycle.State.INITIALIZED
|
|
|
+ || testHealthPeriodicLogger.lifecycleState() == Lifecycle.State.STOPPED) {
|
|
|
+ testHealthPeriodicLogger.close();
|
|
|
+ }
|
|
|
}
|
|
|
threadPool.shutdownNow();
|
|
|
}
|
|
@@ -157,32 +166,42 @@ public class HealthPeriodicLoggerTests extends ESTestCase {
|
|
|
|
|
|
public void testHealthNodeIsSelected() {
|
|
|
HealthService testHealthService = this.getMockedHealthService();
|
|
|
- testHealthPeriodicLogger = createAndInitHealthPeriodicLogger(clusterService, testHealthService, true);
|
|
|
+ testHealthPeriodicLogger = createAndInitHealthPeriodicLogger(clusterService, testHealthService, randomBoolean());
|
|
|
|
|
|
// test that it knows that it's not initially the health node
|
|
|
- assertFalse(testHealthPeriodicLogger.isHealthNode);
|
|
|
+ assertFalse(testHealthPeriodicLogger.isHealthNode());
|
|
|
|
|
|
// trigger a cluster change and recheck
|
|
|
testHealthPeriodicLogger.clusterChanged(new ClusterChangedEvent("test", stateWithLocalHealthNode, ClusterState.EMPTY_STATE));
|
|
|
- assertTrue(testHealthPeriodicLogger.isHealthNode);
|
|
|
+ assertTrue(testHealthPeriodicLogger.isHealthNode());
|
|
|
}
|
|
|
|
|
|
- public void testJobScheduling() {
|
|
|
+ public void testJobScheduling() throws Exception {
|
|
|
HealthService testHealthService = this.getMockedHealthService();
|
|
|
|
|
|
- testHealthPeriodicLogger = createAndInitHealthPeriodicLogger(clusterService, testHealthService, true);
|
|
|
+ testHealthPeriodicLogger = createAndInitHealthPeriodicLogger(clusterService, testHealthService, false);
|
|
|
|
|
|
testHealthPeriodicLogger.clusterChanged(new ClusterChangedEvent("test", stateWithLocalHealthNode, ClusterState.EMPTY_STATE));
|
|
|
- assertTrue(testHealthPeriodicLogger.isHealthNode);
|
|
|
-
|
|
|
- SchedulerEngine scheduler = testHealthPeriodicLogger.getScheduler();
|
|
|
- assertNotNull(scheduler);
|
|
|
- assertTrue(scheduler.scheduledJobIds().contains(HealthPeriodicLogger.HEALTH_PERIODIC_LOGGER_JOB_NAME));
|
|
|
+ assertTrue("local node should be the health node", testHealthPeriodicLogger.isHealthNode());
|
|
|
+ assertTrue("health logger should be enabled", testHealthPeriodicLogger.enabled());
|
|
|
|
|
|
+ // Even if this is the health node, we do not schedule a job because the service is not started yet
|
|
|
+ assertNull(testHealthPeriodicLogger.getScheduler());
|
|
|
+ // Starting the service should schedule a try to schedule a run
|
|
|
+ testHealthPeriodicLogger.start();
|
|
|
+ AtomicReference<SchedulerEngine> scheduler = new AtomicReference<>();
|
|
|
+ assertBusy(() -> {
|
|
|
+ var s = testHealthPeriodicLogger.getScheduler();
|
|
|
+ assertNotNull(s);
|
|
|
+ scheduler.set(s);
|
|
|
+ });
|
|
|
+ assertTrue(scheduler.get().scheduledJobIds().contains(HealthPeriodicLogger.HEALTH_PERIODIC_LOGGER_JOB_NAME));
|
|
|
+
|
|
|
+ // Changing the health node should cancel the run
|
|
|
ClusterState noHealthNode = ClusterStateCreationUtils.state(node2, node1, new DiscoveryNode[] { node1, node2 });
|
|
|
testHealthPeriodicLogger.clusterChanged(new ClusterChangedEvent("test", noHealthNode, stateWithLocalHealthNode));
|
|
|
- assertFalse(testHealthPeriodicLogger.isHealthNode);
|
|
|
- assertFalse(scheduler.scheduledJobIds().contains(HealthPeriodicLogger.HEALTH_PERIODIC_LOGGER_JOB_NAME));
|
|
|
+ assertFalse(testHealthPeriodicLogger.isHealthNode());
|
|
|
+ assertFalse(scheduler.get().scheduledJobIds().contains(HealthPeriodicLogger.HEALTH_PERIODIC_LOGGER_JOB_NAME));
|
|
|
}
|
|
|
|
|
|
public void testEnabled() {
|
|
@@ -190,11 +209,12 @@ public class HealthPeriodicLoggerTests extends ESTestCase {
|
|
|
testHealthPeriodicLogger = createAndInitHealthPeriodicLogger(clusterService, testHealthService, true);
|
|
|
|
|
|
testHealthPeriodicLogger.clusterChanged(new ClusterChangedEvent("test", stateWithLocalHealthNode, ClusterState.EMPTY_STATE));
|
|
|
- assertTrue(testHealthPeriodicLogger.isHealthNode);
|
|
|
+ verifyLoggerIsReadyToRun(testHealthPeriodicLogger);
|
|
|
|
|
|
// disable it and then verify that the job is gone
|
|
|
{
|
|
|
this.clusterSettings.applySettings(Settings.builder().put(HealthPeriodicLogger.ENABLED_SETTING.getKey(), false).build());
|
|
|
+ assertFalse(testHealthPeriodicLogger.enabled());
|
|
|
assertFalse(
|
|
|
testHealthPeriodicLogger.getScheduler().scheduledJobIds().contains(HealthPeriodicLogger.HEALTH_PERIODIC_LOGGER_JOB_NAME)
|
|
|
);
|
|
@@ -203,60 +223,119 @@ public class HealthPeriodicLoggerTests extends ESTestCase {
|
|
|
// enable it and then verify that the job is created
|
|
|
{
|
|
|
this.clusterSettings.applySettings(Settings.builder().put(HealthPeriodicLogger.ENABLED_SETTING.getKey(), true).build());
|
|
|
+ assertTrue(testHealthPeriodicLogger.enabled());
|
|
|
assertTrue(
|
|
|
testHealthPeriodicLogger.getScheduler().scheduledJobIds().contains(HealthPeriodicLogger.HEALTH_PERIODIC_LOGGER_JOB_NAME)
|
|
|
);
|
|
|
}
|
|
|
+ // ensure the job is not recreated during enabling if the service has stopped
|
|
|
+ {
|
|
|
+ testHealthPeriodicLogger.stop();
|
|
|
+ assertFalse(
|
|
|
+ testHealthPeriodicLogger.getScheduler().scheduledJobIds().contains(HealthPeriodicLogger.HEALTH_PERIODIC_LOGGER_JOB_NAME)
|
|
|
+ );
|
|
|
+ this.clusterSettings.applySettings(Settings.builder().put(HealthPeriodicLogger.ENABLED_SETTING.getKey(), true).build());
|
|
|
+ assertTrue(testHealthPeriodicLogger.enabled());
|
|
|
+ assertFalse(
|
|
|
+ testHealthPeriodicLogger.getScheduler().scheduledJobIds().contains(HealthPeriodicLogger.HEALTH_PERIODIC_LOGGER_JOB_NAME)
|
|
|
+ );
|
|
|
+ }
|
|
|
}
|
|
|
|
|
|
public void testUpdatePollInterval() {
|
|
|
HealthService testHealthService = this.getMockedHealthService();
|
|
|
-
|
|
|
testHealthPeriodicLogger = createAndInitHealthPeriodicLogger(clusterService, testHealthService, false);
|
|
|
- assertNull(testHealthPeriodicLogger.getScheduler());
|
|
|
-
|
|
|
testHealthPeriodicLogger.clusterChanged(new ClusterChangedEvent("test", stateWithLocalHealthNode, ClusterState.EMPTY_STATE));
|
|
|
- assertTrue(testHealthPeriodicLogger.isHealthNode);
|
|
|
-
|
|
|
- // verify that updating the polling interval doesn't schedule the job
|
|
|
+ assertTrue("local node should be the health node", testHealthPeriodicLogger.isHealthNode());
|
|
|
+ assertTrue("health logger should be enabled", testHealthPeriodicLogger.enabled());
|
|
|
+ // Ensure updating the poll interval won't trigger a job when service not started
|
|
|
{
|
|
|
+ TimeValue pollInterval = TimeValue.timeValueSeconds(randomIntBetween(15, 59));
|
|
|
this.clusterSettings.applySettings(
|
|
|
- Settings.builder().put(HealthPeriodicLogger.POLL_INTERVAL_SETTING.getKey(), TimeValue.timeValueSeconds(15)).build()
|
|
|
+ Settings.builder()
|
|
|
+ .put(HealthPeriodicLogger.POLL_INTERVAL_SETTING.getKey(), pollInterval)
|
|
|
+ // Since the default value of enabled is false, if we do not set it here it disable it
|
|
|
+ .put(HealthPeriodicLogger.ENABLED_SETTING.getKey(), true)
|
|
|
+ .build()
|
|
|
);
|
|
|
+ assertTrue("health logger should be enabled", testHealthPeriodicLogger.enabled());
|
|
|
+ assertEquals(pollInterval, testHealthPeriodicLogger.getPollInterval());
|
|
|
assertNull(testHealthPeriodicLogger.getScheduler());
|
|
|
}
|
|
|
- }
|
|
|
|
|
|
- public void testTriggeredJobCallsTryToLogHealth() throws Exception {
|
|
|
- AtomicBoolean listenerCalled = new AtomicBoolean(false);
|
|
|
- AtomicBoolean failureCalled = new AtomicBoolean(false);
|
|
|
- ActionListener<List<HealthIndicatorResult>> testListener = new ActionListener<>() {
|
|
|
- @Override
|
|
|
- public void onResponse(List<HealthIndicatorResult> indicatorResults) {
|
|
|
- listenerCalled.set(true);
|
|
|
- }
|
|
|
+ testHealthPeriodicLogger.start();
|
|
|
+ // Start the service and check it's scheduled
|
|
|
+ {
|
|
|
+ TimeValue pollInterval = TimeValue.timeValueSeconds(randomIntBetween(15, 59));
|
|
|
+ this.clusterSettings.applySettings(
|
|
|
+ Settings.builder()
|
|
|
+ .put(HealthPeriodicLogger.POLL_INTERVAL_SETTING.getKey(), pollInterval)
|
|
|
+ // Since the default value of enabled is false, if we do not set it here it disable it
|
|
|
+ .put(HealthPeriodicLogger.ENABLED_SETTING.getKey(), true)
|
|
|
+ .build()
|
|
|
+ );
|
|
|
+ assertEquals(pollInterval, testHealthPeriodicLogger.getPollInterval());
|
|
|
+ verifyLoggerIsReadyToRun(testHealthPeriodicLogger);
|
|
|
+ assertNotNull(testHealthPeriodicLogger.getScheduler());
|
|
|
+ assertTrue(
|
|
|
+ testHealthPeriodicLogger.getScheduler().scheduledJobIds().contains(HealthPeriodicLogger.HEALTH_PERIODIC_LOGGER_JOB_NAME)
|
|
|
+ );
|
|
|
+ }
|
|
|
|
|
|
- @Override
|
|
|
- public void onFailure(Exception e) {
|
|
|
- failureCalled.set(true);
|
|
|
- }
|
|
|
- };
|
|
|
- HealthService testHealthService = this.getMockedHealthService();
|
|
|
+ // Poll interval doesn't schedule a job when disabled
|
|
|
+ {
|
|
|
+ TimeValue pollInterval = TimeValue.timeValueSeconds(randomIntBetween(15, 59));
|
|
|
+ this.clusterSettings.applySettings(
|
|
|
+ Settings.builder()
|
|
|
+ .put(HealthPeriodicLogger.POLL_INTERVAL_SETTING.getKey(), pollInterval)
|
|
|
+ .put(HealthPeriodicLogger.ENABLED_SETTING.getKey(), false)
|
|
|
+ .build()
|
|
|
+ );
|
|
|
+ assertFalse(testHealthPeriodicLogger.enabled());
|
|
|
+ assertEquals(pollInterval, testHealthPeriodicLogger.getPollInterval());
|
|
|
+ assertFalse(
|
|
|
+ testHealthPeriodicLogger.getScheduler().scheduledJobIds().contains(HealthPeriodicLogger.HEALTH_PERIODIC_LOGGER_JOB_NAME)
|
|
|
+ );
|
|
|
+ // Re-enable
|
|
|
+ this.clusterSettings.applySettings(
|
|
|
+ Settings.builder().put(HealthPeriodicLogger.POLL_INTERVAL_SETTING.getKey(), pollInterval).build()
|
|
|
+ );
|
|
|
+ }
|
|
|
|
|
|
- testHealthPeriodicLogger = createAndInitHealthPeriodicLogger(this.clusterService, testHealthService, true);
|
|
|
+ testHealthPeriodicLogger.stop();
|
|
|
+ // verify that updating the polling interval doesn't schedule the job if it's stopped
|
|
|
+ {
|
|
|
+ this.clusterSettings.applySettings(
|
|
|
+ Settings.builder()
|
|
|
+ .put(HealthPeriodicLogger.POLL_INTERVAL_SETTING.getKey(), TimeValue.timeValueSeconds(30))
|
|
|
+ .put(HealthPeriodicLogger.ENABLED_SETTING.getKey(), true)
|
|
|
+ .build()
|
|
|
+ );
|
|
|
+ assertTrue("health logger should be enabled", testHealthPeriodicLogger.enabled());
|
|
|
+ assertFalse(
|
|
|
+ testHealthPeriodicLogger.getScheduler().scheduledJobIds().contains(HealthPeriodicLogger.HEALTH_PERIODIC_LOGGER_JOB_NAME)
|
|
|
+ );
|
|
|
+ }
|
|
|
+ }
|
|
|
|
|
|
- HealthPeriodicLogger spyHealthPeriodicLogger = spy(testHealthPeriodicLogger);
|
|
|
- spyHealthPeriodicLogger.isHealthNode = true;
|
|
|
+ public void testTriggeredJobCallsTryToLogHealth() throws Exception {
|
|
|
+ AtomicBoolean calledGetHealth = new AtomicBoolean();
|
|
|
+ HealthService testHealthService = this.getMockedHealthService();
|
|
|
doAnswer(invocation -> {
|
|
|
- testListener.onResponse(getTestIndicatorResults());
|
|
|
+ ActionListener<List<HealthIndicatorResult>> listener = invocation.getArgument(4);
|
|
|
+ assertNotNull(listener);
|
|
|
+ calledGetHealth.set(true);
|
|
|
+ listener.onResponse(getTestIndicatorResults());
|
|
|
return null;
|
|
|
- }).when(spyHealthPeriodicLogger).tryToLogHealth();
|
|
|
+ }).when(testHealthService).getHealth(any(), isNull(), anyBoolean(), anyInt(), any());
|
|
|
+ testHealthPeriodicLogger = createAndInitHealthPeriodicLogger(this.clusterService, testHealthService, true);
|
|
|
+ testHealthPeriodicLogger.clusterChanged(new ClusterChangedEvent("test", stateWithLocalHealthNode, ClusterState.EMPTY_STATE));
|
|
|
|
|
|
- SchedulerEngine.Event event = new SchedulerEngine.Event(HealthPeriodicLogger.HEALTH_PERIODIC_LOGGER_JOB_NAME, 0, 0);
|
|
|
- spyHealthPeriodicLogger.triggered(event);
|
|
|
+ verifyLoggerIsReadyToRun(testHealthPeriodicLogger);
|
|
|
|
|
|
- assertBusy(() -> assertFalse(failureCalled.get()));
|
|
|
- assertBusy(() -> assertTrue(listenerCalled.get()));
|
|
|
+ SchedulerEngine.Event event = new SchedulerEngine.Event(HealthPeriodicLogger.HEALTH_PERIODIC_LOGGER_JOB_NAME, 0, 0);
|
|
|
+ testHealthPeriodicLogger.triggered(event);
|
|
|
+ assertBusy(() -> assertTrue(calledGetHealth.get()));
|
|
|
}
|
|
|
|
|
|
public void testResultFailureHandling() throws Exception {
|
|
@@ -265,9 +344,8 @@ public class HealthPeriodicLoggerTests extends ESTestCase {
|
|
|
HealthService testHealthService = this.getMockedHealthService();
|
|
|
|
|
|
testHealthPeriodicLogger = createAndInitHealthPeriodicLogger(this.clusterService, testHealthService, true);
|
|
|
-
|
|
|
- HealthPeriodicLogger spyHealthPeriodicLogger = spy(testHealthPeriodicLogger);
|
|
|
- spyHealthPeriodicLogger.isHealthNode = true;
|
|
|
+ testHealthPeriodicLogger.clusterChanged(new ClusterChangedEvent("test", stateWithLocalHealthNode, ClusterState.EMPTY_STATE));
|
|
|
+ assertTrue("local node should be the health node", testHealthPeriodicLogger.isHealthNode());
|
|
|
|
|
|
SchedulerEngine.Event event = new SchedulerEngine.Event(HealthPeriodicLogger.HEALTH_PERIODIC_LOGGER_JOB_NAME, 0, 0);
|
|
|
|
|
@@ -279,7 +357,7 @@ public class HealthPeriodicLoggerTests extends ESTestCase {
|
|
|
getHealthCalled.incrementAndGet();
|
|
|
return null;
|
|
|
}).when(testHealthService).getHealth(any(), isNull(), anyBoolean(), anyInt(), any());
|
|
|
- spyHealthPeriodicLogger.triggered(event);
|
|
|
+ testHealthPeriodicLogger.triggered(event);
|
|
|
assertBusy(() -> assertThat(getHealthCalled.get(), equalTo(1)));
|
|
|
}
|
|
|
|
|
@@ -291,7 +369,7 @@ public class HealthPeriodicLoggerTests extends ESTestCase {
|
|
|
getHealthCalled.incrementAndGet();
|
|
|
return null;
|
|
|
}).when(testHealthService).getHealth(any(), isNull(), anyBoolean(), anyInt(), any());
|
|
|
- spyHealthPeriodicLogger.triggered(event);
|
|
|
+ testHealthPeriodicLogger.triggered(event);
|
|
|
assertBusy(() -> assertThat(getHealthCalled.get(), equalTo(2)));
|
|
|
}
|
|
|
}
|
|
@@ -299,31 +377,41 @@ public class HealthPeriodicLoggerTests extends ESTestCase {
|
|
|
public void testTryToLogHealthConcurrencyControlWithResults() throws Exception {
|
|
|
AtomicInteger getHealthCalled = new AtomicInteger(0);
|
|
|
|
|
|
+ CountDownLatch waitForSecondRun = new CountDownLatch(1);
|
|
|
+ CountDownLatch waitForRelease = new CountDownLatch(1);
|
|
|
HealthService testHealthService = this.getMockedHealthService();
|
|
|
doAnswer(invocation -> {
|
|
|
// get and call the results listener provided to getHealth
|
|
|
ActionListener<List<HealthIndicatorResult>> listener = invocation.getArgument(4);
|
|
|
- listener.onResponse(getTestIndicatorResults());
|
|
|
getHealthCalled.incrementAndGet();
|
|
|
+ waitForSecondRun.await();
|
|
|
+ listener.onResponse(getTestIndicatorResults());
|
|
|
+ waitForRelease.countDown();
|
|
|
return null;
|
|
|
}).when(testHealthService).getHealth(any(), isNull(), anyBoolean(), anyInt(), any());
|
|
|
|
|
|
testHealthPeriodicLogger = createAndInitHealthPeriodicLogger(this.clusterService, testHealthService, true);
|
|
|
-
|
|
|
- HealthPeriodicLogger spyHealthPeriodicLogger = spy(testHealthPeriodicLogger);
|
|
|
- spyHealthPeriodicLogger.isHealthNode = true;
|
|
|
+ testHealthPeriodicLogger.clusterChanged(new ClusterChangedEvent("test", stateWithLocalHealthNode, ClusterState.EMPTY_STATE));
|
|
|
+ verifyLoggerIsReadyToRun(testHealthPeriodicLogger);
|
|
|
|
|
|
SchedulerEngine.Event event = new SchedulerEngine.Event(HealthPeriodicLogger.HEALTH_PERIODIC_LOGGER_JOB_NAME, 0, 0);
|
|
|
|
|
|
// run it once, verify getHealth is called
|
|
|
{
|
|
|
- spyHealthPeriodicLogger.triggered(event);
|
|
|
+ Thread logHealthThread = new Thread(() -> testHealthPeriodicLogger.triggered(event));
|
|
|
+ logHealthThread.start();
|
|
|
+ // We wait to verify that the triggered even is in progress, then we block, so it will rename in progress
|
|
|
assertBusy(() -> assertThat(getHealthCalled.get(), equalTo(1)));
|
|
|
+ // We try to log again while it's in progress, we expect this run to be skipped
|
|
|
+ assertFalse(testHealthPeriodicLogger.tryToLogHealth());
|
|
|
+ // Unblock the first execution
|
|
|
+ waitForSecondRun.countDown();
|
|
|
}
|
|
|
|
|
|
// run it again, verify getHealth is called, because we are calling the results listener
|
|
|
{
|
|
|
- spyHealthPeriodicLogger.triggered(event);
|
|
|
+ waitForRelease.await();
|
|
|
+ testHealthPeriodicLogger.triggered(event);
|
|
|
assertBusy(() -> assertThat(getHealthCalled.get(), equalTo(2)));
|
|
|
}
|
|
|
}
|
|
@@ -331,36 +419,51 @@ public class HealthPeriodicLoggerTests extends ESTestCase {
|
|
|
public void testTryToLogHealthConcurrencyControl() throws Exception {
|
|
|
AtomicInteger getHealthCalled = new AtomicInteger(0);
|
|
|
|
|
|
+ CountDownLatch waitForSecondRun = new CountDownLatch(1);
|
|
|
+ CountDownLatch waitForRelease = new CountDownLatch(1);
|
|
|
+
|
|
|
HealthService testHealthService = this.getMockedHealthService();
|
|
|
doAnswer(invocation -> {
|
|
|
- // get but do not call the provided listener
|
|
|
+ // get but do not call the provided listener immediately
|
|
|
ActionListener<List<HealthIndicatorResult>> listener = invocation.getArgument(4);
|
|
|
assertNotNull(listener);
|
|
|
|
|
|
// note that we received the getHealth call
|
|
|
getHealthCalled.incrementAndGet();
|
|
|
+
|
|
|
+ // wait for the next run that should be skipped
|
|
|
+ waitForSecondRun.await();
|
|
|
+ // we can continue now
|
|
|
+ listener.onResponse(getTestIndicatorResults());
|
|
|
+ waitForRelease.countDown();
|
|
|
return null;
|
|
|
}).when(testHealthService).getHealth(any(), isNull(), anyBoolean(), anyInt(), any());
|
|
|
|
|
|
- testHealthPeriodicLogger = createAndInitHealthPeriodicLogger(this.clusterService, testHealthService, true);
|
|
|
-
|
|
|
- HealthPeriodicLogger spyHealthPeriodicLogger = spy(testHealthPeriodicLogger);
|
|
|
- spyHealthPeriodicLogger.isHealthNode = true;
|
|
|
+ testHealthPeriodicLogger = createAndInitHealthPeriodicLogger(this.clusterService, testHealthService, false);
|
|
|
+ testHealthPeriodicLogger.clusterChanged(new ClusterChangedEvent("test", stateWithLocalHealthNode, ClusterState.EMPTY_STATE));
|
|
|
+ assertTrue("local node should be the health node", testHealthPeriodicLogger.isHealthNode());
|
|
|
|
|
|
SchedulerEngine.Event event = new SchedulerEngine.Event(HealthPeriodicLogger.HEALTH_PERIODIC_LOGGER_JOB_NAME, 0, 0);
|
|
|
|
|
|
- // call it once, and verify that getHealth is called
|
|
|
+ // call it and verify that getHealth is called
|
|
|
{
|
|
|
- spyHealthPeriodicLogger.triggered(event);
|
|
|
+ Thread logHealthThread = new Thread(() -> testHealthPeriodicLogger.triggered(event));
|
|
|
+ logHealthThread.start();
|
|
|
assertBusy(() -> assertThat(getHealthCalled.get(), equalTo(1)));
|
|
|
}
|
|
|
|
|
|
- // trigger it again, and verify that getHealth is not called
|
|
|
- // it's not called because the results listener was never called by getHealth
|
|
|
- // this is simulating a double invocation of getHealth, due perhaps to a lengthy getHealth call
|
|
|
+ // run it again, verify that it's skipped because the other one is in progress
|
|
|
{
|
|
|
- spyHealthPeriodicLogger.triggered(event);
|
|
|
- assertBusy(() -> assertThat(getHealthCalled.get(), equalTo(1)));
|
|
|
+ assertFalse(testHealthPeriodicLogger.tryToLogHealth());
|
|
|
+ // Unblock the first execution
|
|
|
+ waitForSecondRun.countDown();
|
|
|
+ }
|
|
|
+
|
|
|
+ // run it again, verify getHealth is called, because we are calling the results listener
|
|
|
+ {
|
|
|
+ waitForRelease.await();
|
|
|
+ testHealthPeriodicLogger.triggered(event);
|
|
|
+ assertBusy(() -> assertThat(getHealthCalled.get(), equalTo(2)));
|
|
|
}
|
|
|
}
|
|
|
|
|
@@ -369,10 +472,9 @@ public class HealthPeriodicLoggerTests extends ESTestCase {
|
|
|
|
|
|
HealthService testHealthService = this.getMockedHealthService();
|
|
|
|
|
|
- testHealthPeriodicLogger = createAndInitHealthPeriodicLogger(this.clusterService, testHealthService, true);
|
|
|
-
|
|
|
- HealthPeriodicLogger spyHealthPeriodicLogger = spy(testHealthPeriodicLogger);
|
|
|
- spyHealthPeriodicLogger.isHealthNode = true;
|
|
|
+ testHealthPeriodicLogger = createAndInitHealthPeriodicLogger(this.clusterService, testHealthService, false);
|
|
|
+ testHealthPeriodicLogger.clusterChanged(new ClusterChangedEvent("test", stateWithLocalHealthNode, ClusterState.EMPTY_STATE));
|
|
|
+ assertTrue("local node should be the health node", testHealthPeriodicLogger.isHealthNode());
|
|
|
|
|
|
SchedulerEngine.Event event = new SchedulerEngine.Event(HealthPeriodicLogger.HEALTH_PERIODIC_LOGGER_JOB_NAME, 0, 0);
|
|
|
|
|
@@ -380,7 +482,7 @@ public class HealthPeriodicLoggerTests extends ESTestCase {
|
|
|
{
|
|
|
doThrow(new ResourceNotFoundException("No preflight indicators")).when(testHealthService)
|
|
|
.getHealth(any(), isNull(), anyBoolean(), anyInt(), any());
|
|
|
- spyHealthPeriodicLogger.triggered(event);
|
|
|
+ testHealthPeriodicLogger.triggered(event);
|
|
|
assertBusy(() -> assertThat(getHealthCalled.get(), equalTo(0)));
|
|
|
}
|
|
|
|
|
@@ -392,11 +494,102 @@ public class HealthPeriodicLoggerTests extends ESTestCase {
|
|
|
getHealthCalled.incrementAndGet();
|
|
|
return null;
|
|
|
}).when(testHealthService).getHealth(any(), isNull(), anyBoolean(), anyInt(), any());
|
|
|
- spyHealthPeriodicLogger.triggered(event);
|
|
|
+ testHealthPeriodicLogger.triggered(event);
|
|
|
assertBusy(() -> assertThat(getHealthCalled.get(), equalTo(1)));
|
|
|
}
|
|
|
}
|
|
|
|
|
|
+ public void testClosingWhenRunInProgress() throws Exception {
|
|
|
+ // Check that closing will still happen even if the run doesn't finish
|
|
|
+ {
|
|
|
+ AtomicInteger getHealthCalled = new AtomicInteger(0);
|
|
|
+
|
|
|
+ HealthService testHealthService = this.getMockedHealthService();
|
|
|
+ doAnswer(invocation -> {
|
|
|
+ ActionListener<List<HealthIndicatorResult>> listener = invocation.getArgument(4);
|
|
|
+ assertNotNull(listener);
|
|
|
+
|
|
|
+ // note that we received the getHealth call
|
|
|
+ getHealthCalled.incrementAndGet();
|
|
|
+ return null;
|
|
|
+ }).when(testHealthService).getHealth(any(), isNull(), anyBoolean(), anyInt(), any());
|
|
|
+
|
|
|
+ HealthPeriodicLogger healthLoggerThatWillNotFinish = createAndInitHealthPeriodicLogger(
|
|
|
+ this.clusterService,
|
|
|
+ testHealthService,
|
|
|
+ true
|
|
|
+ );
|
|
|
+ healthLoggerThatWillNotFinish.clusterChanged(
|
|
|
+ new ClusterChangedEvent("test", stateWithLocalHealthNode, ClusterState.EMPTY_STATE)
|
|
|
+ );
|
|
|
+ assertTrue("local node should be the health node", healthLoggerThatWillNotFinish.isHealthNode());
|
|
|
+ assertTrue("health logger should be enabled", healthLoggerThatWillNotFinish.enabled());
|
|
|
+
|
|
|
+ SchedulerEngine.Event event = new SchedulerEngine.Event(HealthPeriodicLogger.HEALTH_PERIODIC_LOGGER_JOB_NAME, 0, 0);
|
|
|
+
|
|
|
+ // call it and verify that it's in progress
|
|
|
+ {
|
|
|
+ healthLoggerThatWillNotFinish.triggered(event);
|
|
|
+ assertBusy(() -> assertThat(getHealthCalled.get(), equalTo(1)));
|
|
|
+ }
|
|
|
+ healthLoggerThatWillNotFinish.stop();
|
|
|
+ assertEquals(Lifecycle.State.STOPPED, healthLoggerThatWillNotFinish.lifecycleState());
|
|
|
+ // Close and wait out the timeout
|
|
|
+ healthLoggerThatWillNotFinish.close();
|
|
|
+ assertBusy(() -> assertEquals(Lifecycle.State.CLOSED, healthLoggerThatWillNotFinish.lifecycleState()), 5, TimeUnit.SECONDS);
|
|
|
+ }
|
|
|
+
|
|
|
+ // Ensure it will wait until it finishes before it closes
|
|
|
+ {
|
|
|
+ AtomicInteger getHealthCalled = new AtomicInteger(0);
|
|
|
+
|
|
|
+ CountDownLatch waitForCloseToBeTriggered = new CountDownLatch(1);
|
|
|
+ CountDownLatch waitForRelease = new CountDownLatch(1);
|
|
|
+
|
|
|
+ HealthService testHealthService = this.getMockedHealthService();
|
|
|
+ doAnswer(invocation -> {
|
|
|
+ // get but do not call the provided listener immediately
|
|
|
+ ActionListener<List<HealthIndicatorResult>> listener = invocation.getArgument(4);
|
|
|
+ assertNotNull(listener);
|
|
|
+
|
|
|
+ // note that we received the getHealth call
|
|
|
+ getHealthCalled.incrementAndGet();
|
|
|
+
|
|
|
+ // wait for the close signal
|
|
|
+ waitForCloseToBeTriggered.await();
|
|
|
+ // we can continue now
|
|
|
+ listener.onResponse(getTestIndicatorResults());
|
|
|
+ waitForRelease.countDown();
|
|
|
+ return null;
|
|
|
+ }).when(testHealthService).getHealth(any(), isNull(), anyBoolean(), anyInt(), any());
|
|
|
+
|
|
|
+ testHealthPeriodicLogger = createAndInitHealthPeriodicLogger(this.clusterService, testHealthService, true);
|
|
|
+ testHealthPeriodicLogger.clusterChanged(new ClusterChangedEvent("test", stateWithLocalHealthNode, ClusterState.EMPTY_STATE));
|
|
|
+ verifyLoggerIsReadyToRun(testHealthPeriodicLogger);
|
|
|
+
|
|
|
+ SchedulerEngine.Event event = new SchedulerEngine.Event(HealthPeriodicLogger.HEALTH_PERIODIC_LOGGER_JOB_NAME, 0, 0);
|
|
|
+
|
|
|
+ // call it and verify that getHealth is called
|
|
|
+ {
|
|
|
+ Thread logHealthThread = new Thread(() -> testHealthPeriodicLogger.triggered(event));
|
|
|
+ logHealthThread.start();
|
|
|
+ assertBusy(() -> assertTrue(testHealthPeriodicLogger.currentlyRunning()));
|
|
|
+ }
|
|
|
+
|
|
|
+ // stop and close it
|
|
|
+ {
|
|
|
+ testHealthPeriodicLogger.stop();
|
|
|
+ assertEquals(Lifecycle.State.STOPPED, testHealthPeriodicLogger.lifecycleState());
|
|
|
+ assertTrue(testHealthPeriodicLogger.currentlyRunning());
|
|
|
+ Thread closeHealthLogger = new Thread(() -> testHealthPeriodicLogger.close());
|
|
|
+ closeHealthLogger.start();
|
|
|
+ assertBusy(() -> assertTrue(testHealthPeriodicLogger.waitingToFinishCurrentRun()));
|
|
|
+ waitForCloseToBeTriggered.countDown();
|
|
|
+ assertBusy(() -> assertEquals(Lifecycle.State.CLOSED, testHealthPeriodicLogger.lifecycleState()));
|
|
|
+ }
|
|
|
+ }
|
|
|
+ }
|
|
|
+
|
|
|
public void testLoggingHappens() {
|
|
|
MockLogAppender mockAppender = new MockLogAppender();
|
|
|
mockAppender.start();
|
|
@@ -436,8 +629,13 @@ public class HealthPeriodicLoggerTests extends ESTestCase {
|
|
|
Loggers.addAppender(periodicLoggerLogger, mockAppender);
|
|
|
|
|
|
HealthService testHealthService = this.getMockedHealthService();
|
|
|
-
|
|
|
- testHealthPeriodicLogger = createAndInitHealthPeriodicLogger(this.clusterService, testHealthService, true);
|
|
|
+ doAnswer(invocation -> {
|
|
|
+ ActionListener<List<HealthIndicatorResult>> listener = invocation.getArgument(4);
|
|
|
+ assertNotNull(listener);
|
|
|
+ listener.onResponse(getTestIndicatorResults());
|
|
|
+ return null;
|
|
|
+ }).when(testHealthService).getHealth(any(), isNull(), anyBoolean(), anyInt(), any());
|
|
|
+ testHealthPeriodicLogger = createAndInitHealthPeriodicLogger(this.clusterService, testHealthService, false);
|
|
|
|
|
|
// switch to Log only mode
|
|
|
this.clusterSettings.applySettings(
|
|
@@ -446,16 +644,11 @@ public class HealthPeriodicLoggerTests extends ESTestCase {
|
|
|
.put(HealthPeriodicLogger.ENABLED_SETTING.getKey(), true)
|
|
|
.build()
|
|
|
);
|
|
|
-
|
|
|
- HealthPeriodicLogger spyHealthPeriodicLogger = spy(testHealthPeriodicLogger);
|
|
|
- spyHealthPeriodicLogger.isHealthNode = true;
|
|
|
- doAnswer(invocation -> {
|
|
|
- spyHealthPeriodicLogger.resultsListener.onResponse(getTestIndicatorResults());
|
|
|
- return null;
|
|
|
- }).when(spyHealthPeriodicLogger).tryToLogHealth();
|
|
|
+ testHealthPeriodicLogger.clusterChanged(new ClusterChangedEvent("test", stateWithLocalHealthNode, ClusterState.EMPTY_STATE));
|
|
|
+ assertTrue("local node should be the health node", testHealthPeriodicLogger.isHealthNode());
|
|
|
|
|
|
SchedulerEngine.Event event = new SchedulerEngine.Event(HealthPeriodicLogger.HEALTH_PERIODIC_LOGGER_JOB_NAME, 0, 0);
|
|
|
- spyHealthPeriodicLogger.triggered(event);
|
|
|
+ testHealthPeriodicLogger.triggered(event);
|
|
|
|
|
|
try {
|
|
|
mockAppender.assertAllExpectationsMatched();
|
|
@@ -496,8 +689,13 @@ public class HealthPeriodicLoggerTests extends ESTestCase {
|
|
|
Loggers.addAppender(periodicLoggerLogger, mockAppender);
|
|
|
|
|
|
HealthService testHealthService = this.getMockedHealthService();
|
|
|
-
|
|
|
- testHealthPeriodicLogger = createAndInitHealthPeriodicLogger(this.clusterService, testHealthService, true);
|
|
|
+ doAnswer(invocation -> {
|
|
|
+ ActionListener<List<HealthIndicatorResult>> listener = invocation.getArgument(4);
|
|
|
+ assertNotNull(listener);
|
|
|
+ listener.onResponse(getTestIndicatorResults());
|
|
|
+ return null;
|
|
|
+ }).when(testHealthService).getHealth(any(), isNull(), anyBoolean(), anyInt(), any());
|
|
|
+ testHealthPeriodicLogger = createAndInitHealthPeriodicLogger(this.clusterService, testHealthService, false);
|
|
|
|
|
|
// switch to Metrics only mode
|
|
|
this.clusterSettings.applySettings(
|
|
@@ -506,16 +704,11 @@ public class HealthPeriodicLoggerTests extends ESTestCase {
|
|
|
.put(HealthPeriodicLogger.ENABLED_SETTING.getKey(), true)
|
|
|
.build()
|
|
|
);
|
|
|
-
|
|
|
- HealthPeriodicLogger spyHealthPeriodicLogger = spy(testHealthPeriodicLogger);
|
|
|
- spyHealthPeriodicLogger.isHealthNode = true;
|
|
|
- doAnswer(invocation -> {
|
|
|
- spyHealthPeriodicLogger.resultsListener.onResponse(getTestIndicatorResults());
|
|
|
- return null;
|
|
|
- }).when(spyHealthPeriodicLogger).tryToLogHealth();
|
|
|
+ testHealthPeriodicLogger.clusterChanged(new ClusterChangedEvent("test", stateWithLocalHealthNode, ClusterState.EMPTY_STATE));
|
|
|
+ assertTrue("local node should be the health node", testHealthPeriodicLogger.isHealthNode());
|
|
|
|
|
|
SchedulerEngine.Event event = new SchedulerEngine.Event(HealthPeriodicLogger.HEALTH_PERIODIC_LOGGER_JOB_NAME, 0, 0);
|
|
|
- spyHealthPeriodicLogger.triggered(event);
|
|
|
+ testHealthPeriodicLogger.triggered(event);
|
|
|
|
|
|
try {
|
|
|
mockAppender.assertAllExpectationsMatched();
|
|
@@ -531,9 +724,21 @@ public class HealthPeriodicLoggerTests extends ESTestCase {
|
|
|
|
|
|
BiConsumer<LongGaugeMetric, Long> metricWriter = (metric, value) -> metrics.add(value);
|
|
|
Consumer<ESLogMessage> logWriter = msg -> logs.add(msg.asString());
|
|
|
-
|
|
|
+ List<HealthIndicatorResult> results = getTestIndicatorResultsWithRed();
|
|
|
HealthService testHealthService = this.getMockedHealthService();
|
|
|
- testHealthPeriodicLogger = createAndInitHealthPeriodicLogger(this.clusterService, testHealthService, true, metricWriter, logWriter);
|
|
|
+ doAnswer(invocation -> {
|
|
|
+ ActionListener<List<HealthIndicatorResult>> listener = invocation.getArgument(4);
|
|
|
+ assertNotNull(listener);
|
|
|
+ listener.onResponse(results);
|
|
|
+ return null;
|
|
|
+ }).when(testHealthService).getHealth(any(), isNull(), anyBoolean(), anyInt(), any());
|
|
|
+ testHealthPeriodicLogger = createAndInitHealthPeriodicLogger(
|
|
|
+ this.clusterService,
|
|
|
+ testHealthService,
|
|
|
+ false,
|
|
|
+ metricWriter,
|
|
|
+ logWriter
|
|
|
+ );
|
|
|
|
|
|
// switch to Metrics only mode
|
|
|
this.clusterSettings.applySettings(
|
|
@@ -542,25 +747,24 @@ public class HealthPeriodicLoggerTests extends ESTestCase {
|
|
|
.put(HealthPeriodicLogger.ENABLED_SETTING.getKey(), true)
|
|
|
.build()
|
|
|
);
|
|
|
-
|
|
|
- HealthPeriodicLogger spyHealthPeriodicLogger = spy(testHealthPeriodicLogger);
|
|
|
- spyHealthPeriodicLogger.isHealthNode = true;
|
|
|
- List<HealthIndicatorResult> results = getTestIndicatorResultsWithRed();
|
|
|
-
|
|
|
- doAnswer(invocation -> {
|
|
|
- spyHealthPeriodicLogger.resultsListener.onResponse(results);
|
|
|
- return null;
|
|
|
- }).when(spyHealthPeriodicLogger).tryToLogHealth();
|
|
|
+ testHealthPeriodicLogger.clusterChanged(new ClusterChangedEvent("test", stateWithLocalHealthNode, ClusterState.EMPTY_STATE));
|
|
|
+ assertTrue("local node should be the health node", testHealthPeriodicLogger.isHealthNode());
|
|
|
|
|
|
assertEquals(0, metrics.size());
|
|
|
|
|
|
SchedulerEngine.Event event = new SchedulerEngine.Event(HealthPeriodicLogger.HEALTH_PERIODIC_LOGGER_JOB_NAME, 0, 0);
|
|
|
- spyHealthPeriodicLogger.triggered(event);
|
|
|
+ testHealthPeriodicLogger.triggered(event);
|
|
|
|
|
|
assertEquals(0, logs.size());
|
|
|
assertEquals(4, metrics.size());
|
|
|
}
|
|
|
|
|
|
+ private void verifyLoggerIsReadyToRun(HealthPeriodicLogger healthPeriodicLogger) {
|
|
|
+ assertTrue("local node should be the health node", healthPeriodicLogger.isHealthNode());
|
|
|
+ assertTrue("health logger should be enabled", healthPeriodicLogger.enabled());
|
|
|
+ assertEquals("health logger is started", Lifecycle.State.STARTED, healthPeriodicLogger.lifecycleState());
|
|
|
+ }
|
|
|
+
|
|
|
private List<HealthIndicatorResult> getTestIndicatorResults() {
|
|
|
var networkLatency = new HealthIndicatorResult("master_is_stable", GREEN, null, null, null, null);
|
|
|
var slowTasks = new HealthIndicatorResult("disk", YELLOW, null, null, null, null);
|
|
@@ -592,15 +796,15 @@ public class HealthPeriodicLoggerTests extends ESTestCase {
|
|
|
private HealthPeriodicLogger createAndInitHealthPeriodicLogger(
|
|
|
ClusterService clusterService,
|
|
|
HealthService testHealthService,
|
|
|
- boolean enabled
|
|
|
+ boolean started
|
|
|
) {
|
|
|
- return createAndInitHealthPeriodicLogger(clusterService, testHealthService, enabled, null, null);
|
|
|
+ return createAndInitHealthPeriodicLogger(clusterService, testHealthService, started, null, null);
|
|
|
}
|
|
|
|
|
|
private HealthPeriodicLogger createAndInitHealthPeriodicLogger(
|
|
|
ClusterService clusterService,
|
|
|
HealthService testHealthService,
|
|
|
- boolean enabled,
|
|
|
+ boolean started,
|
|
|
BiConsumer<LongGaugeMetric, Long> metricWriter,
|
|
|
Consumer<ESLogMessage> logWriter
|
|
|
) {
|
|
@@ -626,9 +830,13 @@ public class HealthPeriodicLoggerTests extends ESTestCase {
|
|
|
provider
|
|
|
);
|
|
|
}
|
|
|
- if (enabled) {
|
|
|
- clusterSettings.applySettings(Settings.builder().put(HealthPeriodicLogger.ENABLED_SETTING.getKey(), true).build());
|
|
|
+ if (started) {
|
|
|
+ testHealthPeriodicLogger.start();
|
|
|
}
|
|
|
+ // Reset cluster setting
|
|
|
+ clusterSettings.applySettings(Settings.EMPTY);
|
|
|
+ // enable
|
|
|
+ clusterSettings.applySettings(Settings.builder().put(HealthPeriodicLogger.ENABLED_SETTING.getKey(), true).build());
|
|
|
|
|
|
return testHealthPeriodicLogger;
|
|
|
}
|