Browse Source

Add `thread_pool` information to the cluster info endpoint (#96407)

Add a new target (thread_pool) to the /_info API. It consolidates all the thread pools information from the cluster nodes and returns a summary at the cluster level (compared with _nodes/stats/thread_pool it lacks the <node> dimension)
Pablo Alcantar Morales 2 years ago
parent
commit
f5d47ccb0d

+ 6 - 0
docs/changelog/96407.yaml

@@ -0,0 +1,6 @@
+pr: 96407
+summary: Add `thread_pool` information to the cluster info endpoint
+area: Stats
+type: enhancement
+issues:
+ - 95393

+ 46 - 0
docs/reference/cluster/cluster-info.asciidoc

@@ -41,6 +41,9 @@ HTTP connection information.
 
 `ingest`::
 Ingest information.
+
+`thread_pool`::
+Statistics about each thread pool, including current size, queue size and rejected tasks.
 --
 
 [role="child_attributes"]
@@ -238,6 +241,46 @@ Number of failed operations for the processor.
 ======
 
 
+[[cluster-info-api-response-body-threadpool]]
+`thread_pool`::
+(object)
+Contains information about the thread pools of the cluster.
++
+.Properties of `thread_pool`
+[%collapsible%open]
+======
+`<thread_pool_name>`::
+(object)
+Contains information about the thread pool of the cluster with name `<thread_pool_name>`.
++
+.Properties of `<thread_pool_name>`
+[%collapsible%open]
+=======
+`threads`::
+(integer)
+Number of threads in the thread pool.
+
+`queue`::
+(integer)
+Number of tasks in queue for the thread pool.
+
+`active`::
+(integer)
+Number of active threads in the thread pool.
+
+`rejected`::
+(integer)
+Number of tasks rejected by the thread pool executor.
+
+`largest`::
+(integer)
+Highest number of active threads in the thread pool.
+
+`completed`::
+(integer)
+Number of tasks completed by the thread pool executor.
+=======
+======
 
 [[cluster-info-api-example]]
 ==== {api-examples-title}
@@ -253,6 +296,9 @@ GET /_info/http
 # returns the http info of the cluster
 GET /_info/ingest
 
+# returns the thread_pool info of the cluster
+GET /_info/thread_pool
+
 # returns the http and ingest info of the cluster
 GET /_info/http,ingest
 ----

+ 3 - 1
rest-api-spec/src/main/resources/rest-api-spec/api/cluster.info.json

@@ -21,7 +21,9 @@
                           "type":"list",
                           "options":[
                               "_all",
-                              "http"
+                              "http",
+                              "ingest",
+                              "thread_pool"
                           ],
                           "description":"Limit the information returned to the specified target."
                       }

+ 1 - 0
rest-api-spec/src/yamlRestTest/resources/rest-api-spec/test/cluster.info/10_info_all.yml

@@ -14,6 +14,7 @@ setup:
   - is_true: cluster_name
   - is_true: http
   - is_true: ingest
+  - is_true: thread_pool
 
 ---
 "Cluster Info fails when mixing _all with other targets":

+ 159 - 0
rest-api-spec/src/yamlRestTest/resources/rest-api-spec/test/cluster.info/30_info_thread_pool.yml

@@ -0,0 +1,159 @@
+---
+"Cluster HTTP Info":
+  - skip:
+      version: " - 8.8.99"
+      reason: "/_info/thread_pool only available from v8.9"
+
+  - do:
+      cluster.info:
+        target: [ thread_pool ]
+
+  - is_true: cluster_name
+  - is_true: thread_pool
+
+  - gte: { thread_pool.analyze.threads: 0 }
+  - gte: { thread_pool.analyze.queue: 0 }
+  - gte: { thread_pool.analyze.active: 0 }
+  - gte: { thread_pool.analyze.rejected: 0 }
+  - gte: { thread_pool.analyze.largest: 0 }
+  - gte: { thread_pool.analyze.completed: 0 }
+
+  - gte: { thread_pool.auto_complete.threads: 0 }
+  - gte: { thread_pool.auto_complete.queue: 0 }
+  - gte: { thread_pool.auto_complete.active: 0 }
+  - gte: { thread_pool.auto_complete.rejected: 0 }
+  - gte: { thread_pool.auto_complete.largest: 0 }
+  - gte: { thread_pool.auto_complete.completed: 0 }
+
+  - gte: { thread_pool.cluster_coordination.threads: 0 }
+  - gte: { thread_pool.cluster_coordination.queue: 0 }
+  - gte: { thread_pool.cluster_coordination.active: 0 }
+  - gte: { thread_pool.cluster_coordination.rejected: 0 }
+  - gte: { thread_pool.cluster_coordination.largest: 0 }
+  - gte: { thread_pool.cluster_coordination.completed: 0 }
+
+  - gte: { thread_pool.fetch_shard_store.threads: 0 }
+  - gte: { thread_pool.fetch_shard_store.queue: 0 }
+  - gte: { thread_pool.fetch_shard_store.active: 0 }
+  - gte: { thread_pool.fetch_shard_store.rejected: 0 }
+  - gte: { thread_pool.fetch_shard_store.largest: 0 }
+  - gte: { thread_pool.fetch_shard_store.completed: 0 }
+
+  - gte: { thread_pool.flush.threads: 0 }
+  - gte: { thread_pool.flush.queue: 0 }
+  - gte: { thread_pool.flush.active: 0 }
+  - gte: { thread_pool.flush.rejected: 0 }
+  - gte: { thread_pool.flush.largest: 0 }
+  - gte: { thread_pool.flush.completed: 0 }
+
+  - gte: { thread_pool.force_merge.threads: 0 }
+  - gte: { thread_pool.force_merge.queue: 0 }
+  - gte: { thread_pool.force_merge.active: 0 }
+  - gte: { thread_pool.force_merge.rejected: 0 }
+  - gte: { thread_pool.force_merge.largest: 0 }
+  - gte: { thread_pool.force_merge.completed: 0 }
+
+  - gte: { thread_pool.generic.threads: 0 }
+  - gte: { thread_pool.generic.queue: 0 }
+  - gte: { thread_pool.generic.active: 0 }
+  - gte: { thread_pool.generic.rejected: 0 }
+  - gte: { thread_pool.generic.largest: 0 }
+  - gte: { thread_pool.generic.completed: 0 }
+
+  - gte: { thread_pool.get.threads: 0 }
+  - gte: { thread_pool.get.queue: 0 }
+  - gte: { thread_pool.get.active: 0 }
+  - gte: { thread_pool.get.rejected: 0 }
+  - gte: { thread_pool.get.largest: 0 }
+  - gte: { thread_pool.get.completed: 0 }
+
+  - gte: { thread_pool.management.threads: 0 }
+  - gte: { thread_pool.management.queue: 0 }
+  - gte: { thread_pool.management.active: 0 }
+  - gte: { thread_pool.management.rejected: 0 }
+  - gte: { thread_pool.management.largest: 0 }
+  - gte: { thread_pool.management.completed: 0 }
+
+  - gte: { thread_pool.refresh.threads: 0 }
+  - gte: { thread_pool.refresh.queue: 0 }
+  - gte: { thread_pool.refresh.active: 0 }
+  - gte: { thread_pool.refresh.rejected: 0 }
+  - gte: { thread_pool.refresh.largest: 0 }
+  - gte: { thread_pool.refresh.completed: 0 }
+
+  - gte: { thread_pool.search.threads: 0 }
+  - gte: { thread_pool.search.queue: 0 }
+  - gte: { thread_pool.search.active: 0 }
+  - gte: { thread_pool.search.rejected: 0 }
+  - gte: { thread_pool.search.largest: 0 }
+  - gte: { thread_pool.search.completed: 0 }
+
+  - gte: { thread_pool.search_coordination.threads: 0 }
+  - gte: { thread_pool.search_coordination.queue: 0 }
+  - gte: { thread_pool.search_coordination.active: 0 }
+  - gte: { thread_pool.search_coordination.rejected: 0 }
+  - gte: { thread_pool.search_coordination.largest: 0 }
+  - gte: { thread_pool.search_coordination.completed: 0 }
+
+  - gte: { thread_pool.search_throttled.threads: 0 }
+  - gte: { thread_pool.search_throttled.queue: 0 }
+  - gte: { thread_pool.search_throttled.active: 0 }
+  - gte: { thread_pool.search_throttled.rejected: 0 }
+  - gte: { thread_pool.search_throttled.largest: 0 }
+  - gte: { thread_pool.search_throttled.completed: 0 }
+
+  - gte: { thread_pool.snapshot.threads: 0 }
+  - gte: { thread_pool.snapshot.queue: 0 }
+  - gte: { thread_pool.snapshot.active: 0 }
+  - gte: { thread_pool.snapshot.rejected: 0 }
+  - gte: { thread_pool.snapshot.largest: 0 }
+  - gte: { thread_pool.snapshot.completed: 0 }
+
+  - gte: { thread_pool.snapshot_meta.threads: 0 }
+  - gte: { thread_pool.snapshot_meta.queue: 0 }
+  - gte: { thread_pool.snapshot_meta.active: 0 }
+  - gte: { thread_pool.snapshot_meta.rejected: 0 }
+  - gte: { thread_pool.snapshot_meta.largest: 0 }
+  - gte: { thread_pool.snapshot_meta.completed: 0 }
+
+  - gte: { thread_pool.system_critical_read.threads: 0 }
+  - gte: { thread_pool.system_critical_read.queue: 0 }
+  - gte: { thread_pool.system_critical_read.active: 0 }
+  - gte: { thread_pool.system_critical_read.rejected: 0 }
+  - gte: { thread_pool.system_critical_read.largest: 0 }
+  - gte: { thread_pool.system_critical_read.completed: 0 }
+
+  - gte: { thread_pool.system_critical_write.threads: 0 }
+  - gte: { thread_pool.system_critical_write.queue: 0 }
+  - gte: { thread_pool.system_critical_write.active: 0 }
+  - gte: { thread_pool.system_critical_write.rejected: 0 }
+  - gte: { thread_pool.system_critical_write.largest: 0 }
+  - gte: { thread_pool.system_critical_write.completed: 0 }
+
+  - gte: { thread_pool.system_read.threads: 0 }
+  - gte: { thread_pool.system_read.queue: 0 }
+  - gte: { thread_pool.system_read.active: 0 }
+  - gte: { thread_pool.system_read.rejected: 0 }
+  - gte: { thread_pool.system_read.largest: 0 }
+  - gte: { thread_pool.system_read.completed: 0 }
+
+  - gte: { thread_pool.system_write.threads: 0 }
+  - gte: { thread_pool.system_write.queue: 0 }
+  - gte: { thread_pool.system_write.active: 0 }
+  - gte: { thread_pool.system_write.rejected: 0 }
+  - gte: { thread_pool.system_write.largest: 0 }
+  - gte: { thread_pool.system_write.completed: 0 }
+
+  - gte: { thread_pool.warmer.threads: 0 }
+  - gte: { thread_pool.warmer.queue: 0 }
+  - gte: { thread_pool.warmer.active: 0 }
+  - gte: { thread_pool.warmer.rejected: 0 }
+  - gte: { thread_pool.warmer.largest: 0 }
+  - gte: { thread_pool.warmer.completed: 0 }
+
+  - gte: { thread_pool.write.threads: 0 }
+  - gte: { thread_pool.write.queue: 0 }
+  - gte: { thread_pool.write.active: 0 }
+  - gte: { thread_pool.write.rejected: 0 }
+  - gte: { thread_pool.write.largest: 0 }
+  - gte: { thread_pool.write.completed: 0 }

+ 12 - 1
server/src/main/java/org/elasticsearch/rest/action/info/RestClusterInfoAction.java

@@ -23,8 +23,11 @@ import org.elasticsearch.rest.ChunkedRestResponseBody;
 import org.elasticsearch.rest.RestRequest;
 import org.elasticsearch.rest.RestResponse;
 import org.elasticsearch.rest.RestStatus;
+import org.elasticsearch.rest.Scope;
+import org.elasticsearch.rest.ServerlessScope;
 import org.elasticsearch.rest.action.RestCancellableNodeClient;
 import org.elasticsearch.rest.action.RestResponseListener;
+import org.elasticsearch.threadpool.ThreadPoolStats;
 
 import java.io.IOException;
 import java.util.List;
@@ -38,8 +41,10 @@ import java.util.stream.Collectors;
 
 import static org.elasticsearch.action.admin.cluster.node.info.NodesInfoRequest.Metric.HTTP;
 import static org.elasticsearch.action.admin.cluster.node.info.NodesInfoRequest.Metric.INGEST;
+import static org.elasticsearch.action.admin.cluster.node.info.NodesInfoRequest.Metric.THREAD_POOL;
 import static org.elasticsearch.xcontent.ToXContent.EMPTY_PARAMS;
 
+@ServerlessScope(Scope.PUBLIC)
 public class RestClusterInfoAction extends BaseRestHandler {
 
     static final Map<String, Function<NodesStatsResponse, ChunkedToXContent>> RESPONSE_MAPPER = Map.of(
@@ -50,7 +55,13 @@ public class RestClusterInfoAction extends BaseRestHandler {
         nodesStatsResponse -> nodesStatsResponse.getNodes()
             .stream()
             .map(NodeStats::getIngestStats)
-            .reduce(IngestStats.IDENTITY, IngestStats::merge)
+            .reduce(IngestStats.IDENTITY, IngestStats::merge),
+        //
+        THREAD_POOL.metricName(),
+        nodesStatsResponse -> nodesStatsResponse.getNodes()
+            .stream()
+            .map(NodeStats::getThreadPool)
+            .reduce(ThreadPoolStats.IDENTITY, ThreadPoolStats::merge)
     );
     static final Set<String> AVAILABLE_TARGETS = RESPONSE_MAPPER.keySet();
 

+ 55 - 4
server/src/main/java/org/elasticsearch/threadpool/ThreadPoolStats.java

@@ -17,14 +17,28 @@ import org.elasticsearch.common.xcontent.ChunkedToXContentHelper;
 import org.elasticsearch.xcontent.ToXContent;
 
 import java.io.IOException;
+import java.util.ArrayList;
+import java.util.Collection;
 import java.util.Collections;
+import java.util.HashMap;
 import java.util.Iterator;
 import java.util.List;
 
 import static java.util.Collections.emptyIterator;
 import static org.elasticsearch.common.collect.Iterators.single;
 
-public record ThreadPoolStats(List<Stats> stats) implements Writeable, ChunkedToXContent, Iterable<ThreadPoolStats.Stats> {
+public record ThreadPoolStats(Collection<Stats> stats) implements Writeable, ChunkedToXContent, Iterable<ThreadPoolStats.Stats> {
+
+    public static final ThreadPoolStats IDENTITY = new ThreadPoolStats(List.of());
+
+    public static ThreadPoolStats merge(ThreadPoolStats first, ThreadPoolStats second) {
+        var mergedThreadPools = new HashMap<String, Stats>();
+
+        first.forEach(stats -> mergedThreadPools.merge(stats.name, stats, Stats::merge));
+        second.forEach(stats -> mergedThreadPools.merge(stats.name, stats, Stats::merge));
+
+        return new ThreadPoolStats(mergedThreadPools.values());
+    }
 
     public record Stats(String name, int threads, int queue, int active, long rejected, int largest, long completed)
         implements
@@ -36,6 +50,42 @@ public record ThreadPoolStats(List<Stats> stats) implements Writeable, ChunkedTo
             this(in.readString(), in.readInt(), in.readInt(), in.readInt(), in.readLong(), in.readInt(), in.readLong());
         }
 
+        static Stats merge(Stats firstStats, Stats secondStats) {
+            return new Stats(
+                firstStats.name,
+                sumStat(firstStats.threads, secondStats.threads),
+                sumStat(firstStats.queue, secondStats.queue),
+                sumStat(firstStats.active, secondStats.active),
+                sumStat(firstStats.rejected, secondStats.rejected),
+                sumStat(firstStats.largest, secondStats.largest),
+                sumStat(firstStats.completed, secondStats.completed)
+            );
+        }
+
+        static int sumStat(int first, int second) {
+            if (first == -1 && second == -1) {
+                return -1;
+            } else if (first == -1) {
+                return second;
+            } else if (second == -1) {
+                return first;
+            } else {
+                return first + second;
+            }
+        }
+
+        static long sumStat(long first, long second) {
+            if (first == -1 && second == -1) {
+                return -1;
+            } else if (first == -1) {
+                return second;
+            } else if (second == -1) {
+                return first;
+            } else {
+                return first + second;
+            }
+        }
+
         @Override
         public void writeTo(StreamOutput out) throws IOException {
             out.writeString(name);
@@ -80,8 +130,9 @@ public record ThreadPoolStats(List<Stats> stats) implements Writeable, ChunkedTo
     }
 
     public ThreadPoolStats {
-        Collections.sort(stats);
-        stats = Collections.unmodifiableList(stats);
+        var statsCopy = new ArrayList<>(stats);
+        Collections.sort(statsCopy);
+        stats = Collections.unmodifiableList(statsCopy);
     }
 
     public ThreadPoolStats(StreamInput in) throws IOException {
@@ -90,7 +141,7 @@ public record ThreadPoolStats(List<Stats> stats) implements Writeable, ChunkedTo
 
     @Override
     public void writeTo(StreamOutput out) throws IOException {
-        out.writeList(stats);
+        out.writeCollection(stats);
     }
 
     @Override

+ 4 - 7
server/src/test/java/org/elasticsearch/action/admin/cluster/node/stats/NodeStatsTests.java

@@ -72,7 +72,6 @@ import org.elasticsearch.search.suggest.completion.CompletionStats;
 import org.elasticsearch.test.ESTestCase;
 import org.elasticsearch.test.VersionUtils;
 import org.elasticsearch.threadpool.ThreadPoolStats;
-import org.elasticsearch.threadpool.ThreadPoolStatsTests;
 import org.elasticsearch.transport.TransportActionStats;
 import org.elasticsearch.transport.TransportStats;
 import org.elasticsearch.xcontent.ToXContent;
@@ -90,6 +89,7 @@ import java.util.stream.IntStream;
 import static java.util.Collections.emptyMap;
 import static java.util.Collections.emptySet;
 import static org.elasticsearch.test.AbstractChunkedSerializingTestCase.assertChunkCount;
+import static org.elasticsearch.threadpool.ThreadPoolStatsTests.randomStats;
 
 public class NodeStatsTests extends ESTestCase {
     public void testSerialization() throws IOException {
@@ -795,12 +795,9 @@ public class NodeStatsTests extends ESTestCase {
         }
         ThreadPoolStats threadPoolStats = null;
         if (frequently()) {
-            var numThreadPoolStats = randomIntBetween(0, 10);
-            var threadPoolStatsList = new ArrayList<ThreadPoolStats.Stats>();
-            for (int i = 0; i < numThreadPoolStats; i++) {
-                threadPoolStatsList.add(ThreadPoolStatsTests.randomStats(randomAlphaOfLengthBetween(3, 10)));
-            }
-            threadPoolStats = new ThreadPoolStats(threadPoolStatsList);
+            threadPoolStats = new ThreadPoolStats(
+                IntStream.range(0, randomIntBetween(0, 10)).mapToObj(i -> randomStats(randomAlphaOfLengthBetween(3, 10))).toList()
+            );
         }
         FsInfo fsInfo = null;
         if (frequently()) {

+ 46 - 14
server/src/test/java/org/elasticsearch/threadpool/ThreadPoolStatsTests.java

@@ -18,27 +18,59 @@ import java.util.List;
 
 import static org.elasticsearch.threadpool.ThreadPool.THREAD_POOL_TYPES;
 import static org.hamcrest.Matchers.contains;
+import static org.hamcrest.Matchers.containsInAnyOrder;
+import static org.hamcrest.Matchers.hasSize;
 
 public class ThreadPoolStatsTests extends ESTestCase {
-    public void testThreadPoolStatsSort() {
-        var stats = List.of(
-            new ThreadPoolStats.Stats("z", -1, 0, 0, 0, 0, 0L),
-            new ThreadPoolStats.Stats("m", 3, 0, 0, 0, 0, 0L),
-            new ThreadPoolStats.Stats("m", 1, 0, 0, 0, 0, 0L),
-            new ThreadPoolStats.Stats("d", -1, 0, 0, 0, 0, 0L),
-            new ThreadPoolStats.Stats("m", 2, 0, 0, 0, 0, 0L),
-            new ThreadPoolStats.Stats("t", -1, 0, 0, 0, 0, 0L),
+    public void testThreadPoolStatsConstructorSortTheStats() {
+        var unorderedStats = List.of(
+            new ThreadPoolStats.Stats("z", 7, 0, 0, 0, 0, 0L),
+            new ThreadPoolStats.Stats("m", 5, 0, 0, 0, 0, 0L),
+            new ThreadPoolStats.Stats("m", -3, 0, 0, 0, 0, 0L),
+            new ThreadPoolStats.Stats("d", 2, 0, 0, 0, 0, 0L),
+            new ThreadPoolStats.Stats("m", 4, 0, 0, 0, 0, 0L),
+            new ThreadPoolStats.Stats("t", 6, 0, 0, 0, 0, 0L),
             new ThreadPoolStats.Stats("a", -1, 0, 0, 0, 0, 0L)
         );
 
-        var copy = new ArrayList<>(stats);
+        var copy = new ArrayList<>(unorderedStats);
         Collections.sort(copy);
 
-        var names = copy.stream().map(ThreadPoolStats.Stats::name).toList();
-        assertThat(names, contains("a", "d", "m", "m", "m", "t", "z"));
+        var threadPoolStats = new ThreadPoolStats(unorderedStats);
+        assertThat(threadPoolStats.stats().stream().map(ThreadPoolStats.Stats::name).toList(), contains("a", "d", "m", "m", "m", "t", "z"));
+        assertThat(threadPoolStats.stats().stream().map(ThreadPoolStats.Stats::threads).toList(), contains(-1, 2, -3, 4, 5, 6, 7));
+    }
+
+    public void testMergeThreadPoolStats() {
+        var firstStats = List.of(randomStats("name-1"), randomStats("name-2"), randomStats("name-3"));
+        var secondStats = List.of(randomStats("name-4"), randomStats("name-5"), randomStats("name-2"), randomStats("name-3"));
+
+        var tps1 = new ThreadPoolStats(firstStats);
+        var tps2 = new ThreadPoolStats(secondStats);
+        var target = ThreadPoolStats.merge(tps1, tps2);
+
+        assertThat(target.stats(), hasSize(5));
+        assertThat(
+            target.stats(),
+            containsInAnyOrder(
+                firstStats.get(0), // name-1
+                ThreadPoolStats.Stats.merge(firstStats.get(1), secondStats.get(2)), // name-2
+                ThreadPoolStats.Stats.merge(firstStats.get(2), secondStats.get(3)), // name-3
+                secondStats.get(0), // name-4
+                secondStats.get(1) // name-5
+            )
+        );
+    }
+
+    public void testStatsMerge() {
+        assertEquals(ThreadPoolStats.Stats.merge(stats(-1), stats(-1)), stats(-1));
+        assertEquals(ThreadPoolStats.Stats.merge(stats(1), stats(-1)), stats(1));
+        assertEquals(ThreadPoolStats.Stats.merge(stats(-1), stats(1)), stats(1));
+        assertEquals(ThreadPoolStats.Stats.merge(stats(1), stats(2)), stats(3));
+    }
 
-        var threads = copy.stream().map(ThreadPoolStats.Stats::threads).toList();
-        assertThat(threads, contains(-1, -1, 1, 2, 3, -1, -1));
+    private static ThreadPoolStats.Stats stats(int value) {
+        return new ThreadPoolStats.Stats("a", value, value, value, value, value, value);
     }
 
     public void testSerialization() throws IOException {
@@ -72,6 +104,6 @@ public class ThreadPoolStatsTests extends ESTestCase {
     }
 
     private static int randomMinusOneOrOther() {
-        return randomBoolean() ? -1 : randomIntBetween(0, Integer.MAX_VALUE);
+        return randomBoolean() ? -1 : randomIntBetween(0, 1000);
     }
 }