
Avoid hot threads API in integ tests (#103431)

The hot threads API fans out to all the nodes in the cluster, capturing
all threads in each node's JVM. Since internal-cluster integ tests run
many nodes in the same JVM, the hot threads API reports the same
information for each node, which is fairly pointless and _very_ noisy in
the test failure logs. This commit removes the duplication by capturing
and logging the hot threads once for the whole test JVM.
David Turner · 1 year ago · commit 7c7bf69ee4
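
The pattern the diffs below apply in each failure-logging path is the same: drop the over-the-wire fan-out and log the local JVM's hot threads once via HotThreads.logLocalHotThreads. A minimal standalone sketch of that shape (the class, method, and message string here are illustrative; the helper signature and the ReferenceDocs.LOGGING argument are taken from the diffs):

    import org.apache.logging.log4j.Level;
    import org.apache.logging.log4j.LogManager;
    import org.apache.logging.log4j.Logger;
    import org.elasticsearch.common.ReferenceDocs;
    import org.elasticsearch.monitor.jvm.HotThreads;

    // Illustrative class/method names; not part of the commit.
    class LocalHotThreadsSketch {
        private static final Logger logger = LogManager.getLogger(LocalHotThreadsSketch.class);

        static void dumpHotThreadsOnTestFailure() {
            // Old shape (removed in the diffs below): clusterAdmin().prepareNodesHotThreads()...get(),
            // which asks every node and, in an internal-cluster test, gets back the same JVM's
            // threads once per node.
            //
            // New shape: capture and log this JVM's hot threads exactly once.
            HotThreads.logLocalHotThreads(logger, Level.INFO, "hot threads at failure", ReferenceDocs.LOGGING);
        }
    }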

+ 4 - 1
modules/transport-netty4/src/internalClusterTest/java/org/elasticsearch/transport/netty4/ESLoggingHandlerIT.java

@@ -11,6 +11,8 @@ package org.elasticsearch.transport.netty4;
 import org.apache.logging.log4j.Level;
 import org.apache.logging.log4j.LogManager;
 import org.elasticsearch.ESNetty4IntegTestCase;
+import org.elasticsearch.action.admin.cluster.node.hotthreads.NodesHotThreadsRequest;
+import org.elasticsearch.action.admin.cluster.node.hotthreads.TransportNodesHotThreadsAction;
 import org.elasticsearch.common.logging.Loggers;
 import org.elasticsearch.test.ESIntegTestCase;
 import org.elasticsearch.test.MockLogAppender;
@@ -19,6 +21,7 @@ import org.elasticsearch.transport.TcpTransport;
 import org.elasticsearch.transport.TransportLogger;
 
 import java.io.IOException;
+import java.util.concurrent.TimeUnit;
 
 @ESIntegTestCase.ClusterScope(numDataNodes = 2, scope = ESIntegTestCase.Scope.TEST)
 public class ESLoggingHandlerIT extends ESNetty4IntegTestCase {
@@ -84,7 +87,7 @@ public class ESLoggingHandlerIT extends ESNetty4IntegTestCase {
         appender.addExpectation(writeExpectation);
         appender.addExpectation(flushExpectation);
         appender.addExpectation(readExpectation);
-        clusterAdmin().prepareNodesHotThreads().get();
+        client().execute(TransportNodesHotThreadsAction.TYPE, new NodesHotThreadsRequest()).actionGet(10, TimeUnit.SECONDS);
         appender.assertAllExpectationsMatched();
     }
 

+ 36 - 45
server/src/internalClusterTest/java/org/elasticsearch/action/admin/HotThreadsIT.java

@@ -11,8 +11,9 @@ import org.apache.logging.log4j.Level;
 import org.apache.lucene.util.Constants;
 import org.elasticsearch.action.ActionListener;
 import org.elasticsearch.action.admin.cluster.node.hotthreads.NodeHotThreads;
-import org.elasticsearch.action.admin.cluster.node.hotthreads.NodesHotThreadsRequestBuilder;
+import org.elasticsearch.action.admin.cluster.node.hotthreads.NodesHotThreadsRequest;
 import org.elasticsearch.action.admin.cluster.node.hotthreads.NodesHotThreadsResponse;
+import org.elasticsearch.action.admin.cluster.node.hotthreads.TransportNodesHotThreadsAction;
 import org.elasticsearch.common.ReferenceDocs;
 import org.elasticsearch.common.logging.ChunkedLoggingStreamTests;
 import org.elasticsearch.core.TimeValue;
@@ -23,7 +24,7 @@ import org.hamcrest.Matcher;
 
 import java.util.Map;
 import java.util.concurrent.CountDownLatch;
-import java.util.concurrent.ExecutionException;
+import java.util.concurrent.TimeUnit;
 import java.util.concurrent.atomic.AtomicBoolean;
 
 import static org.elasticsearch.index.query.QueryBuilders.boolQuery;
@@ -41,38 +42,26 @@ import static org.hamcrest.collection.IsEmptyCollection.empty;
 
 public class HotThreadsIT extends ESIntegTestCase {
 
-    public void testHotThreadsDontFail() throws ExecutionException, InterruptedException {
-        /**
-         * This test just checks if nothing crashes or gets stuck etc.
-         */
+    public void testHotThreadsDontFail() throws InterruptedException {
+        // This test just checks if nothing crashes or gets stuck etc.
         createIndex("test");
         final int iters = scaledRandomIntBetween(2, 20);
         final AtomicBoolean hasErrors = new AtomicBoolean(false);
         for (int i = 0; i < iters; i++) {
-            final String type;
-            NodesHotThreadsRequestBuilder nodesHotThreadsRequestBuilder = clusterAdmin().prepareNodesHotThreads();
+            final NodesHotThreadsRequest request = new NodesHotThreadsRequest();
             if (randomBoolean()) {
                 TimeValue timeValue = new TimeValue(rarely() ? randomIntBetween(500, 5000) : randomIntBetween(20, 500));
-                nodesHotThreadsRequestBuilder.setInterval(timeValue);
+                request.interval(timeValue);
             }
             if (randomBoolean()) {
-                nodesHotThreadsRequestBuilder.setThreads(rarely() ? randomIntBetween(500, 5000) : randomIntBetween(1, 500));
+                request.threads(rarely() ? randomIntBetween(500, 5000) : randomIntBetween(1, 500));
             }
-            nodesHotThreadsRequestBuilder.setIgnoreIdleThreads(randomBoolean());
+            request.ignoreIdleThreads(randomBoolean());
             if (randomBoolean()) {
-                type = switch (randomIntBetween(0, 3)) {
-                    case 3 -> "mem";
-                    case 2 -> "cpu";
-                    case 1 -> "wait";
-                    default -> "block";
-                };
-                assertThat(type, notNullValue());
-                nodesHotThreadsRequestBuilder.setType(HotThreads.ReportType.of(type));
-            } else {
-                type = null;
+                request.type(HotThreads.ReportType.of(randomFrom("block", "mem", "cpu", "wait")));
             }
             final CountDownLatch latch = new CountDownLatch(1);
-            nodesHotThreadsRequestBuilder.execute(new ActionListener<NodesHotThreadsResponse>() {
+            client().execute(TransportNodesHotThreadsAction.TYPE, request, new ActionListener<>() {
                 @Override
                 public void onResponse(NodesHotThreadsResponse nodeHotThreads) {
                     boolean success = false;
@@ -83,7 +72,6 @@ public class HotThreadsIT extends ESIntegTestCase {
                         assertThat(nodesMap.size(), equalTo(cluster().size()));
                         for (NodeHotThreads ht : nodeHotThreads.getNodes()) {
                             assertNotNull(ht.getHotThreads());
-                            // logger.info(ht.getHotThreads());
                         }
                         success = true;
                     } finally {
@@ -120,40 +108,39 @@ public class HotThreadsIT extends ESIntegTestCase {
                     3L
                 );
             }
-            latch.await();
+            safeAwait(latch);
             assertThat(hasErrors.get(), is(false));
         }
     }
 
-    public void testIgnoreIdleThreads() throws ExecutionException, InterruptedException {
+    public void testIgnoreIdleThreads() {
         assumeTrue("no support for hot_threads on FreeBSD", Constants.FREE_BSD == false);
 
         // First time, don't ignore idle threads:
-        NodesHotThreadsRequestBuilder builder = clusterAdmin().prepareNodesHotThreads();
-        builder.setIgnoreIdleThreads(false);
-        builder.setThreads(Integer.MAX_VALUE);
-        NodesHotThreadsResponse response = builder.execute().get();
+        final NodesHotThreadsResponse firstResponse = client().execute(
+            TransportNodesHotThreadsAction.TYPE,
+            new NodesHotThreadsRequest().ignoreIdleThreads(false).threads(Integer.MAX_VALUE)
+        ).actionGet(10, TimeUnit.SECONDS);
 
         final Matcher<String> containsCachedTimeThreadRunMethod = containsString(
             "org.elasticsearch.threadpool.ThreadPool$CachedTimeThread.run"
         );
 
         int totSizeAll = 0;
-        for (NodeHotThreads node : response.getNodesMap().values()) {
+        for (NodeHotThreads node : firstResponse.getNodesMap().values()) {
             totSizeAll += node.getHotThreads().length();
             assertThat(node.getHotThreads(), containsCachedTimeThreadRunMethod);
         }
 
         // Second time, do ignore idle threads:
-        builder = clusterAdmin().prepareNodesHotThreads();
-        builder.setThreads(Integer.MAX_VALUE);
-
+        final var request = new NodesHotThreadsRequest().threads(Integer.MAX_VALUE);
         // Make sure default is true:
-        assertEquals(true, builder.request().ignoreIdleThreads());
-        response = builder.execute().get();
+        assertTrue(request.ignoreIdleThreads());
+        final NodesHotThreadsResponse secondResponse = client().execute(TransportNodesHotThreadsAction.TYPE, request)
+            .actionGet(10, TimeUnit.SECONDS);
 
         int totSizeIgnoreIdle = 0;
-        for (NodeHotThreads node : response.getNodesMap().values()) {
+        for (NodeHotThreads node : secondResponse.getNodesMap().values()) {
             totSizeIgnoreIdle += node.getHotThreads().length();
             assertThat(node.getHotThreads(), not(containsCachedTimeThreadRunMethod));
         }
@@ -162,22 +149,26 @@ public class HotThreadsIT extends ESIntegTestCase {
         assertThat(totSizeIgnoreIdle, lessThan(totSizeAll));
     }
 
-    public void testTimestampAndParams() throws ExecutionException, InterruptedException {
+    public void testTimestampAndParams() {
 
-        NodesHotThreadsResponse response = clusterAdmin().prepareNodesHotThreads().execute().get();
+        final NodesHotThreadsResponse response = client().execute(TransportNodesHotThreadsAction.TYPE, new NodesHotThreadsRequest())
+            .actionGet(10, TimeUnit.SECONDS);
 
         if (Constants.FREE_BSD) {
             for (NodeHotThreads node : response.getNodesMap().values()) {
-                String result = node.getHotThreads();
-                assertTrue(result.indexOf("hot_threads is not supported") != -1);
+                assertThat(node.getHotThreads(), containsString("hot_threads is not supported"));
             }
         } else {
             for (NodeHotThreads node : response.getNodesMap().values()) {
-                String result = node.getHotThreads();
-                assertTrue(result.indexOf("Hot threads at") != -1);
-                assertTrue(result.indexOf("interval=500ms") != -1);
-                assertTrue(result.indexOf("busiestThreads=3") != -1);
-                assertTrue(result.indexOf("ignoreIdleThreads=true") != -1);
+                assertThat(
+                    node.getHotThreads(),
+                    allOf(
+                        containsString("Hot threads at"),
+                        containsString("interval=500ms"),
+                        containsString("busiestThreads=3"),
+                        containsString("ignoreIdleThreads=true")
+                    )
+                );
             }
         }
     }

+ 10 - 15
server/src/internalClusterTest/java/org/elasticsearch/indices/recovery/IndexPrimaryRelocationIT.java

@@ -8,22 +8,23 @@
 
 package org.elasticsearch.indices.recovery;
 
+import org.apache.logging.log4j.Level;
 import org.elasticsearch.action.DocWriteResponse;
 import org.elasticsearch.action.admin.cluster.health.ClusterHealthResponse;
-import org.elasticsearch.action.admin.cluster.node.hotthreads.NodeHotThreads;
 import org.elasticsearch.action.delete.DeleteResponse;
 import org.elasticsearch.cluster.ClusterState;
 import org.elasticsearch.cluster.node.DiscoveryNode;
 import org.elasticsearch.cluster.routing.allocation.command.MoveAllocationCommand;
 import org.elasticsearch.common.Priority;
+import org.elasticsearch.common.ReferenceDocs;
 import org.elasticsearch.core.TimeValue;
 import org.elasticsearch.index.query.QueryBuilders;
+import org.elasticsearch.monitor.jvm.HotThreads;
 import org.elasticsearch.test.ESIntegTestCase;
 import org.elasticsearch.test.hamcrest.ElasticsearchAssertions;
 
 import java.util.concurrent.atomic.AtomicBoolean;
 import java.util.concurrent.atomic.AtomicInteger;
-import java.util.stream.Collectors;
 
 @ESIntegTestCase.ClusterScope(scope = ESIntegTestCase.Scope.TEST)
 public class IndexPrimaryRelocationIT extends ESIntegTestCase {
@@ -71,20 +72,14 @@ public class IndexPrimaryRelocationIT extends ESIntegTestCase {
                 .setWaitForNoRelocatingShards(true)
                 .get();
             if (clusterHealthResponse.isTimedOut()) {
-                final String hotThreads = clusterAdmin().prepareNodesHotThreads()
-                    .setIgnoreIdleThreads(false)
-                    .get()
-                    .getNodes()
-                    .stream()
-                    .map(NodeHotThreads::getHotThreads)
-                    .collect(Collectors.joining("\n"));
-                final ClusterState clusterState = clusterAdmin().prepareState().get().getState();
-                logger.info(
-                    "timed out for waiting for relocation iteration [{}] \ncluster state {} \nhot threads {}",
-                    i,
-                    clusterState,
-                    hotThreads
+                HotThreads.logLocalHotThreads(
+                    logger,
+                    Level.INFO,
+                    "timed out waiting for relocation iteration [" + i + "]",
+                    ReferenceDocs.LOGGING
                 );
+                final ClusterState clusterState = clusterAdmin().prepareState().get().getState();
+                logger.info("timed out for waiting for relocation iteration [{}] \ncluster state {}", i, clusterState);
                 finished.set(true);
                 indexingThread.join();
                 throw new AssertionError("timed out waiting for relocation iteration [" + i + "] ");

+ 8 - 11
server/src/internalClusterTest/java/org/elasticsearch/snapshots/SnapshotStressTestsIT.java

@@ -8,6 +8,7 @@
 
 package org.elasticsearch.snapshots;
 
+import org.apache.logging.log4j.Level;
 import org.apache.logging.log4j.LogManager;
 import org.apache.logging.log4j.Logger;
 import org.apache.lucene.tests.util.LuceneTestCase;
@@ -16,7 +17,6 @@ import org.elasticsearch.action.ActionListener;
 import org.elasticsearch.action.ShardOperationFailedException;
 import org.elasticsearch.action.admin.cluster.health.ClusterHealthRequestBuilder;
 import org.elasticsearch.action.admin.cluster.health.ClusterHealthResponse;
-import org.elasticsearch.action.admin.cluster.node.hotthreads.NodeHotThreads;
 import org.elasticsearch.action.admin.cluster.snapshots.create.CreateSnapshotRequestBuilder;
 import org.elasticsearch.action.admin.cluster.snapshots.delete.DeleteSnapshotRequestBuilder;
 import org.elasticsearch.action.admin.cluster.snapshots.restore.RestoreSnapshotRequestBuilder;
@@ -34,6 +34,7 @@ import org.elasticsearch.cluster.node.DiscoveryNodes;
 import org.elasticsearch.cluster.service.ClusterService;
 import org.elasticsearch.common.Priority;
 import org.elasticsearch.common.Randomness;
+import org.elasticsearch.common.ReferenceDocs;
 import org.elasticsearch.common.Strings;
 import org.elasticsearch.common.settings.Settings;
 import org.elasticsearch.common.util.concurrent.AbstractRunnable;
@@ -46,6 +47,7 @@ import org.elasticsearch.core.Releasable;
 import org.elasticsearch.core.Releasables;
 import org.elasticsearch.core.TimeValue;
 import org.elasticsearch.core.Tuple;
+import org.elasticsearch.monitor.jvm.HotThreads;
 import org.elasticsearch.repositories.RepositoryCleanupResult;
 import org.elasticsearch.repositories.fs.FsRepository;
 import org.elasticsearch.test.InternalTestCluster;
@@ -371,16 +373,11 @@ public class SnapshotStressTestsIT extends AbstractSnapshotIntegTestCase {
                             "--> current cluster state:\n{}",
                             Strings.toString(clusterAdmin().prepareState().get().getState(), true, true)
                         );
-                        logger.info(
-                            "--> hot threads:\n{}",
-                            clusterAdmin().prepareNodesHotThreads()
-                                .setThreads(99999)
-                                .setIgnoreIdleThreads(false)
-                                .get()
-                                .getNodes()
-                                .stream()
-                                .map(NodeHotThreads::getHotThreads)
-                                .collect(Collectors.joining("\n"))
+                        HotThreads.logLocalHotThreads(
+                            logger,
+                            Level.INFO,
+                            "hot threads while failing to acquire permit [" + label + "]",
+                            ReferenceDocs.LOGGING
                         );
                         failedPermitAcquisitions.add(label);
                     }

+ 0 - 49
server/src/main/java/org/elasticsearch/action/admin/cluster/node/hotthreads/NodesHotThreadsRequestBuilder.java

@@ -1,49 +0,0 @@
-/*
- * Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
- * or more contributor license agreements. Licensed under the Elastic License
- * 2.0 and the Server Side Public License, v 1; you may not use this file except
- * in compliance with, at your election, the Elastic License 2.0 or the Server
- * Side Public License, v 1.
- */
-
-package org.elasticsearch.action.admin.cluster.node.hotthreads;
-
-import org.elasticsearch.action.support.nodes.NodesOperationRequestBuilder;
-import org.elasticsearch.client.internal.ElasticsearchClient;
-import org.elasticsearch.core.TimeValue;
-import org.elasticsearch.monitor.jvm.HotThreads;
-
-public class NodesHotThreadsRequestBuilder extends NodesOperationRequestBuilder<
-    NodesHotThreadsRequest,
-    NodesHotThreadsResponse,
-    NodesHotThreadsRequestBuilder> {
-
-    public NodesHotThreadsRequestBuilder(ElasticsearchClient client) {
-        super(client, TransportNodesHotThreadsAction.TYPE, new NodesHotThreadsRequest());
-    }
-
-    public NodesHotThreadsRequestBuilder setThreads(int threads) {
-        request.threads(threads);
-        return this;
-    }
-
-    public NodesHotThreadsRequestBuilder setIgnoreIdleThreads(boolean ignoreIdleThreads) {
-        request.ignoreIdleThreads(ignoreIdleThreads);
-        return this;
-    }
-
-    public NodesHotThreadsRequestBuilder setType(HotThreads.ReportType type) {
-        request.type(type);
-        return this;
-    }
-
-    public NodesHotThreadsRequestBuilder setSortOrder(HotThreads.SortOrder sortOrder) {
-        request.sortOrder(sortOrder);
-        return this;
-    }
-
-    public NodesHotThreadsRequestBuilder setInterval(TimeValue interval) {
-        request.interval(interval);
-        return this;
-    }
-}

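With the request builder deleted above and the ClusterAdminClient convenience methods removed in the next two files, callers that still want the over-the-wire API (e.g. HotThreadsIT, ESLoggingHandlerIT, RestNodesHotThreadsAction) build a NodesHotThreadsRequest and execute TransportNodesHotThreadsAction.TYPE directly. A hedged sketch of that shape, assuming an internal Client is in scope (the wrapper class and method names are illustrative):

    import java.util.concurrent.TimeUnit;

    import org.elasticsearch.action.admin.cluster.node.hotthreads.NodesHotThreadsRequest;
    import org.elasticsearch.action.admin.cluster.node.hotthreads.NodesHotThreadsResponse;
    import org.elasticsearch.action.admin.cluster.node.hotthreads.TransportNodesHotThreadsAction;
    import org.elasticsearch.client.internal.Client;

    // Illustrative wrapper; not part of the commit.
    class DirectHotThreadsCallSketch {
        static NodesHotThreadsResponse fetchHotThreads(Client client) {
            // Configure the request with its own setters instead of the removed builder's.
            NodesHotThreadsRequest request = new NodesHotThreadsRequest()
                .ignoreIdleThreads(false)
                .threads(Integer.MAX_VALUE);
            // Execute the transport action type directly and bound the wait, as the tests now do.
            return client.execute(TransportNodesHotThreadsAction.TYPE, request).actionGet(10, TimeUnit.SECONDS);
        }
    }
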
+ 0 - 15
server/src/main/java/org/elasticsearch/client/internal/ClusterAdminClient.java

@@ -16,9 +16,6 @@ import org.elasticsearch.action.admin.cluster.allocation.ClusterAllocationExplai
 import org.elasticsearch.action.admin.cluster.health.ClusterHealthRequest;
 import org.elasticsearch.action.admin.cluster.health.ClusterHealthRequestBuilder;
 import org.elasticsearch.action.admin.cluster.health.ClusterHealthResponse;
-import org.elasticsearch.action.admin.cluster.node.hotthreads.NodesHotThreadsRequest;
-import org.elasticsearch.action.admin.cluster.node.hotthreads.NodesHotThreadsRequestBuilder;
-import org.elasticsearch.action.admin.cluster.node.hotthreads.NodesHotThreadsResponse;
 import org.elasticsearch.action.admin.cluster.node.info.NodesInfoRequest;
 import org.elasticsearch.action.admin.cluster.node.info.NodesInfoRequestBuilder;
 import org.elasticsearch.action.admin.cluster.node.info.NodesInfoResponse;
@@ -256,18 +253,6 @@ public interface ClusterAdminClient extends ElasticsearchClient {
      */
     void nodesUsage(NodesUsageRequest request, ActionListener<NodesUsageResponse> listener);
 
-    /**
-     * Returns top N hot-threads samples per node. The hot-threads are only sampled
-     * for the node ids specified in the request.
-     */
-    void nodesHotThreads(NodesHotThreadsRequest request, ActionListener<NodesHotThreadsResponse> listener);
-
-    /**
-     * Returns a request builder to fetch top N hot-threads samples per node. The hot-threads are only sampled
-     * for the node ids provided. Note: Use {@code *} to fetch samples for all nodes
-     */
-    NodesHotThreadsRequestBuilder prepareNodesHotThreads(String... nodesIds);
-
     /**
      * List tasks
      *

+ 0 - 14
server/src/main/java/org/elasticsearch/client/internal/support/AbstractClient.java

@@ -24,10 +24,6 @@ import org.elasticsearch.action.admin.cluster.health.ClusterHealthRequest;
 import org.elasticsearch.action.admin.cluster.health.ClusterHealthRequestBuilder;
 import org.elasticsearch.action.admin.cluster.health.ClusterHealthResponse;
 import org.elasticsearch.action.admin.cluster.health.TransportClusterHealthAction;
-import org.elasticsearch.action.admin.cluster.node.hotthreads.NodesHotThreadsRequest;
-import org.elasticsearch.action.admin.cluster.node.hotthreads.NodesHotThreadsRequestBuilder;
-import org.elasticsearch.action.admin.cluster.node.hotthreads.NodesHotThreadsResponse;
-import org.elasticsearch.action.admin.cluster.node.hotthreads.TransportNodesHotThreadsAction;
 import org.elasticsearch.action.admin.cluster.node.info.NodesInfoRequest;
 import org.elasticsearch.action.admin.cluster.node.info.NodesInfoRequestBuilder;
 import org.elasticsearch.action.admin.cluster.node.info.NodesInfoResponse;
@@ -795,16 +791,6 @@ public abstract class AbstractClient implements Client {
             return new ClusterStatsRequestBuilder(this);
         }
 
-        @Override
-        public void nodesHotThreads(NodesHotThreadsRequest request, ActionListener<NodesHotThreadsResponse> listener) {
-            execute(TransportNodesHotThreadsAction.TYPE, request, listener);
-        }
-
-        @Override
-        public NodesHotThreadsRequestBuilder prepareNodesHotThreads(String... nodesIds) {
-            return new NodesHotThreadsRequestBuilder(this).setNodesIds(nodesIds);
-        }
-
         @Override
         public ActionFuture<ListTasksResponse> listTasks(final ListTasksRequest request) {
             return execute(TransportListTasksAction.TYPE, request);

+ 2 - 1
server/src/main/java/org/elasticsearch/rest/action/admin/cluster/RestNodesHotThreadsAction.java

@@ -10,6 +10,7 @@ package org.elasticsearch.rest.action.admin.cluster;
 
 import org.elasticsearch.action.admin.cluster.node.hotthreads.NodesHotThreadsRequest;
 import org.elasticsearch.action.admin.cluster.node.hotthreads.NodesHotThreadsResponse;
+import org.elasticsearch.action.admin.cluster.node.hotthreads.TransportNodesHotThreadsAction;
 import org.elasticsearch.client.internal.node.NodeClient;
 import org.elasticsearch.common.Strings;
 import org.elasticsearch.core.RestApiVersion;
@@ -112,7 +113,7 @@ public class RestNodesHotThreadsAction extends BaseRestHandler {
         nodesHotThreadsRequest.interval(TimeValue.parseTimeValue(request.param("interval"), nodesHotThreadsRequest.interval(), "interval"));
         nodesHotThreadsRequest.snapshots(request.paramAsInt("snapshots", nodesHotThreadsRequest.snapshots()));
         nodesHotThreadsRequest.timeout(request.param("timeout"));
-        return channel -> client.admin().cluster().nodesHotThreads(nodesHotThreadsRequest, new RestResponseListener<>(channel) {
+        return channel -> client.execute(TransportNodesHotThreadsAction.TYPE, nodesHotThreadsRequest, new RestResponseListener<>(channel) {
             @Override
             public RestResponse buildResponse(NodesHotThreadsResponse response) {
                 return RestResponse.chunked(RestStatus.OK, fromTextChunks(TEXT_CONTENT_TYPE, response.getTextChunks(), null));

+ 12 - 11
test/framework/src/main/java/org/elasticsearch/test/ESIntegTestCase.java

@@ -26,8 +26,6 @@ import org.elasticsearch.action.DocWriteResponse;
 import org.elasticsearch.action.admin.cluster.allocation.ClusterAllocationExplainResponse;
 import org.elasticsearch.action.admin.cluster.health.ClusterHealthRequest;
 import org.elasticsearch.action.admin.cluster.health.ClusterHealthResponse;
-import org.elasticsearch.action.admin.cluster.node.hotthreads.NodeHotThreads;
-import org.elasticsearch.action.admin.cluster.node.hotthreads.NodesHotThreadsResponse;
 import org.elasticsearch.action.admin.cluster.node.info.NodeInfo;
 import org.elasticsearch.action.admin.cluster.node.info.NodesInfoResponse;
 import org.elasticsearch.action.admin.cluster.state.ClusterStateResponse;
@@ -119,6 +117,7 @@ import org.elasticsearch.index.translog.Translog;
 import org.elasticsearch.indices.IndicesQueryCache;
 import org.elasticsearch.indices.IndicesRequestCache;
 import org.elasticsearch.indices.store.IndicesStore;
+import org.elasticsearch.monitor.jvm.HotThreads;
 import org.elasticsearch.node.NodeMocksPlugin;
 import org.elasticsearch.persistent.PersistentTasksCustomMetadata;
 import org.elasticsearch.plugins.NetworkPlugin;
@@ -153,6 +152,7 @@ import org.junit.Before;
 import org.junit.BeforeClass;
 
 import java.io.IOException;
+import java.io.StringWriter;
 import java.lang.annotation.Annotation;
 import java.lang.annotation.ElementType;
 import java.lang.annotation.Inherited;
@@ -971,17 +971,21 @@ public abstract class ESIntegTestCase extends ESTestCase {
             final var allocationExplainRef = new AtomicReference<ClusterAllocationExplainResponse>();
             final var clusterStateRef = new AtomicReference<ClusterStateResponse>();
             final var pendingTasksRef = new AtomicReference<PendingClusterTasksResponse>();
-            final var hotThreadsRef = new AtomicReference<NodesHotThreadsResponse>();
+            final var hotThreadsRef = new AtomicReference<String>();
 
             final var detailsFuture = new PlainActionFuture<Void>();
             try (var listeners = new RefCountingListener(detailsFuture)) {
                 clusterAdmin().prepareAllocationExplain().execute(listeners.acquire(allocationExplainRef::set));
                 clusterAdmin().prepareState().execute(listeners.acquire(clusterStateRef::set));
                 clusterAdmin().preparePendingClusterTasks().execute(listeners.acquire(pendingTasksRef::set));
-                clusterAdmin().prepareNodesHotThreads()
-                    .setThreads(9999)
-                    .setIgnoreIdleThreads(false)
-                    .execute(listeners.acquire(hotThreadsRef::set));
+
+                try (var writer = new StringWriter()) {
+                    new HotThreads().busiestThreads(9999).ignoreIdleThreads(false).detect(writer);
+                    hotThreadsRef.set(writer.toString());
+                } catch (Exception e) {
+                    logger.error("exception capturing hot threads", e);
+                    hotThreadsRef.set("exception capturing hot threads: " + e);
+                }
             }
 
             try {
@@ -996,10 +1000,7 @@ public abstract class ESIntegTestCase extends ESTestCase {
                 safeFormat(allocationExplainRef.get(), r -> Strings.toString(r.getExplanation(), true, true)),
                 safeFormat(clusterStateRef.get(), r -> r.getState().toString()),
                 safeFormat(pendingTasksRef.get(), r -> Strings.toString(r, true, true)),
-                safeFormat(
-                    hotThreadsRef.get(),
-                    r -> r.getNodes().stream().map(NodeHotThreads::getHotThreads).collect(Collectors.joining("\n"))
-                )
+                hotThreadsRef.get()
             );
             fail("timed out waiting for " + color + " state");
         }

+ 4 - 20
x-pack/plugin/ccr/src/test/java/org/elasticsearch/xpack/CcrIntegTestCase.java

@@ -7,10 +7,10 @@
 
 package org.elasticsearch.xpack;
 
+import org.apache.logging.log4j.Level;
 import org.apache.lucene.store.AlreadyClosedException;
 import org.elasticsearch.action.admin.cluster.health.ClusterHealthRequest;
 import org.elasticsearch.action.admin.cluster.health.ClusterHealthResponse;
-import org.elasticsearch.action.admin.cluster.node.hotthreads.NodeHotThreads;
 import org.elasticsearch.action.admin.cluster.node.tasks.list.ListTasksRequest;
 import org.elasticsearch.action.admin.cluster.node.tasks.list.ListTasksResponse;
 import org.elasticsearch.action.admin.cluster.snapshots.restore.RestoreSnapshotRequest;
@@ -38,6 +38,7 @@ import org.elasticsearch.cluster.routing.allocation.DiskThresholdSettings;
 import org.elasticsearch.cluster.service.ClusterService;
 import org.elasticsearch.common.Priority;
 import org.elasticsearch.common.Randomness;
+import org.elasticsearch.common.ReferenceDocs;
 import org.elasticsearch.common.Strings;
 import org.elasticsearch.common.bytes.BytesReference;
 import org.elasticsearch.common.network.NetworkModule;
@@ -59,6 +60,7 @@ import org.elasticsearch.indices.IndicesService;
 import org.elasticsearch.indices.store.IndicesStore;
 import org.elasticsearch.license.LicenseSettings;
 import org.elasticsearch.license.LicensesMetadata;
+import org.elasticsearch.monitor.jvm.HotThreads;
 import org.elasticsearch.persistent.PersistentTasksCustomMetadata;
 import org.elasticsearch.plugins.Plugin;
 import org.elasticsearch.search.SearchResponseUtils;
@@ -421,24 +423,19 @@ public abstract class CcrIntegTestCase extends ESTestCase {
                     {} timed out:
                     leader cluster state:
                     {}
-                    leader cluster hot threads:
-                    {}
                     leader cluster tasks:
                     {}
                     follower cluster state:
                     {}
-                    follower cluster hot threads:
-                    {}
                     follower cluster tasks:
                     {}""",
                 method,
                 leaderClient().admin().cluster().prepareState().get().getState(),
-                getHotThreads(leaderClient()),
                 leaderClient().admin().cluster().preparePendingClusterTasks().get(),
                 followerClient().admin().cluster().prepareState().get().getState(),
-                getHotThreads(followerClient()),
                 followerClient().admin().cluster().preparePendingClusterTasks().get()
             );
+            HotThreads.logLocalHotThreads(logger, Level.INFO, "hot threads at timeout", ReferenceDocs.LOGGING);
             fail("timed out waiting for " + color + " state");
         }
         assertThat(
@@ -450,19 +447,6 @@ public abstract class CcrIntegTestCase extends ESTestCase {
         return actionGet.getStatus();
     }
 
-    static String getHotThreads(Client client) {
-        return client.admin()
-            .cluster()
-            .prepareNodesHotThreads()
-            .setThreads(99999)
-            .setIgnoreIdleThreads(false)
-            .get()
-            .getNodes()
-            .stream()
-            .map(NodeHotThreads::getHotThreads)
-            .collect(Collectors.joining("\n"));
-    }
-
     protected final Index resolveLeaderIndex(String index) {
         GetIndexResponse getIndexResponse = leaderClient().admin().indices().prepareGetIndex().setIndices(index).get();
         assertTrue("index " + index + " not found", getIndexResponse.getSettings().containsKey(index));

+ 6 - 17
x-pack/plugin/ml/qa/native-multi-node-tests/src/javaRestTest/java/org/elasticsearch/xpack/ml/integration/DatafeedJobsIT.java

@@ -6,19 +6,20 @@
  */
 package org.elasticsearch.xpack.ml.integration;
 
+import org.apache.logging.log4j.Level;
 import org.elasticsearch.ElasticsearchException;
 import org.elasticsearch.ElasticsearchStatusException;
 import org.elasticsearch.ResourceNotFoundException;
-import org.elasticsearch.action.admin.cluster.node.hotthreads.NodeHotThreads;
-import org.elasticsearch.action.admin.cluster.node.hotthreads.NodesHotThreadsResponse;
 import org.elasticsearch.action.search.SearchRequest;
 import org.elasticsearch.action.support.master.AcknowledgedResponse;
+import org.elasticsearch.common.ReferenceDocs;
 import org.elasticsearch.common.Strings;
 import org.elasticsearch.common.settings.Settings;
 import org.elasticsearch.common.util.concurrent.ConcurrentCollections;
 import org.elasticsearch.core.CheckedRunnable;
 import org.elasticsearch.core.TimeValue;
 import org.elasticsearch.index.query.QueryBuilders;
+import org.elasticsearch.monitor.jvm.HotThreads;
 import org.elasticsearch.rest.RestStatus;
 import org.elasticsearch.search.SearchHits;
 import org.elasticsearch.search.aggregations.AggregationBuilders;
@@ -468,11 +469,7 @@ public class DatafeedJobsIT extends MlNativeAutodetectIntegTestCase {
             StopDatafeedAction.Response stopJobResponse = stopDatafeed(datafeedId);
             assertTrue(stopJobResponse.isStopped());
         } catch (Exception e) {
-            NodesHotThreadsResponse nodesHotThreadsResponse = clusterAdmin().prepareNodesHotThreads().get();
-            int i = 0;
-            for (NodeHotThreads nodeHotThreads : nodesHotThreadsResponse.getNodes()) {
-                logger.info(i++ + ":\n" + nodeHotThreads.getHotThreads());
-            }
+            HotThreads.logLocalHotThreads(logger, Level.INFO, "hot threads at failure", ReferenceDocs.LOGGING);
             throw e;
         }
         assertBusy(() -> {
@@ -491,11 +488,7 @@ public class DatafeedJobsIT extends MlNativeAutodetectIntegTestCase {
             CloseJobAction.Response closeJobResponse = closeJob(jobId);
             assertTrue(closeJobResponse.isClosed());
         } catch (Exception e) {
-            NodesHotThreadsResponse nodesHotThreadsResponse = clusterAdmin().prepareNodesHotThreads().get();
-            int i = 0;
-            for (NodeHotThreads nodeHotThreads : nodesHotThreadsResponse.getNodes()) {
-                logger.info(i++ + ":\n" + nodeHotThreads.getHotThreads());
-            }
+            HotThreads.logLocalHotThreads(logger, Level.INFO, "hot threads at failure", ReferenceDocs.LOGGING);
             throw e;
         }
         assertBusy(() -> {
@@ -538,11 +531,7 @@ public class DatafeedJobsIT extends MlNativeAutodetectIntegTestCase {
             CloseJobAction.Response closeJobResponse = closeJob(jobId, useForce);
             assertTrue(closeJobResponse.isClosed());
         } catch (Exception e) {
-            NodesHotThreadsResponse nodesHotThreadsResponse = clusterAdmin().prepareNodesHotThreads().get();
-            int i = 0;
-            for (NodeHotThreads nodeHotThreads : nodesHotThreadsResponse.getNodes()) {
-                logger.info(i++ + ":\n" + nodeHotThreads.getHotThreads());
-            }
+            HotThreads.logLocalHotThreads(logger, Level.INFO, "hot threads at failure", ReferenceDocs.LOGGING);
             throw e;
         }
         GetDatafeedsStatsAction.Request request = new GetDatafeedsStatsAction.Request(datafeedId);