|
|
@@ -84,7 +84,7 @@ public class MockNioTransport extends TcpTransport {
|
|
|
PageCacheRecycler pageCacheRecycler, NamedWriteableRegistry namedWriteableRegistry,
|
|
|
CircuitBreakerService circuitBreakerService) {
|
|
|
super(settings, version, threadPool, pageCacheRecycler, circuitBreakerService, namedWriteableRegistry, networkService);
|
|
|
- this.transportThreadWatchdog = new TransportThreadWatchdog(threadPool);
|
|
|
+ this.transportThreadWatchdog = new TransportThreadWatchdog(threadPool, settings);
|
|
|
}
|
|
|
|
|
|
@Override
|
|
|
@@ -322,21 +322,20 @@ public class MockNioTransport extends TcpTransport {
|
|
|
}
|
|
|
|
|
|
static final class TransportThreadWatchdog {
|
|
|
-
|
|
|
- private static final long WARN_THRESHOLD = TimeUnit.MILLISECONDS.toNanos(150);
|
|
|
-
|
|
|
// Only check every 2s to not flood the logs on a blocked thread.
|
|
|
// We mostly care about long blocks and not random slowness anyway and in tests would randomly catch slow operations that block for
|
|
|
// less than 2s eventually.
|
|
|
private static final TimeValue CHECK_INTERVAL = TimeValue.timeValueSeconds(2);
|
|
|
|
|
|
+ private final long warnThreshold;
|
|
|
private final ThreadPool threadPool;
|
|
|
private final ConcurrentHashMap<Thread, Long> registry = new ConcurrentHashMap<>();
|
|
|
|
|
|
private volatile boolean stopped;
|
|
|
|
|
|
- TransportThreadWatchdog(ThreadPool threadPool) {
|
|
|
+ TransportThreadWatchdog(ThreadPool threadPool, Settings settings) {
|
|
|
this.threadPool = threadPool;
|
|
|
+ warnThreshold = ThreadPool.ESTIMATED_TIME_INTERVAL_SETTING.get(settings).nanos() + TimeValue.timeValueMillis(100L).nanos();
|
|
|
threadPool.schedule(this::logLongRunningExecutions, CHECK_INTERVAL, ThreadPool.Names.GENERIC);
|
|
|
}
|
|
|
|
|
|
@@ -353,7 +352,7 @@ public class MockNioTransport extends TcpTransport {
|
|
|
|
|
|
private void maybeLogElapsedTime(long startTime) {
|
|
|
long elapsedTime = threadPool.relativeTimeInNanos() - startTime;
|
|
|
- if (elapsedTime > WARN_THRESHOLD) {
|
|
|
+ if (elapsedTime > warnThreshold) {
|
|
|
logger.warn(
|
|
|
new ParameterizedMessage("Slow execution on network thread [{} milliseconds]",
|
|
|
TimeUnit.NANOSECONDS.toMillis(elapsedTime)),
|
|
|
@@ -364,7 +363,7 @@ public class MockNioTransport extends TcpTransport {
|
|
|
private void logLongRunningExecutions() {
|
|
|
for (Map.Entry<Thread, Long> entry : registry.entrySet()) {
|
|
|
final long elapsedTimeInNanos = threadPool.relativeTimeInNanos() - entry.getValue();
|
|
|
- if (elapsedTimeInNanos > WARN_THRESHOLD) {
|
|
|
+ if (elapsedTimeInNanos > warnThreshold) {
|
|
|
final Thread thread = entry.getKey();
|
|
|
logger.warn("Potentially blocked execution on network thread [{}] [{} milliseconds]: \n{}", thread.getName(),
|
|
|
TimeUnit.NANOSECONDS.toMillis(elapsedTimeInNanos),
|