|
@@ -15,6 +15,8 @@ import org.elasticsearch.common.component.AbstractLifecycleComponent;
|
|
|
import org.elasticsearch.common.settings.ClusterSettings;
|
|
|
import org.elasticsearch.common.settings.Setting;
|
|
|
import org.elasticsearch.common.settings.Settings;
|
|
|
+import org.elasticsearch.common.util.concurrent.AbstractRunnable;
|
|
|
+import org.elasticsearch.common.util.concurrent.EsRejectedExecutionException;
|
|
|
import org.elasticsearch.common.util.set.Sets;
|
|
|
import org.elasticsearch.core.IOUtils;
|
|
|
import org.elasticsearch.core.Nullable;
|
|
@@ -43,6 +45,11 @@ import static org.elasticsearch.monitor.StatusInfo.Status.UNHEALTHY;
|
|
|
public class FsHealthService extends AbstractLifecycleComponent implements NodeHealthService {
|
|
|
|
|
|
private static final Logger logger = LogManager.getLogger(FsHealthService.class);
|
|
|
+
|
|
|
+ private static final StatusInfo HEALTHY_DISABLED = new StatusInfo(HEALTHY, "health check disabled");
|
|
|
+ private static final StatusInfo UNHEALTHY_BROKEN_NODE_LOCK = new StatusInfo(UNHEALTHY, "health check failed due to broken node lock");
|
|
|
+ private static final StatusInfo HEALTHY_SUCCESS = new StatusInfo(HEALTHY, "health check passed");
|
|
|
+
|
|
|
private final ThreadPool threadPool;
|
|
|
private volatile boolean enabled;
|
|
|
private volatile boolean brokenLock;
|
|
@@ -50,7 +57,7 @@ public class FsHealthService extends AbstractLifecycleComponent implements NodeH
|
|
|
private volatile TimeValue slowPathLoggingThreshold;
|
|
|
private final NodeEnvironment nodeEnv;
|
|
|
private final LongSupplier currentTimeMillisSupplier;
|
|
|
- private volatile Scheduler.Cancellable scheduledFuture;
|
|
|
+ private Scheduler.Cancellable scheduledFuture; // accesses all synchronized on AbstractLifecycleComponent#lifecycle
|
|
|
|
|
|
@Nullable
|
|
|
private volatile Set<Path> unhealthyPaths;
|
|
@@ -109,42 +116,53 @@ public class FsHealthService extends AbstractLifecycleComponent implements NodeH
|
|
|
|
|
|
@Override
|
|
|
public StatusInfo getHealth() {
|
|
|
- StatusInfo statusInfo;
|
|
|
- Set<Path> unhealthyPaths = this.unhealthyPaths;
|
|
|
if (enabled == false) {
|
|
|
- statusInfo = new StatusInfo(HEALTHY, "health check disabled");
|
|
|
- } else if (brokenLock) {
|
|
|
- statusInfo = new StatusInfo(UNHEALTHY, "health check failed due to broken node lock");
|
|
|
- } else if (unhealthyPaths == null) {
|
|
|
- statusInfo = new StatusInfo(HEALTHY, "health check passed");
|
|
|
- } else {
|
|
|
- String info = "health check failed on ["
|
|
|
- + unhealthyPaths.stream().map(k -> k.toString()).collect(Collectors.joining(","))
|
|
|
- + "]";
|
|
|
- statusInfo = new StatusInfo(UNHEALTHY, info);
|
|
|
+ return HEALTHY_DISABLED;
|
|
|
+ }
|
|
|
+
|
|
|
+ if (brokenLock) {
|
|
|
+ return UNHEALTHY_BROKEN_NODE_LOCK;
|
|
|
}
|
|
|
|
|
|
- return statusInfo;
|
|
|
+ var unhealthyPaths = this.unhealthyPaths; // single volatile read
|
|
|
+ if (unhealthyPaths != null) {
|
|
|
+ assert unhealthyPaths.isEmpty() == false;
|
|
|
+ return new StatusInfo(
|
|
|
+ UNHEALTHY,
|
|
|
+ "health check failed on [" + unhealthyPaths.stream().map(Path::toString).collect(Collectors.joining(",")) + "]"
|
|
|
+ );
|
|
|
+ }
|
|
|
+
|
|
|
+ return HEALTHY_SUCCESS;
|
|
|
}
|
|
|
|
|
|
- class FsHealthMonitor implements Runnable {
|
|
|
+ class FsHealthMonitor extends AbstractRunnable {
|
|
|
|
|
|
+ // Exposed for testing
|
|
|
static final String TEMP_FILE_NAME = ".es_temp_file";
|
|
|
- private byte[] bytesToWrite;
|
|
|
|
|
|
- FsHealthMonitor() {
|
|
|
- this.bytesToWrite = UUIDs.randomBase64UUID().getBytes(StandardCharsets.UTF_8);
|
|
|
+ private final byte[] bytesToWrite = UUIDs.randomBase64UUID().getBytes(StandardCharsets.UTF_8);
|
|
|
+
|
|
|
+ @Override
|
|
|
+ public void onFailure(Exception e) {
|
|
|
+ logger.error("health check failed", e);
|
|
|
}
|
|
|
|
|
|
@Override
|
|
|
- public void run() {
|
|
|
- try {
|
|
|
- if (enabled) {
|
|
|
- monitorFSHealth();
|
|
|
- logger.debug("health check succeeded");
|
|
|
- }
|
|
|
- } catch (Exception e) {
|
|
|
- logger.error("health check failed", e);
|
|
|
+ public void onRejection(Exception e) {
|
|
|
+ if (e instanceof EsRejectedExecutionException esre && esre.isExecutorShutdown()) {
|
|
|
+ logger.debug("health check skipped (executor shut down)", e);
|
|
|
+ } else {
|
|
|
+ onFailure(e);
|
|
|
+ assert false : e;
|
|
|
+ }
|
|
|
+ }
|
|
|
+
|
|
|
+ @Override
|
|
|
+ public void doRun() {
|
|
|
+ if (enabled) {
|
|
|
+ monitorFSHealth();
|
|
|
+ logger.debug("health check completed");
|
|
|
}
|
|
|
}
|
|
|
|