소스 검색

Detect long-running tasks on network threads (#109204)

This commit introduces a watchdog timer to monitor for long-running
tasks on network threads. If a network thread is active and has not made
progress for two consecutive ticks of the timer then the watchdog logs a
warning and a thread dump.
David Turner 1 년 전
부모
커밋
683245e41e

+ 5 - 0
docs/changelog/109204.yaml

@@ -0,0 +1,5 @@
+pr: 109204
+summary: Detect long-running tasks on network threads
+area: Network
+type: enhancement
+issues: []

+ 46 - 7
docs/reference/modules/network/threading.asciidoc

@@ -109,10 +109,49 @@ the `transport_worker` threads are too busy. It is more reliable to use
 profiling trace. These tools are independent of any work the JVM is performing.
 
 It may also be possible to identify some reasons for delays from the server
-logs, particularly looking at warnings from
-`org.elasticsearch.transport.InboundHandler` and
-`org.elasticsearch.transport.OutboundHandler`. Warnings about long processing
-times from the `InboundHandler` are particularly indicative of incorrect
-threading behaviour, whereas the transmission time reported by the
-`OutboundHandler` includes time spent waiting for network congestion and the
-`transport_worker` thread is free to do other work during this time.
+logs. See for instance the following loggers:
+
+`org.elasticsearch.transport.InboundHandler`:: This logger reports a warning if
+processing an inbound message occupies a network thread for an unreasonably long time,
+which is almost certainly a bug. The warning includes some information which
+can be used to identify the message that took unreasonably long to process.
+
+`org.elasticsearch.transport.OutboundHandler`:: This logger reports a warning
+if sending an outbound message takes longer than expected. This duration
+includes time spent waiting for network congestion to clear, and time spent
+processing other work on the same network thread, so it does not always indicate
+the presence of a bug related to the outbound message specified in the log
+entry.
+
+`org.elasticsearch.common.network.ThreadWatchdog`:: This logger reports a
+warning and a thread dump when it notices that a network thread has not made
+progress between two consecutive checks, which is almost certainly a bug:
++
+--
+[source,text]
+----
+[WARN ][o.e.c.n.ThreadWatchdog   ] the following threads are active but did not make progress in the preceding [5s]: [elasticsearch[instance-0000000004][transport_worker][T#1]]
+[WARN ][o.e.c.n.ThreadWatchdog   ] hot threads dump due to active threads not making progress [part 1]: H4sIAAAAAAAA/+1aa2/bOBb93l8hYLUYFWgYvWw5AQbYpEkn6STZbJyiwAwGA1qiY8US6ZJUHvPr90qk/JJky41TtDMuUIci...
+[WARN ][o.e.c.n.ThreadWatchdog   ] hot threads dump due to active threads not making progress [part 2]: LfXL/x70a3eL8ve6Ral74ZBrp5x7HmUD9KXQz1MaXUNfFC6SeEysxSw1cNXL9JXYl3AigAE7ywbm/AZ+ll3Ox4qXJHNjVr6h...
+[WARN ][o.e.c.n.ThreadWatchdog   ] hot threads dump due to active threads not making progress (gzip compressed, base64-encoded, and split into 2 parts on preceding log lines; ...
+----
+
+To reconstruct the thread dump, base64-decode the data from the `[part N]` log lines and decompress it using `gzip`. For instance, on Unix-like systems, if `watchdog.log` contains only those lines in order:
+
+[source,sh]
+----
+cat watchdog.log | sed -e 's/.*://' | base64 --decode | gzip --decompress
+----
+
+This mechanism can be controlled with the following settings:
+
+`network.thread.watchdog.interval`:::
+(<<static-cluster-setting,Static>>, <<time-units,time value>>)
+Defines the interval between watchdog checks. Defaults to `5s`. Set to `0` to
+disable the network thread watchdog.
+
+`network.thread.watchdog.quiet_time`:::
+(<<static-cluster-setting,Static>>, <<time-units,time value>>)
+Defines the minimum interval between watchdog warnings. Defaults to `10m`.
+
+--

+ 27 - 15
modules/transport-netty4/src/main/java/org/elasticsearch/http/netty4/Netty4HttpPipeliningHandler.java

@@ -31,6 +31,7 @@ import org.elasticsearch.ExceptionsHelper;
 import org.elasticsearch.action.ActionListener;
 import org.elasticsearch.common.Strings;
 import org.elasticsearch.common.bytes.ReleasableBytesReference;
+import org.elasticsearch.common.network.ThreadWatchdog;
 import org.elasticsearch.core.Booleans;
 import org.elasticsearch.core.Nullable;
 import org.elasticsearch.core.Tuple;
@@ -56,6 +57,7 @@ public class Netty4HttpPipeliningHandler extends ChannelDuplexHandler {
     private static final Logger logger = LogManager.getLogger(Netty4HttpPipeliningHandler.class);
 
     private final int maxEventsHeld;
+    private final ThreadWatchdog.ActivityTracker activityTracker;
     private final PriorityQueue<Tuple<? extends Netty4HttpResponse, ChannelPromise>> outboundHoldingQueue;
 
     private record ChunkedWrite(PromiseCombiner combiner, ChannelPromise onDone, ChunkedRestResponseBodyPart responseBodyPart) {}
@@ -90,31 +92,41 @@ public class Netty4HttpPipeliningHandler extends ChannelDuplexHandler {
      * @param maxEventsHeld the maximum number of channel events that will be retained prior to aborting the channel connection; this is
      *                      required as events cannot queue up indefinitely
      */
-    public Netty4HttpPipeliningHandler(final int maxEventsHeld, final Netty4HttpServerTransport serverTransport) {
+    public Netty4HttpPipeliningHandler(
+        final int maxEventsHeld,
+        final Netty4HttpServerTransport serverTransport,
+        final ThreadWatchdog.ActivityTracker activityTracker
+    ) {
         this.maxEventsHeld = maxEventsHeld;
+        this.activityTracker = activityTracker;
         this.outboundHoldingQueue = new PriorityQueue<>(1, Comparator.comparingInt(t -> t.v1().getSequence()));
         this.serverTransport = serverTransport;
     }
 
     @Override
     public void channelRead(final ChannelHandlerContext ctx, final Object msg) {
-        assert msg instanceof FullHttpRequest : "Should have fully aggregated message already but saw [" + msg + "]";
-        final FullHttpRequest fullHttpRequest = (FullHttpRequest) msg;
-        final Netty4HttpRequest netty4HttpRequest;
-        if (fullHttpRequest.decoderResult().isFailure()) {
-            final Throwable cause = fullHttpRequest.decoderResult().cause();
-            final Exception nonError;
-            if (cause instanceof Error) {
-                ExceptionsHelper.maybeDieOnAnotherThread(cause);
-                nonError = new Exception(cause);
+        activityTracker.startActivity();
+        try {
+            assert msg instanceof FullHttpRequest : "Should have fully aggregated message already but saw [" + msg + "]";
+            final FullHttpRequest fullHttpRequest = (FullHttpRequest) msg;
+            final Netty4HttpRequest netty4HttpRequest;
+            if (fullHttpRequest.decoderResult().isFailure()) {
+                final Throwable cause = fullHttpRequest.decoderResult().cause();
+                final Exception nonError;
+                if (cause instanceof Error) {
+                    ExceptionsHelper.maybeDieOnAnotherThread(cause);
+                    nonError = new Exception(cause);
+                } else {
+                    nonError = (Exception) cause;
+                }
+                netty4HttpRequest = new Netty4HttpRequest(readSequence++, fullHttpRequest, nonError);
             } else {
-                nonError = (Exception) cause;
+                netty4HttpRequest = new Netty4HttpRequest(readSequence++, fullHttpRequest);
             }
-            netty4HttpRequest = new Netty4HttpRequest(readSequence++, fullHttpRequest, nonError);
-        } else {
-            netty4HttpRequest = new Netty4HttpRequest(readSequence++, fullHttpRequest);
+            handlePipelinedRequest(ctx, netty4HttpRequest);
+        } finally {
+            activityTracker.stopActivity();
         }
-        handlePipelinedRequest(ctx, netty4HttpRequest);
     }
 
     // protected so tests can override it

+ 12 - 1
modules/transport-netty4/src/main/java/org/elasticsearch/http/netty4/Netty4HttpServerTransport.java

@@ -38,6 +38,7 @@ import org.apache.logging.log4j.Logger;
 import org.elasticsearch.ExceptionsHelper;
 import org.elasticsearch.common.network.CloseableChannel;
 import org.elasticsearch.common.network.NetworkService;
+import org.elasticsearch.common.network.ThreadWatchdog;
 import org.elasticsearch.common.settings.ClusterSettings;
 import org.elasticsearch.common.settings.Settings;
 import org.elasticsearch.common.unit.ByteSizeValue;
@@ -94,6 +95,7 @@ public class Netty4HttpServerTransport extends AbstractHttpServerTransport {
     private final TLSConfig tlsConfig;
     private final AcceptChannelHandler.AcceptPredicate acceptChannelPredicate;
     private final HttpValidator httpValidator;
+    private final ThreadWatchdog threadWatchdog;
     private final int readTimeoutMillis;
 
     private final int maxCompositeBufferComponents;
@@ -130,6 +132,7 @@ public class Netty4HttpServerTransport extends AbstractHttpServerTransport {
         this.tlsConfig = tlsConfig;
         this.acceptChannelPredicate = acceptChannelPredicate;
         this.httpValidator = httpValidator;
+        this.threadWatchdog = networkService.getThreadWatchdog();
 
         this.pipeliningMaxEvents = SETTING_PIPELINING_MAX_EVENTS.get(settings);
 
@@ -381,7 +384,15 @@ public class Netty4HttpServerTransport extends AbstractHttpServerTransport {
             if (handlingSettings.compression()) {
                 ch.pipeline().addLast("encoder_compress", new HttpContentCompressor(handlingSettings.compressionLevel()));
             }
-            ch.pipeline().addLast("pipelining", new Netty4HttpPipeliningHandler(transport.pipeliningMaxEvents, transport));
+            ch.pipeline()
+                .addLast(
+                    "pipelining",
+                    new Netty4HttpPipeliningHandler(
+                        transport.pipeliningMaxEvents,
+                        transport,
+                        transport.threadWatchdog.getActivityTrackerForCurrentThread()
+                    )
+                );
             transport.serverAcceptedChannel(nettyHttpChannel);
         }
 

+ 12 - 1
modules/transport-netty4/src/main/java/org/elasticsearch/transport/netty4/Netty4MessageInboundHandler.java

@@ -15,6 +15,7 @@ import org.elasticsearch.ElasticsearchException;
 import org.elasticsearch.ExceptionsHelper;
 import org.elasticsearch.common.bytes.BytesReference;
 import org.elasticsearch.common.bytes.ReleasableBytesReference;
+import org.elasticsearch.common.network.ThreadWatchdog;
 import org.elasticsearch.core.RefCounted;
 import org.elasticsearch.core.Releasables;
 import org.elasticsearch.transport.InboundPipeline;
@@ -30,9 +31,16 @@ public class Netty4MessageInboundHandler extends ChannelInboundHandlerAdapter {
 
     private final InboundPipeline pipeline;
 
-    public Netty4MessageInboundHandler(Netty4Transport transport, InboundPipeline inboundPipeline) {
+    private final ThreadWatchdog.ActivityTracker activityTracker;
+
+    public Netty4MessageInboundHandler(
+        Netty4Transport transport,
+        InboundPipeline inboundPipeline,
+        ThreadWatchdog.ActivityTracker activityTracker
+    ) {
         this.transport = transport;
         this.pipeline = inboundPipeline;
+        this.activityTracker = activityTracker;
     }
 
     @Override
@@ -44,8 +52,11 @@ public class Netty4MessageInboundHandler extends ChannelInboundHandlerAdapter {
         final ByteBuf buffer = (ByteBuf) msg;
         Netty4TcpChannel channel = ctx.channel().attr(Netty4Transport.CHANNEL_KEY).get();
         final BytesReference wrapped = Netty4Utils.toBytesReference(buffer);
+        activityTracker.startActivity();
         try (ReleasableBytesReference reference = new ReleasableBytesReference(wrapped, new ByteBufRefCounted(buffer))) {
             pipeline.handleBytes(channel, reference);
+        } finally {
+            activityTracker.stopActivity();
         }
     }
 

+ 13 - 1
modules/transport-netty4/src/main/java/org/elasticsearch/transport/netty4/Netty4Transport.java

@@ -30,6 +30,7 @@ import org.elasticsearch.TransportVersion;
 import org.elasticsearch.cluster.node.DiscoveryNode;
 import org.elasticsearch.common.io.stream.NamedWriteableRegistry;
 import org.elasticsearch.common.network.NetworkService;
+import org.elasticsearch.common.network.ThreadWatchdog;
 import org.elasticsearch.common.recycler.Recycler;
 import org.elasticsearch.common.settings.Settings;
 import org.elasticsearch.common.unit.ByteSizeValue;
@@ -78,6 +79,8 @@ public class Netty4Transport extends TcpTransport {
     private volatile SharedGroupFactory.SharedGroup sharedGroup;
     protected final boolean remoteClusterPortEnabled;
 
+    private final ThreadWatchdog threadWatchdog;
+
     public Netty4Transport(
         Settings settings,
         TransportVersion version,
@@ -92,6 +95,7 @@ public class Netty4Transport extends TcpTransport {
         Netty4Utils.setAvailableProcessors(EsExecutors.allocatedProcessors(settings));
         NettyAllocator.logAllocatorDescriptionIfNeeded();
         this.sharedGroupFactory = sharedGroupFactory;
+        this.threadWatchdog = networkService.getThreadWatchdog();
 
         // See AdaptiveReceiveBufferSizePredictor#DEFAULT_XXX for default values in netty..., we can use higher ones for us, even fixed one
         this.receivePredictorMin = Netty4Plugin.NETTY_RECEIVE_PREDICTOR_MIN.get(settings);
@@ -125,6 +129,7 @@ public class Netty4Transport extends TcpTransport {
                     bindServer(profileSettings);
                 }
             }
+            threadWatchdog.run(settings, threadPool, lifecycle);
             success = true;
         } finally {
             if (success == false) {
@@ -354,7 +359,14 @@ public class Netty4Transport extends TcpTransport {
             pipeline.addLast("logging", ESLoggingHandler.INSTANCE);
         }
         pipeline.addLast("chunked_writer", new Netty4WriteThrottlingHandler(getThreadPool().getThreadContext()));
-        pipeline.addLast("dispatcher", new Netty4MessageInboundHandler(this, getInboundPipeline(ch, isRemoteClusterServerChannel)));
+        pipeline.addLast(
+            "dispatcher",
+            new Netty4MessageInboundHandler(
+                this,
+                getInboundPipeline(ch, isRemoteClusterServerChannel),
+                threadWatchdog.getActivityTrackerForCurrentThread()
+            )
+        );
     }
 
     protected InboundPipeline getInboundPipeline(Channel ch, boolean isRemoteClusterServerChannel) {

+ 38 - 3
modules/transport-netty4/src/test/java/org/elasticsearch/http/netty4/Netty4HttpPipeliningHandlerTests.java

@@ -34,6 +34,8 @@ import org.elasticsearch.common.bytes.BytesArray;
 import org.elasticsearch.common.bytes.BytesReference;
 import org.elasticsearch.common.bytes.ReleasableBytesReference;
 import org.elasticsearch.common.bytes.ZeroBytesReference;
+import org.elasticsearch.common.network.ThreadWatchdog;
+import org.elasticsearch.common.network.ThreadWatchdogHelper;
 import org.elasticsearch.common.recycler.Recycler;
 import org.elasticsearch.http.HttpResponse;
 import org.elasticsearch.rest.ChunkedRestResponseBodyPart;
@@ -53,11 +55,14 @@ import java.util.concurrent.CountDownLatch;
 import java.util.concurrent.ExecutorService;
 import java.util.concurrent.Executors;
 import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicBoolean;
 import java.util.stream.Collectors;
 import java.util.stream.IntStream;
 
 import static io.netty.handler.codec.http.HttpHeaderNames.CONTENT_LENGTH;
 import static io.netty.handler.codec.http.HttpVersion.HTTP_1_1;
+import static org.hamcrest.Matchers.empty;
+import static org.hamcrest.Matchers.equalTo;
 import static org.hamcrest.Matchers.greaterThan;
 import static org.hamcrest.Matchers.hasSize;
 import static org.hamcrest.Matchers.instanceOf;
@@ -120,7 +125,7 @@ public class Netty4HttpPipeliningHandlerTests extends ESTestCase {
     }
 
     private EmbeddedChannel makeEmbeddedChannelWithSimulatedWork(int numberOfRequests) {
-        return new EmbeddedChannel(new Netty4HttpPipeliningHandler(numberOfRequests, null) {
+        return new EmbeddedChannel(new Netty4HttpPipeliningHandler(numberOfRequests, null, new ThreadWatchdog.ActivityTracker()) {
             @Override
             protected void handlePipelinedRequest(ChannelHandlerContext ctx, Netty4HttpRequest pipelinedRequest) {
                 ctx.fireChannelRead(pipelinedRequest);
@@ -186,7 +191,9 @@ public class Netty4HttpPipeliningHandlerTests extends ESTestCase {
 
     public void testPipeliningRequestsAreReleased() {
         final int numberOfRequests = 10;
-        final EmbeddedChannel embeddedChannel = new EmbeddedChannel(new Netty4HttpPipeliningHandler(numberOfRequests + 1, null));
+        final EmbeddedChannel embeddedChannel = new EmbeddedChannel(
+            new Netty4HttpPipeliningHandler(numberOfRequests + 1, null, new ThreadWatchdog.ActivityTracker())
+        );
 
         for (int i = 0; i < numberOfRequests; i++) {
             embeddedChannel.writeInbound(createHttpRequest("/" + i));
@@ -473,6 +480,30 @@ public class Netty4HttpPipeliningHandlerTests extends ESTestCase {
         assertThat(messagesSeen.get(1), instanceOf(DefaultHttpContent.class));
     }
 
+    public void testActivityTracking() {
+        final var watchdog = new ThreadWatchdog();
+        final var activityTracker = watchdog.getActivityTrackerForCurrentThread();
+        final var requestHandled = new AtomicBoolean();
+        final var handler = new Netty4HttpPipeliningHandler(Integer.MAX_VALUE, mock(Netty4HttpServerTransport.class), activityTracker) {
+            @Override
+            protected void handlePipelinedRequest(ChannelHandlerContext ctx, Netty4HttpRequest pipelinedRequest) {
+                // thread is not idle while handling the request
+                assertThat(ThreadWatchdogHelper.getStuckThreadNames(watchdog), empty());
+                assertThat(ThreadWatchdogHelper.getStuckThreadNames(watchdog), equalTo(List.of(Thread.currentThread().getName())));
+                ctx.fireChannelRead(pipelinedRequest);
+                assertTrue(requestHandled.compareAndSet(false, true));
+            }
+        };
+
+        final EmbeddedChannel embeddedChannel = new EmbeddedChannel(new ChannelDuplexHandler(), handler);
+        embeddedChannel.writeInbound(createHttpRequest("/test"));
+        assertTrue(requestHandled.get());
+
+        // thread is now idle
+        assertThat(ThreadWatchdogHelper.getStuckThreadNames(watchdog), empty());
+        assertThat(ThreadWatchdogHelper.getStuckThreadNames(watchdog), empty());
+    }
+
     // assert that a message of the given number of repeated chunks is found at the given index in the list and each chunk is equal to
     // the given BytesReference
     private static void assertChunkedMessageAtIndex(List<Object> messagesSeen, int index, int chunks, BytesReference chunkBytes) {
@@ -494,7 +525,11 @@ public class Netty4HttpPipeliningHandlerTests extends ESTestCase {
     }
 
     private Netty4HttpPipeliningHandler getTestHttpHandler() {
-        return new Netty4HttpPipeliningHandler(Integer.MAX_VALUE, mock(Netty4HttpServerTransport.class)) {
+        return new Netty4HttpPipeliningHandler(
+            Integer.MAX_VALUE,
+            mock(Netty4HttpServerTransport.class),
+            new ThreadWatchdog.ActivityTracker()
+        ) {
             @Override
             protected void handlePipelinedRequest(ChannelHandlerContext ctx, Netty4HttpRequest pipelinedRequest) {
                 ctx.fireChannelRead(pipelinedRequest);

+ 163 - 0
server/src/internalClusterTest/java/org/elasticsearch/common/network/ThreadWatchdogIT.java

@@ -0,0 +1,163 @@
+/*
+ * Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
+ * or more contributor license agreements. Licensed under the Elastic License
+ * 2.0 and the Server Side Public License, v 1; you may not use this file except
+ * in compliance with, at your election, the Elastic License 2.0 or the Server
+ * Side Public License, v 1.
+ */
+
+package org.elasticsearch.common.network;
+
+import org.apache.logging.log4j.core.LogEvent;
+import org.elasticsearch.action.ActionListenerResponseHandler;
+import org.elasticsearch.action.support.SubscribableListener;
+import org.elasticsearch.client.Request;
+import org.elasticsearch.client.internal.node.NodeClient;
+import org.elasticsearch.cluster.metadata.IndexNameExpressionResolver;
+import org.elasticsearch.cluster.node.DiscoveryNodes;
+import org.elasticsearch.common.io.stream.NamedWriteableRegistry;
+import org.elasticsearch.common.settings.ClusterSettings;
+import org.elasticsearch.common.settings.IndexScopedSettings;
+import org.elasticsearch.common.settings.Settings;
+import org.elasticsearch.common.settings.SettingsFilter;
+import org.elasticsearch.common.util.CollectionUtils;
+import org.elasticsearch.common.util.concurrent.EsExecutors;
+import org.elasticsearch.common.util.concurrent.RunOnce;
+import org.elasticsearch.features.NodeFeature;
+import org.elasticsearch.plugins.ActionPlugin;
+import org.elasticsearch.plugins.Plugin;
+import org.elasticsearch.rest.RestChannel;
+import org.elasticsearch.rest.RestController;
+import org.elasticsearch.rest.RestHandler;
+import org.elasticsearch.rest.RestRequest;
+import org.elasticsearch.rest.action.RestToXContentListener;
+import org.elasticsearch.test.ESIntegTestCase;
+import org.elasticsearch.test.MockLog;
+import org.elasticsearch.test.rest.ESRestTestCase;
+import org.elasticsearch.test.transport.MockTransportService;
+import org.elasticsearch.transport.TransportRequest;
+import org.elasticsearch.transport.TransportResponse;
+import org.elasticsearch.transport.TransportService;
+
+import java.io.IOException;
+import java.util.Collection;
+import java.util.List;
+import java.util.concurrent.CountDownLatch;
+import java.util.function.Predicate;
+import java.util.function.Supplier;
+
+public class ThreadWatchdogIT extends ESIntegTestCase {
+
+    @Override
+    protected Settings nodeSettings(int nodeOrdinal, Settings otherSettings) {
+        return Settings.builder()
+            .put(super.nodeSettings(nodeOrdinal, otherSettings))
+            .put(ThreadWatchdog.NETWORK_THREAD_WATCHDOG_INTERVAL.getKey(), "100ms")
+            .put(ThreadWatchdog.NETWORK_THREAD_WATCHDOG_QUIET_TIME.getKey(), "0")
+            .build();
+    }
+
+    @SuppressWarnings("unchecked")
+    @Override
+    protected Collection<Class<? extends Plugin>> nodePlugins() {
+        return CollectionUtils.appendToCopyNoNullElements(
+            super.nodePlugins(),
+            SlowRequestProcessingPlugin.class,
+            MockTransportService.TestPlugin.class
+        );
+    }
+
+    @Override
+    protected boolean addMockHttpTransport() {
+        return false;
+    }
+
+    public static class SlowRequestProcessingPlugin extends Plugin implements ActionPlugin {
+
+        @Override
+        public Collection<RestHandler> getRestHandlers(
+            Settings settings,
+            NamedWriteableRegistry namedWriteableRegistry,
+            RestController restController,
+            ClusterSettings clusterSettings,
+            IndexScopedSettings indexScopedSettings,
+            SettingsFilter settingsFilter,
+            IndexNameExpressionResolver indexNameExpressionResolver,
+            Supplier<DiscoveryNodes> nodesInCluster,
+            Predicate<NodeFeature> clusterSupportsFeature
+        ) {
+            return List.of(new RestHandler() {
+                @Override
+                public List<Route> routes() {
+                    return List.of(Route.builder(RestRequest.Method.POST, "_slow").build());
+                }
+
+                @Override
+                public void handleRequest(RestRequest request, RestChannel channel, NodeClient client) {
+                    blockAndWaitForWatchdogLogs();
+                    new RestToXContentListener<>(channel).onResponse((b, p) -> b.startObject().endObject());
+                }
+            });
+        }
+    }
+
+    private static void blockAndWaitForWatchdogLogs() {
+        final var threadName = Thread.currentThread().getName();
+        final var logsSeenLatch = new CountDownLatch(2);
+        final var warningSeen = new RunOnce(logsSeenLatch::countDown);
+        final var threadDumpSeen = new RunOnce(logsSeenLatch::countDown);
+        MockLog.assertThatLogger(() -> safeAwait(logsSeenLatch), ThreadWatchdog.class, new MockLog.LoggingExpectation() {
+            @Override
+            public void match(LogEvent event) {
+                final var formattedMessage = event.getMessage().getFormattedMessage();
+                if (formattedMessage.contains("the following threads are active but did not make progress in the preceding [100ms]:")
+                    && formattedMessage.contains(threadName)) {
+                    warningSeen.run();
+                }
+                if (formattedMessage.contains("hot threads dump due to active threads not making progress")) {
+                    threadDumpSeen.run();
+                }
+            }
+
+            @Override
+            public void assertMatched() {}
+        });
+    }
+
+    public void testThreadWatchdogHttpLogging() throws IOException {
+        ESRestTestCase.assertOK(getRestClient().performRequest(new Request("POST", "_slow")));
+    }
+
+    public void testThreadWatchdogTransportLogging() {
+        internalCluster().ensureAtLeastNumDataNodes(2);
+        final var transportServiceIterator = internalCluster().getInstances(TransportService.class).iterator();
+        final var sourceTransportService = transportServiceIterator.next();
+        final var targetTransportService = transportServiceIterator.next();
+
+        targetTransportService.registerRequestHandler(
+            "internal:slow",
+            EsExecutors.DIRECT_EXECUTOR_SERVICE,
+            TransportRequest.Empty::new,
+            (request, channel, task) -> {
+                blockAndWaitForWatchdogLogs();
+                channel.sendResponse(TransportResponse.Empty.INSTANCE);
+            }
+        );
+
+        safeAwait(
+            SubscribableListener.newForked(
+                l -> sourceTransportService.sendRequest(
+                    targetTransportService.getLocalNode(),
+                    "internal:slow",
+                    new TransportRequest.Empty(),
+                    new ActionListenerResponseHandler<TransportResponse>(
+                        l,
+                        in -> TransportResponse.Empty.INSTANCE,
+                        EsExecutors.DIRECT_EXECUTOR_SERVICE
+                    )
+                )
+            )
+        );
+    }
+
+}

+ 5 - 0
server/src/main/java/org/elasticsearch/common/network/NetworkService.java

@@ -85,6 +85,7 @@ public final class NetworkService {
 
     private final List<CustomNameResolver> customNameResolvers;
     private final HandlingTimeTracker handlingTimeTracker = new HandlingTimeTracker();
+    private final ThreadWatchdog threadWatchdog = new ThreadWatchdog();
 
     public NetworkService(List<CustomNameResolver> customNameResolvers) {
         this.customNameResolvers = Objects.requireNonNull(customNameResolvers, "customNameResolvers must be non null");
@@ -94,6 +95,10 @@ public final class NetworkService {
         return handlingTimeTracker;
     }
 
+    public ThreadWatchdog getThreadWatchdog() {
+        return threadWatchdog;
+    }
+
     /**
      * Resolves {@code bindHosts} to a list of internet addresses. The list will
      * not contain duplicate addresses.

+ 280 - 0
server/src/main/java/org/elasticsearch/common/network/ThreadWatchdog.java

@@ -0,0 +1,280 @@
+/*
+ * Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
+ * or more contributor license agreements. Licensed under the Elastic License
+ * 2.0 and the Server Side Public License, v 1; you may not use this file except
+ * in compliance with, at your election, the Elastic License 2.0 or the Server
+ * Side Public License, v 1.
+ */
+
+package org.elasticsearch.common.network;
+
+import org.apache.logging.log4j.Level;
+import org.apache.logging.log4j.LogManager;
+import org.apache.logging.log4j.Logger;
+import org.elasticsearch.common.ReferenceDocs;
+import org.elasticsearch.common.component.Lifecycle;
+import org.elasticsearch.common.settings.Setting;
+import org.elasticsearch.common.settings.Settings;
+import org.elasticsearch.common.util.concurrent.AbstractRunnable;
+import org.elasticsearch.common.util.concurrent.EsExecutors;
+import org.elasticsearch.common.util.concurrent.EsRejectedExecutionException;
+import org.elasticsearch.core.TimeValue;
+import org.elasticsearch.monitor.jvm.HotThreads;
+import org.elasticsearch.threadpool.ThreadPool;
+
+import java.lang.ref.WeakReference;
+import java.util.ArrayList;
+import java.util.Comparator;
+import java.util.List;
+import java.util.concurrent.atomic.AtomicLong;
+
+/**
+ * Watchdog mechanism for making sure that no transport thread spends too long blocking the event loop.
+ */
+// Today we only use this to track activity processing reads on network threads. Tracking time when we're busy processing writes is a little
+// trickier because that code is more re-entrant, both within the network layer and also it may complete a listener from the wider codebase
+// that ends up calling back into the network layer again. But also we don't see many network threads blocking for ages on the write path,
+// so we focus on reads for now.
+public class ThreadWatchdog {
+
+    public static final Setting<TimeValue> NETWORK_THREAD_WATCHDOG_INTERVAL = Setting.timeSetting(
+        "network.thread.watchdog.interval",
+        TimeValue.timeValueSeconds(5),
+        Setting.Property.NodeScope
+    );
+
+    public static final Setting<TimeValue> NETWORK_THREAD_WATCHDOG_QUIET_TIME = Setting.timeSetting(
+        "network.thread.watchdog.quiet_time",
+        TimeValue.timeValueMinutes(10),
+        Setting.Property.NodeScope
+    );
+
+    private static final Logger logger = LogManager.getLogger(ThreadWatchdog.class);
+
+    /**
+     * Activity tracker for the current thread. Thread-locals are only retained by the owning thread so these will be GCd after thread exit.
+     */
+    private final ThreadLocal<ActivityTracker> activityTrackerThreadLocal = new ThreadLocal<>();
+
+    /**
+     * Collection of known activity trackers to be scanned for stuck threads. Uses {@link WeakReference} so that we don't prevent trackers
+     * from being GCd if a thread exits. There aren't many such trackers, O(#cpus), and they almost never change, so an {@link ArrayList}
+     * with explicit synchronization is fine.
+     */
+    private final List<WeakReference<ActivityTracker>> knownTrackers = new ArrayList<>();
+
+    /**
+     * Looks up the activity tracker belonging to the calling thread, creating and registering one on the first call from that thread.
+     *
+     * @return an activity tracker for activities on the current thread.
+     */
+    public ActivityTracker getActivityTrackerForCurrentThread() {
+        final var existingTracker = activityTrackerThreadLocal.get();
+        if (existingTracker != null) {
+            return existingTracker;
+        }
+
+        // First call from this thread. Thread creation is assumed to be very rare so this path needs no optimization: register the new
+        // tracker (weakly) for scanning, then remember it in the thread-local so that later calls take the fast path above.
+        final var newTracker = new ActivityTracker();
+        synchronized (knownTrackers) {
+            knownTrackers.add(new WeakReference<>(newTracker));
+        }
+        activityTrackerThreadLocal.set(newTracker);
+        return newTracker;
+    }
+
+    /**
+     * Scans every known tracker and collects the names of the threads which are active but whose activity counter has not changed since
+     * the previous scan. Trackers whose weak reference has been cleared are dropped from the collection along the way.
+     *
+     * @return the names of the stuck threads in natural order, or an empty list if there are none.
+     */
+    // exposed for testing
+    List<String> getStuckThreadNames() {
+        // allocated lazily: the overwhelmingly common outcome is that nothing is stuck
+        List<String> stuck = null;
+        // this is not called very often, and only on a single thread, with almost no contention on this mutex since thread creation is rare
+        synchronized (knownTrackers) {
+            for (final var trackerRefs = knownTrackers.iterator(); trackerRefs.hasNext();) {
+                final var tracker = trackerRefs.next().get();
+                if (tracker == null) {
+                    // tracker was GCd because its thread exited - very rare, no need to optimize this case
+                    trackerRefs.remove();
+                    continue;
+                }
+                if (tracker.isIdleOrMakingProgress()) {
+                    continue;
+                }
+                if (stuck == null) {
+                    stuck = new ArrayList<>();
+                }
+                stuck.add(tracker.getTrackedThreadName());
+            }
+        }
+        if (stuck == null) {
+            return List.of();
+        }
+        stuck.sort(Comparator.naturalOrder());
+        return stuck;
+    }
+
+    /**
+     * Per-thread class which keeps track of activity on that thread, represented as a {@code long} which is incremented every time an
+     * activity starts or stops. Thus the parity of its value indicates whether the thread is idle or not. Crucially, the activity tracking
+     * is very lightweight (on the tracked thread).
+     * <p>
+     * The tracked thread only ever calls {@link #startActivity} and {@link #stopActivity}, each of which is a single atomic increment.
+     * The observing side calls {@link #isIdleOrMakingProgress}, which compares the current counter value with the value it saw on its
+     * previous call: an odd (active) value that has not changed between two observations means the thread has been inside the same
+     * activity for the whole time between them.
+     */
+    public static final class ActivityTracker extends AtomicLong {
+
+        // the thread which constructed this tracker; start/stop calls assert that they run on this thread
+        private final Thread trackedThread;
+        // Counter value seen by the previous isIdleOrMakingProgress() call that found the thread active. This is a plain field which is
+        // only touched by isIdleOrMakingProgress(); the caller visible in this file, getStuckThreadNames(), invokes that method while
+        // holding the knownTrackers lock.
+        private long lastObservedValue;
+
+        /**
+         * Creates a tracker bound to the thread which calls this constructor.
+         */
+        public ActivityTracker() {
+            this.trackedThread = Thread.currentThread();
+        }
+
+        /**
+         * @return the current name of the tracked thread.
+         */
+        String getTrackedThreadName() {
+            return trackedThread.getName();
+        }
+
+        /**
+         * Marks the start of an activity by incrementing the counter from an even (idle) value to an odd (active) one. Must be called on
+         * the tracked thread while it is idle; both conditions are checked by assertions only.
+         */
+        public void startActivity() {
+            assert trackedThread == Thread.currentThread() : trackedThread.getName() + " vs " + Thread.currentThread().getName();
+            final var prevValue = getAndIncrement();
+            assert isIdle(prevValue) : "thread [" + trackedThread.getName() + "] was already active";
+        }
+
+        /**
+         * Marks the end of an activity by incrementing the counter from an odd (active) value to an even (idle) one. Must be called on
+         * the tracked thread while it is active; both conditions are checked by assertions only.
+         */
+        public void stopActivity() {
+            assert trackedThread == Thread.currentThread() : trackedThread.getName() + " vs " + Thread.currentThread().getName();
+            final var prevValue = getAndIncrement();
+            assert isIdle(prevValue) == false : "thread [" + trackedThread.getName() + "] was already idle";
+        }
+
+        /**
+         * Observes the counter and reports whether the thread looks healthy. This method has a side effect: when it finds the thread
+         * active with a counter value it has not seen before, it records that value for comparison on the next call. Consequently a thread
+         * which has just become active is reported as healthy on the first observation and as stuck only from the second consecutive
+         * observation of the same value.
+         *
+         * @return {@code true} if the thread is idle, or is active but its counter changed since the previous observation;
+         *         {@code false} if it is active and the counter is unchanged.
+         */
+        boolean isIdleOrMakingProgress() {
+            final var value = get();
+            if (isIdle(value)) {
+                return true;
+            }
+            if (value == lastObservedValue) {
+                // no change since last check
+                return false;
+            } else {
+                // made progress since last check
+                lastObservedValue = value;
+                return true;
+            }
+        }
+
+        private static boolean isIdle(long value) {
+            // the parity of the value indicates the idle state: initially zero (idle), so active == odd
+            return (value & 1) == 0;
+        }
+    }
+
+    /**
+     * Starts the periodic stuck-thread check, reading the check interval and the post-warning quiet time from the given settings.
+     *
+     * @param settings   source of {@link #NETWORK_THREAD_WATCHDOG_INTERVAL} and {@link #NETWORK_THREAD_WATCHDOG_QUIET_TIME}
+     * @param threadPool used to schedule the checks and to run the thread dump
+     * @param lifecycle  checks stop being rescheduled once this reports stopped or closed
+     */
+    public void run(Settings settings, ThreadPool threadPool, Lifecycle lifecycle) {
+        final var checkInterval = NETWORK_THREAD_WATCHDOG_INTERVAL.get(settings);
+        final var quietTime = NETWORK_THREAD_WATCHDOG_QUIET_TIME.get(settings);
+        final var checker = new Checker(threadPool, checkInterval, quietTime, lifecycle);
+        checker.run();
+    }
+
+    /**
+     * Action which runs itself periodically, calling {@link #getStuckThreadNames} to check for active threads that didn't make progress
+     * since the last call, and if it finds any then it dispatches {@link #threadDumper} to log the current hot threads.
+     * <p>
+     * After a check which finds nothing the next check is scheduled {@code interval} later. After a check which finds stuck threads the
+     * next check is instead scheduled by {@link #threadDumper} when it completes, {@code quietTime} later, so the warning and the thread
+     * dump are not repeated on every tick.
+     */
+    private final class Checker extends AbstractRunnable {
+        private final ThreadPool threadPool;
+        private final TimeValue interval;
+        private final TimeValue quietTime;
+        private final Lifecycle lifecycle;
+
+        /**
+         * @param threadPool schedules the checks and supplies the generic executor for the thread dump
+         * @param interval   delay between checks; a value which is not positive disables the checker entirely
+         * @param quietTime  delay before the next check after a warning; raised to {@code interval} if it is shorter
+         * @param lifecycle  the checker stops once this reports stopped or closed
+         */
+        Checker(ThreadPool threadPool, TimeValue interval, TimeValue quietTime, Lifecycle lifecycle) {
+            this.threadPool = threadPool;
+            this.interval = interval;
+            this.quietTime = quietTime.compareTo(interval) <= 0 ? interval : quietTime;
+            this.lifecycle = lifecycle;
+            assert this.interval.millis() <= this.quietTime.millis();
+        }
+
+        @Override
+        protected void doRun() {
+            if (isRunning() == false) {
+                return;
+            }
+
+            // stays true unless the thread dumper is about to take over responsibility for scheduling the next check
+            boolean rescheduleImmediately = true;
+            try {
+                final var stuckThreadNames = getStuckThreadNames();
+                if (stuckThreadNames.isEmpty() == false) {
+                    logger.warn(
+                        "the following threads are active but did not make progress in the preceding [{}]: {}",
+                        interval,
+                        stuckThreadNames
+                    );
+                    rescheduleImmediately = false;
+                    threadPool.generic().execute(threadDumper);
+                }
+            } finally {
+                // in a finally block so that a failure while checking or logging does not silently stop the periodic checks
+                if (rescheduleImmediately) {
+                    scheduleNext(interval);
+                }
+            }
+        }
+
+        @Override
+        public boolean isForceExecution() {
+            return true;
+        }
+
+        /**
+         * @return whether the checker is enabled (positive interval) and the lifecycle has not been stopped or closed.
+         */
+        private boolean isRunning() {
+            return 0 < interval.millis() && lifecycle.stoppedOrClosed() == false;
+        }
+
+        /**
+         * Schedules the next run of this checker after the given delay, unless the checker is no longer running.
+         */
+        private void scheduleNext(TimeValue delay) {
+            if (isRunning()) {
+                threadPool.scheduleUnlessShuttingDown(delay, EsExecutors.DIRECT_EXECUTOR_SERVICE, Checker.this);
+            }
+        }
+
+        /**
+         * Logs a hot-threads dump on a generic thread and then, whether or not that succeeded, schedules the next check after the quiet
+         * time. Failures and rejections are reported through the enclosing checker's handlers.
+         */
+        private final AbstractRunnable threadDumper = new AbstractRunnable() {
+            @Override
+            protected void doRun() {
+                assert ThreadPool.assertCurrentThreadPool(ThreadPool.Names.GENERIC);
+                if (isRunning()) {
+                    HotThreads.logLocalHotThreads(
+                        logger,
+                        Level.WARN,
+                        "hot threads dump due to active threads not making progress",
+                        ReferenceDocs.NETWORK_THREADING_MODEL
+                    );
+                }
+            }
+
+            @Override
+            public boolean isForceExecution() {
+                return true;
+            }
+
+            @Override
+            public void onFailure(Exception e) {
+                Checker.this.onFailure(e);
+            }
+
+            @Override
+            public void onRejection(Exception e) {
+                Checker.this.onRejection(e);
+            }
+
+            @Override
+            public void onAfter() {
+                scheduleNext(quietTime);
+            }
+
+            @Override
+            public String toString() {
+                // spelled to match the actual class name, ThreadWatchdog, so that searching for the class name finds this task
+                return "ThreadWatchdog$Checker#threadDumper";
+            }
+        };
+
+        @Override
+        public void onFailure(Exception e) {
+            logger.error("exception in ThreadWatchdog$Checker", e);
+            assert false : e;
+        }
+
+        @Override
+        public void onRejection(Exception e) {
+            // the only rejection expected here is the one caused by the executor shutting down
+            logger.debug("ThreadWatchdog$Checker execution rejected", e);
+            assert e instanceof EsRejectedExecutionException esre && esre.isExecutorShutdown() : e;
+        }
+
+        @Override
+        public String toString() {
+            return "ThreadWatchdog$Checker";
+        }
+    }
+}

+ 3 - 0
server/src/main/java/org/elasticsearch/common/settings/ClusterSettings.java

@@ -60,6 +60,7 @@ import org.elasticsearch.cluster.service.MasterService;
 import org.elasticsearch.common.logging.Loggers;
 import org.elasticsearch.common.network.NetworkModule;
 import org.elasticsearch.common.network.NetworkService;
+import org.elasticsearch.common.network.ThreadWatchdog;
 import org.elasticsearch.common.settings.Setting.Property;
 import org.elasticsearch.common.util.PageCacheRecycler;
 import org.elasticsearch.common.util.concurrent.EsExecutors;
@@ -421,6 +422,8 @@ public final class ClusterSettings extends AbstractScopedSettings {
         NetworkService.TCP_REUSE_ADDRESS,
         NetworkService.TCP_SEND_BUFFER_SIZE,
         NetworkService.TCP_RECEIVE_BUFFER_SIZE,
+        ThreadWatchdog.NETWORK_THREAD_WATCHDOG_INTERVAL,
+        ThreadWatchdog.NETWORK_THREAD_WATCHDOG_QUIET_TIME,
         IndexSettings.QUERY_STRING_ANALYZE_WILDCARD,
         IndexSettings.QUERY_STRING_ALLOW_LEADING_WILDCARD,
         ScriptService.SCRIPT_CACHE_SIZE_SETTING,

+ 305 - 0
server/src/test/java/org/elasticsearch/common/network/ThreadWatchdogTests.java

@@ -0,0 +1,305 @@
+/*
+ * Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
+ * or more contributor license agreements. Licensed under the Elastic License
+ * 2.0 and the Server Side Public License, v 1; you may not use this file except
+ * in compliance with, at your election, the Elastic License 2.0 or the Server
+ * Side Public License, v 1.
+ */
+
+package org.elasticsearch.common.network;
+
+import org.apache.logging.log4j.Level;
+import org.elasticsearch.common.Strings;
+import org.elasticsearch.common.component.Lifecycle;
+import org.elasticsearch.common.settings.Settings;
+import org.elasticsearch.common.util.concurrent.DeterministicTaskQueue;
+import org.elasticsearch.core.TimeValue;
+import org.elasticsearch.test.ESTestCase;
+import org.elasticsearch.test.MockLog;
+
+import java.util.Comparator;
+import java.util.HashMap;
+import java.util.List;
+import java.util.concurrent.CountDownLatch;
+import java.util.concurrent.CyclicBarrier;
+import java.util.concurrent.Semaphore;
+import java.util.concurrent.atomic.AtomicBoolean;
+
+import static org.elasticsearch.core.TimeValue.timeValueMillis;
+import static org.hamcrest.Matchers.hasItem;
+import static org.hamcrest.Matchers.not;
+
+public class ThreadWatchdogTests extends ESTestCase {
+
+    /**
+     * Walks one watched thread through four states (idle, active, still active but having made progress, idle again) and checks what
+     * {@code getStuckThreadNames()} reports in each. A two-party barrier is awaited twice around every step so that the watched thread
+     * is parked, and its counter cannot change, while the main thread makes its assertions. In the active states the first call must
+     * report nothing (the counter value is new to the watchdog) and subsequent calls must report the thread as stuck.
+     */
+    public void testSimpleActivityTracking() throws InterruptedException {
+        final var watchdog = new ThreadWatchdog();
+        final var barrier = new CyclicBarrier(2);
+        final var threadName = "watched-thread";
+        final var thread = new Thread(() -> {
+            final var activityTracker = watchdog.getActivityTrackerForCurrentThread();
+
+            assertEquals(0L, activityTracker.get());
+            if (randomBoolean()) {
+                // ensure overflow is no problem
+                // Long.MAX_VALUE is odd, so subtracting 1, 3 or 5 gives an even (idle) value which the increments below carry past
+                // Long.MAX_VALUE
+                activityTracker.set(Long.MAX_VALUE - randomFrom(1, 3, 5));
+            }
+
+            safeAwait(barrier);
+            // step 1: thread is idle
+            safeAwait(barrier);
+
+            activityTracker.startActivity();
+
+            safeAwait(barrier);
+            // step 2: thread is active
+            safeAwait(barrier);
+
+            for (int i = between(1, 10); i > 0; i--) {
+                activityTracker.stopActivity();
+                activityTracker.startActivity();
+            }
+
+            safeAwait(barrier);
+            // step 3: thread still active, but made progress
+            safeAwait(barrier);
+
+            activityTracker.stopActivity();
+
+            safeAwait(barrier);
+            // step 4: thread is idle again
+            safeAwait(barrier);
+
+        }, threadName);
+        thread.start();
+
+        safeAwait(barrier);
+
+        // step 1: thread is idle
+        assertEquals(List.of(), watchdog.getStuckThreadNames());
+        assertEquals(List.of(), watchdog.getStuckThreadNames());
+
+        safeAwait(barrier);
+        safeAwait(barrier);
+
+        // step 2: thread is active
+        assertEquals(List.of(), watchdog.getStuckThreadNames());
+        assertEquals(List.of(threadName), watchdog.getStuckThreadNames());
+        assertEquals(List.of(threadName), watchdog.getStuckThreadNames()); // just to check it's still reported as stuck
+
+        safeAwait(barrier);
+        safeAwait(barrier);
+
+        // step 3: thread still active, but made progress
+        assertEquals(List.of(), watchdog.getStuckThreadNames());
+        assertEquals(List.of(threadName), watchdog.getStuckThreadNames());
+        assertEquals(List.of(threadName), watchdog.getStuckThreadNames()); // just to check it's still reported as stuck
+
+        safeAwait(barrier);
+        safeAwait(barrier);
+
+        // step 4: thread is idle again
+        assertEquals(List.of(), watchdog.getStuckThreadNames());
+        assertEquals(List.of(), watchdog.getStuckThreadNames());
+
+        safeAwait(barrier);
+
+        thread.join();
+    }
+
+    /**
+     * Starts between 2 and 10 randomly-named threads which all begin an activity and then park on a shared barrier, and checks that the
+     * watchdog reports every one of them, sorted by name, from its second scan onwards. The barrier is released in a {@code finally}
+     * block so that the watched threads are let go and joined even if an assertion fails.
+     */
+    public void testMultipleBlockedThreads() throws InterruptedException {
+        final var threadNames = randomList(2, 10, ESTestCase::randomIdentifier);
+
+        final var watchdog = new ThreadWatchdog();
+        final var barrier = new CyclicBarrier(threadNames.size() + 1);
+        final var threads = new Thread[threadNames.size()];
+        for (int i = 0; i < threads.length; i++) {
+            threads[i] = new Thread(() -> {
+                safeAwait(barrier);
+                final var activityTracker = watchdog.getActivityTrackerForCurrentThread();
+                activityTracker.startActivity();
+                safeAwait(barrier);
+                // wait for main test thread
+                safeAwait(barrier);
+                activityTracker.stopActivity();
+            }, threadNames.get(i));
+            threads[i].start();
+        }
+
+        // first await: all threads have started; second await: all threads have begun their activity
+        safeAwait(barrier);
+        safeAwait(barrier);
+
+        try {
+            assertEquals(List.of(), watchdog.getStuckThreadNames());
+            threadNames.sort(Comparator.naturalOrder()); // stuck threads are sorted by name
+            assertEquals(threadNames, watchdog.getStuckThreadNames());
+            assertEquals(threadNames, watchdog.getStuckThreadNames()); // just to check they're all still reported as stuck
+        } finally {
+            safeAwait(barrier);
+            for (final var thread : threads) {
+                thread.join();
+            }
+        }
+    }
+
+    /**
+     * Runs between 1 and 5 threads which continuously start and stop activities, each briefly taking its own semaphore inside every
+     * activity. Once every thread has completed 20 activities the test takes one randomly-chosen thread's semaphore, which blocks that
+     * thread in the middle of an activity, and waits for the watchdog to report it as stuck; it then releases the semaphore and waits
+     * for the thread to stop being reported.
+     */
+    public void testConcurrency() throws Exception {
+        final var keepGoing = new AtomicBoolean(true);
+        final var watchdog = new ThreadWatchdog();
+        final var threads = new Thread[between(1, 5)];
+        final var semaphoresByThreadName = new HashMap<String, Semaphore>();
+        final var warmUpLatches = new CountDownLatch[threads.length];
+        try {
+            for (int i = 0; i < threads.length; i++) {
+                final var threadName = "watched-thread-" + i;
+                final var semaphore = new Semaphore(1);
+                final var warmUpLatch = new CountDownLatch(20);
+                warmUpLatches[i] = warmUpLatch;
+                semaphoresByThreadName.put(threadName, semaphore);
+                threads[i] = new Thread(() -> {
+                    final var activityTracker = watchdog.getActivityTrackerForCurrentThread();
+                    while (keepGoing.get()) {
+                        activityTracker.startActivity();
+                        try {
+                            safeAcquire(semaphore);
+                            Thread.yield();
+                            semaphore.release();
+                            Thread.yield();
+                        } finally {
+                            activityTracker.stopActivity();
+                            warmUpLatch.countDown();
+                        }
+                    }
+                }, threadName);
+                threads[i].start();
+            }
+
+            // wait until every thread has been through 20 start/stop cycles
+            for (final var warmUpLatch : warmUpLatches) {
+                safeAwait(warmUpLatch);
+            }
+
+            final var threadToBlock = randomFrom(semaphoresByThreadName.keySet());
+            final var semaphore = semaphoresByThreadName.get(threadToBlock);
+            // holding the only permit makes the chosen thread block inside its next activity
+            safeAcquire(semaphore);
+            assertBusy(() -> assertThat(watchdog.getStuckThreadNames(), hasItem(threadToBlock)));
+            semaphore.release();
+            assertBusy(() -> assertThat(watchdog.getStuckThreadNames(), not(hasItem(threadToBlock))));
+        } finally {
+            keepGoing.set(false);
+            for (final var thread : threads) {
+                thread.join();
+            }
+        }
+    }
+
+    /**
+     * This logger is mentioned in the docs by name, so we cannot rename it without adjusting the docs. Thus we fix the expected logger
+     * name in this string constant rather than using {@code ThreadWatchdog.class.getCanonicalName()}.
+     */
+    private static final String LOGGER_NAME = "org.elasticsearch.common.network.ThreadWatchdog";
+
+    /**
+     * Drives the watchdog with a {@link DeterministicTaskQueue}, using either the default or a random value for each of the check
+     * interval and the quiet time, and verifies both what is logged and when the next check is scheduled:
+     * <ul>
+     * <li>while the test thread is idle, three consecutive checks run one interval apart and log nothing at WARN;</li>
+     * <li>the first check after the thread becomes active logs nothing;</li>
+     * <li>the second such check logs the stuck-threads warning and a hot threads dump;</li>
+     * <li>the following check runs after the larger of the quiet time and the interval, and logs nothing once the activity has
+     * stopped;</li>
+     * <li>after the lifecycle is stopped, draining the queue terminates, which shows that rescheduling has ceased.</li>
+     * </ul>
+     */
+    public void testLoggingAndScheduling() {
+        final var watchdog = new ThreadWatchdog();
+        final var activityTracker = watchdog.getActivityTrackerForCurrentThread();
+        final var deterministicTaskQueue = new DeterministicTaskQueue();
+
+        final var settings = Settings.builder();
+        final var lifecycle = new Lifecycle();
+        assertTrue(lifecycle.moveToStarted());
+
+        final long checkIntervalMillis;
+        if (randomBoolean()) {
+            checkIntervalMillis = ThreadWatchdog.NETWORK_THREAD_WATCHDOG_INTERVAL.get(Settings.EMPTY).millis();
+        } else {
+            checkIntervalMillis = between(1, 100000);
+            settings.put(ThreadWatchdog.NETWORK_THREAD_WATCHDOG_INTERVAL.getKey(), timeValueMillis(checkIntervalMillis));
+        }
+
+        final long quietTimeMillis;
+        if (randomBoolean()) {
+            quietTimeMillis = ThreadWatchdog.NETWORK_THREAD_WATCHDOG_QUIET_TIME.get(Settings.EMPTY).millis();
+        } else {
+            quietTimeMillis = between(1, 100000);
+            settings.put(ThreadWatchdog.NETWORK_THREAD_WATCHDOG_QUIET_TIME.getKey(), timeValueMillis(quietTimeMillis));
+        }
+
+        watchdog.run(settings.build(), deterministicTaskQueue.getThreadPool(), lifecycle);
+
+        for (int i = 0; i < 3; i++) {
+            assertAdvanceTime(deterministicTaskQueue, checkIntervalMillis);
+            MockLog.assertThatLogger(
+                deterministicTaskQueue::runAllRunnableTasks,
+                ThreadWatchdog.class,
+                new MockLog.UnseenEventExpectation("no logging", LOGGER_NAME, Level.WARN, "*")
+            );
+        }
+
+        activityTracker.startActivity();
+        assertAdvanceTime(deterministicTaskQueue, checkIntervalMillis);
+        MockLog.assertThatLogger(
+            deterministicTaskQueue::runAllRunnableTasks,
+            ThreadWatchdog.class,
+            new MockLog.UnseenEventExpectation("no logging", LOGGER_NAME, Level.WARN, "*")
+        );
+        assertAdvanceTime(deterministicTaskQueue, checkIntervalMillis);
+        MockLog.assertThatLogger(
+            deterministicTaskQueue::runAllRunnableTasks,
+            ThreadWatchdog.class,
+            new MockLog.SeenEventExpectation(
+                "stuck threads logging",
+                LOGGER_NAME,
+                Level.WARN,
+                Strings.format(
+                    "the following threads are active but did not make progress in the preceding [%s]: [%s]",
+                    TimeValue.timeValueMillis(checkIntervalMillis),
+                    Thread.currentThread().getName()
+                )
+            ),
+            new MockLog.SeenEventExpectation(
+                "thread dump",
+                LOGGER_NAME,
+                Level.WARN,
+                "hot threads dump due to active threads not making progress (gzip compressed*base64-encoded*"
+            )
+        );
+        // the checker raises a quiet time that is shorter than the interval up to the interval, hence the max
+        assertAdvanceTime(deterministicTaskQueue, Math.max(quietTimeMillis, checkIntervalMillis));
+        activityTracker.stopActivity();
+        MockLog.assertThatLogger(
+            deterministicTaskQueue::runAllRunnableTasks,
+            ThreadWatchdog.class,
+            new MockLog.UnseenEventExpectation("no logging", LOGGER_NAME, Level.WARN, "*")
+        );
+        assertAdvanceTime(deterministicTaskQueue, checkIntervalMillis);
+        deterministicTaskQueue.scheduleNow(lifecycle::moveToStopped);
+        deterministicTaskQueue.runAllTasksInTimeOrder(); // ensures that the rescheduling stops
+    }
+
+    /**
+     * Checks that an interval of zero or minus one disables the watchdog, in the sense that running it schedules nothing at all, whereas
+     * a positive interval leaves a deferred task in the queue, and that stopping the lifecycle brings the rescheduling to an end.
+     */
+    public void testDisableWithZeroInterval() {
+        final var watchdog = new ThreadWatchdog();
+        final var taskQueue = new DeterministicTaskQueue();
+        final var lifecycle = new Lifecycle();
+        assertTrue(lifecycle.moveToStarted());
+
+        final var intervalKey = ThreadWatchdog.NETWORK_THREAD_WATCHDOG_INTERVAL.getKey();
+
+        // a non-positive interval means "disabled": nothing must be scheduled
+        final var disabledSettings = Settings.builder().put(intervalKey, randomFrom(TimeValue.ZERO, TimeValue.MINUS_ONE)).build();
+        watchdog.run(disabledSettings, taskQueue.getThreadPool(), lifecycle);
+        assertFalse(taskQueue.hasAnyTasks());
+
+        // a positive interval schedules the first check for later
+        final var enabledSettings = Settings.builder().put(intervalKey, timeValueMillis(between(1, 100000))).build();
+        watchdog.run(enabledSettings, taskQueue.getThreadPool(), lifecycle);
+        assertTrue(taskQueue.hasDeferredTasks());
+
+        lifecycle.moveToStopped();
+        taskQueue.runAllTasksInTimeOrder(); // ensures that the rescheduling stops
+    }
+
+    /**
+     * Advances the task queue's clock and asserts that it moved forward by exactly {@code expectedMillis}.
+     */
+    private static void assertAdvanceTime(DeterministicTaskQueue deterministicTaskQueue, long expectedMillis) {
+        final var startMillis = deterministicTaskQueue.getCurrentTimeMillis();
+        deterministicTaskQueue.advanceTime();
+        final var elapsedMillis = deterministicTaskQueue.getCurrentTimeMillis() - startMillis;
+        assertEquals(expectedMillis, elapsedMillis);
+    }
+}

+ 18 - 0
test/framework/src/main/java/org/elasticsearch/common/network/ThreadWatchdogHelper.java

@@ -0,0 +1,18 @@
+/*
+ * Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
+ * or more contributor license agreements. Licensed under the Elastic License
+ * 2.0 and the Server Side Public License, v 1; you may not use this file except
+ * in compliance with, at your election, the Elastic License 2.0 or the Server
+ * Side Public License, v 1.
+ */
+
+package org.elasticsearch.common.network;
+
+import java.util.List;
+
+/**
+ * Test-only bridge which lives in the same package as {@link ThreadWatchdog} so that tests in other packages can reach its
+ * package-private stuck-thread check.
+ */
+public class ThreadWatchdogHelper {
+    // exposes this package-private method to tests
+    /**
+     * @param watchdog the watchdog to query
+     * @return whatever {@code watchdog.getStuckThreadNames()} returns; note that the call goes straight through to the watchdog, so it
+     *         counts as one of the watchdog's scans
+     */
+    public static List<String> getStuckThreadNames(ThreadWatchdog watchdog) {
+        return watchdog.getStuckThreadNames();
+    }
+}

+ 59 - 0
test/framework/src/main/java/org/elasticsearch/transport/AbstractSimpleTransportTestCase.java

@@ -18,6 +18,7 @@ import org.elasticsearch.TransportVersions;
 import org.elasticsearch.Version;
 import org.elasticsearch.action.ActionListener;
 import org.elasticsearch.action.ActionListenerResponseHandler;
+import org.elasticsearch.action.support.ActionTestUtils;
 import org.elasticsearch.action.support.ChannelActionListener;
 import org.elasticsearch.action.support.PlainActionFuture;
 import org.elasticsearch.action.support.UnsafePlainActionFuture;
@@ -33,6 +34,7 @@ import org.elasticsearch.common.network.CloseableChannel;
 import org.elasticsearch.common.network.NetworkAddress;
 import org.elasticsearch.common.network.NetworkService;
 import org.elasticsearch.common.network.NetworkUtils;
+import org.elasticsearch.common.network.ThreadWatchdog;
 import org.elasticsearch.common.settings.ClusterSettings;
 import org.elasticsearch.common.settings.Setting;
 import org.elasticsearch.common.settings.Settings;
@@ -41,6 +43,7 @@ import org.elasticsearch.common.transport.TransportAddress;
 import org.elasticsearch.common.unit.ByteSizeUnit;
 import org.elasticsearch.common.util.concurrent.AbstractRunnable;
 import org.elasticsearch.common.util.concurrent.ConcurrentCollections;
+import org.elasticsearch.common.util.concurrent.DeterministicTaskQueue;
 import org.elasticsearch.common.util.concurrent.EsExecutors;
 import org.elasticsearch.common.util.concurrent.ListenableFuture;
 import org.elasticsearch.core.IOUtils;
@@ -226,6 +229,7 @@ public abstract class AbstractSimpleTransportTestCase extends ESTestCase {
     ) {
         Settings updatedSettings = Settings.builder()
             .put(TransportSettings.PORT.getKey(), getPortRange())
+            .put(ThreadWatchdog.NETWORK_THREAD_WATCHDOG_INTERVAL.getKey(), TimeValue.ZERO) // suppress watchdog running concurrently
             .put(settings)
             .put(Node.NODE_NAME_SETTING.getKey(), name)
             .put(IGNORE_DESERIALIZATION_ERRORS_SETTING.getKey(), true) // suppress assertions to test production error-handling
@@ -3348,6 +3352,61 @@ public abstract class AbstractSimpleTransportTestCase extends ESTestCase {
         }
     }
 
+    /**
+     * Verifies that a request handler which blocks a transport thread is picked up by the watchdog: the handler parks on a barrier, the
+     * first watchdog check logs nothing, and the second check logs the stuck-threads warning naming the handler's thread.
+     * <p>
+     * The barrier is released in a {@code finally} block so that the blocked handler is let go even when one of the logging assertions
+     * fails; otherwise the transport thread would stay parked until its own barrier wait gives up (presumably on a timeout inside
+     * {@code safeAwait}), delaying the test's failure and tear-down.
+     */
+    public void testWatchdogLogging() {
+        final var watchdog = networkService.getThreadWatchdog();
+        final var deterministicTaskQueue = new DeterministicTaskQueue();
+        watchdog.run(Settings.EMPTY, deterministicTaskQueue.getThreadPool(), new Lifecycle());
+
+        final var barrier = new CyclicBarrier(2);
+        final var threadNameFuture = new PlainActionFuture<String>();
+        final var actionName = "internal:action";
+        serviceA.registerRequestHandler(
+            actionName,
+            EsExecutors.DIRECT_EXECUTOR_SERVICE,
+            TransportRequest.Empty::new,
+            (request, channel, task) -> {
+                // report which thread is handling the request, then block it until the test has made its assertions
+                threadNameFuture.onResponse(Thread.currentThread().getName());
+                safeAwait(barrier);
+                channel.sendResponse(TransportResponse.Empty.INSTANCE);
+            }
+        );
+
+        final var responseLatch = new CountDownLatch(1);
+        submitRequest(
+            serviceB,
+            nodeA,
+            actionName,
+            new TransportRequest.Empty(),
+            new ActionListenerResponseHandler<TransportResponse>(
+                ActionTestUtils.assertNoFailureListener(t -> responseLatch.countDown()),
+                in -> TransportResponse.Empty.INSTANCE,
+                EsExecutors.DIRECT_EXECUTOR_SERVICE
+            )
+        );
+
+        // once this returns the handler is running, so it is either at the barrier or about to reach it
+        final var threadName = safeGet(threadNameFuture);
+        try {
+            assertFalse(deterministicTaskQueue.hasRunnableTasks());
+            deterministicTaskQueue.advanceTime();
+            // first check: the active counter value is new to the watchdog, so nothing is reported yet
+            MockLog.assertThatLogger(
+                deterministicTaskQueue::runAllRunnableTasks,
+                ThreadWatchdog.class,
+                new MockLog.UnseenEventExpectation("no logging", ThreadWatchdog.class.getCanonicalName(), Level.WARN, "*")
+            );
+            deterministicTaskQueue.advanceTime();
+            // second check: same counter value as before, so the thread is reported as stuck
+            MockLog.assertThatLogger(
+                deterministicTaskQueue::runAllRunnableTasks,
+                ThreadWatchdog.class,
+                new MockLog.SeenEventExpectation(
+                    "stuck threads logging",
+                    ThreadWatchdog.class.getCanonicalName(),
+                    Level.WARN,
+                    "the following threads are active but did not make progress in the preceding [5s]: [" + threadName + "]"
+                )
+            );
+        } finally {
+            // always release the blocked handler, even if an assertion above failed
+            safeAwait(barrier);
+        }
+        safeAwait(responseLatch);
+    }
+
     private static long[] getConstantMessageSizeHistogram(int count, long size) {
         final var histogram = new long[29];
         int bucket = 0;