Browse Source

Push back excessive requests for stats (#83832)

Resolves #51992
Mary Gouseti 3 years ago
parent
commit
ed0bb2a8af
26 changed files with 674 additions and 54 deletions
  1. 6 0
      docs/changelog/83832.yaml
  2. 35 0
      docs/reference/cluster/nodes-stats.asciidoc
  3. 18 0
      docs/reference/modules/cluster/misc.asciidoc
  4. 43 37
      server/src/internalClusterTest/java/org/elasticsearch/indices/stats/IndexStatsIT.java
  5. 11 1
      server/src/main/java/org/elasticsearch/action/admin/cluster/node/info/TransportNodesInfoAction.java
  6. 22 1
      server/src/main/java/org/elasticsearch/action/admin/cluster/node/stats/NodeStats.java
  7. 2 1
      server/src/main/java/org/elasticsearch/action/admin/cluster/node/stats/NodesStatsRequest.java
  8. 13 2
      server/src/main/java/org/elasticsearch/action/admin/cluster/node/stats/TransportNodesStatsAction.java
  9. 11 1
      server/src/main/java/org/elasticsearch/action/admin/cluster/node/usage/TransportNodesUsageAction.java
  10. 11 1
      server/src/main/java/org/elasticsearch/action/admin/cluster/stats/TransportClusterStatsAction.java
  11. 10 1
      server/src/main/java/org/elasticsearch/action/admin/indices/recovery/TransportRecoveryAction.java
  12. 10 1
      server/src/main/java/org/elasticsearch/action/admin/indices/segments/TransportIndicesSegmentsAction.java
  13. 10 1
      server/src/main/java/org/elasticsearch/action/admin/indices/stats/TransportIndicesStatsAction.java
  14. 124 0
      server/src/main/java/org/elasticsearch/action/support/StatsRequestLimiter.java
  15. 139 0
      server/src/main/java/org/elasticsearch/action/support/StatsRequestStats.java
  16. 3 1
      server/src/main/java/org/elasticsearch/common/settings/ClusterSettings.java
  17. 6 1
      server/src/main/java/org/elasticsearch/node/Node.java
  18. 9 3
      server/src/main/java/org/elasticsearch/node/NodeService.java
  19. 34 1
      server/src/test/java/org/elasticsearch/action/admin/cluster/node/stats/NodeStatsTests.java
  20. 145 0
      server/src/test/java/org/elasticsearch/action/support/StatsRequestLimiterTests.java
  21. 6 0
      server/src/test/java/org/elasticsearch/cluster/DiskUsageTests.java
  22. 2 1
      test/framework/src/main/java/org/elasticsearch/cluster/MockInternalClusterInfoService.java
  23. 1 0
      test/framework/src/main/java/org/elasticsearch/test/InternalTestCluster.java
  24. 1 0
      x-pack/plugin/autoscaling/src/test/java/org/elasticsearch/xpack/autoscaling/capacity/memory/AutoscalingMemoryInfoServiceTests.java
  25. 1 0
      x-pack/plugin/ml/src/test/java/org/elasticsearch/xpack/ml/action/TransportGetTrainedModelsStatsActionTests.java
  26. 1 0
      x-pack/plugin/monitoring/src/test/java/org/elasticsearch/xpack/monitoring/collector/node/NodeStatsMonitoringDocTests.java

+ 6 - 0
docs/changelog/83832.yaml

@@ -0,0 +1,6 @@
+pr: 83832
+summary: Push back excessive stats requests
+area: Stats
+type: enhancement
+issues:
+ - 51992

+ 35 - 0
docs/reference/cluster/nodes-stats.asciidoc

@@ -91,6 +91,10 @@ using metrics.
   `transport`::
       Transport statistics about sent and received bytes in cluster
       communication.
+
+  `stats_requests`::
+      Statistics about stats requests such as indices stats, nodes stats,
+      recovery stats etc.
 --
 
 `<index_metric>`::
@@ -2633,6 +2637,37 @@ search requests on the keyed node.
 The rank of this node; used for shard selection when routing search
 requests.
 ======
+======
+
+[[cluster-nodes-stats-api-response-body-stats-requests]]
+`stats_requests`::
+(object)
+Contains statistics about the stats requests the node has received.
++
+.Properties of `stats_requests`
+[%collapsible%open]
+======
+`<stats_requests_name>`::
+(object)
+Contains statistics about a specific type of a stats request the node has received.
++
+.Properties of `<stats_requests_name>`
+[%collapsible%open]
+=======
+`current`::
+(integer)
+Number of stats requests currently in progress.
+
+`completed`::
+(integer)
+Number of stats requests that have been completed by the node (successfully or
+not).
+
+`rejected`::
+(integer)
+Number of stats requests that were rejected by the node because it had reached
+the limit of concurrent stats requests (`node.stats.max_concurrent_requests`).
+=======
 =====
 ====
 

+ 18 - 0
docs/reference/modules/cluster/misc.asciidoc

@@ -101,6 +101,24 @@ number of shards for each node, use the
 setting.
 --
 
+[[stats-requests-limit]]
+===== Stats request limit
+
+A stats request might require information from all nodes to be aggregated before it returns to the user.
+These requests can be heavy and they put extra pressure on the coordinating node (the node collecting the
+responses from all the nodes), for this reason there is a limit on the concurrent requests that a node can coordinate.
+
+--
+
+[[node-stats-max-concurrent-requests]]
+`node.stats.max_concurrent_requests`::
++
+--
+(<<dynamic-cluster-setting,Dynamic>>)
+Limits the stats requests a coordinating node can concurrently handle. Defaults to `100`.
+
+
+
 [[user-defined-data]]
 ===== User-defined cluster metadata
 

+ 43 - 37
server/src/internalClusterTest/java/org/elasticsearch/indices/stats/IndexStatsIT.java

@@ -79,6 +79,7 @@ import java.util.concurrent.atomic.AtomicBoolean;
 import java.util.concurrent.atomic.AtomicInteger;
 import java.util.concurrent.atomic.AtomicReference;
 
+import static org.elasticsearch.action.support.StatsRequestLimiter.MAX_CONCURRENT_STATS_REQUESTS_PER_NODE;
 import static org.elasticsearch.cluster.metadata.IndexMetadata.SETTING_NUMBER_OF_REPLICAS;
 import static org.elasticsearch.test.hamcrest.ElasticsearchAssertions.assertAcked;
 import static org.elasticsearch.test.hamcrest.ElasticsearchAssertions.assertAllSuccessful;
@@ -1386,52 +1387,57 @@ public class IndexStatsIT extends ESIntegTestCase {
         }
 
         // start threads that will get stats concurrently with indexing
-        for (int i = 0; i < numberOfStatsThreads; i++) {
-            final Thread thread = new Thread(() -> {
-                try {
-                    barrier.await();
-                } catch (final BrokenBarrierException | InterruptedException e) {
-                    failed.set(true);
-                    executionFailures.get().add(e);
-                    latch.countDown();
-                }
-                final IndicesStatsRequest request = new IndicesStatsRequest();
-                request.all();
-                request.indices(new String[0]);
-                while (stop.get() == false) {
+        try {
+            updateClusterSettings(Settings.builder().put(MAX_CONCURRENT_STATS_REQUESTS_PER_NODE.getKey(), numberOfStatsThreads + 1));
+            for (int i = 0; i < numberOfStatsThreads; i++) {
+                final Thread thread = new Thread(() -> {
                     try {
-                        final IndicesStatsResponse response = client().admin().indices().stats(request).get();
-                        if (response.getFailedShards() > 0) {
-                            failed.set(true);
-                            shardFailures.get().addAll(Arrays.asList(response.getShardFailures()));
-                            latch.countDown();
-                        }
-                    } catch (final ExecutionException | InterruptedException e) {
+                        barrier.await();
+                    } catch (final BrokenBarrierException | InterruptedException e) {
                         failed.set(true);
                         executionFailures.get().add(e);
                         latch.countDown();
                     }
-                }
-            });
-            thread.setName("stats-" + i);
-            threads.add(thread);
-            thread.start();
-        }
+                    final IndicesStatsRequest request = new IndicesStatsRequest();
+                    request.all();
+                    request.indices(new String[0]);
+                    while (stop.get() == false) {
+                        try {
+                            final IndicesStatsResponse response = client().admin().indices().stats(request).get();
+                            if (response.getFailedShards() > 0) {
+                                failed.set(true);
+                                shardFailures.get().addAll(Arrays.asList(response.getShardFailures()));
+                                latch.countDown();
+                            }
+                        } catch (final ExecutionException | InterruptedException e) {
+                            failed.set(true);
+                            executionFailures.get().add(e);
+                            latch.countDown();
+                        }
+                    }
+                });
+                thread.setName("stats-" + i);
+                threads.add(thread);
+                thread.start();
+            }
 
-        // release the hounds
-        barrier.await();
+            // release the hounds
+            barrier.await();
 
-        // wait for a failure, or for fifteen seconds to elapse
-        latch.await(15, TimeUnit.SECONDS);
+            // wait for a failure, or for fifteen seconds to elapse
+            latch.await(15, TimeUnit.SECONDS);
 
-        // stop all threads and wait for them to complete
-        stop.set(true);
-        for (final Thread thread : threads) {
-            thread.join();
-        }
+            // stop all threads and wait for them to complete
+            stop.set(true);
+            for (final Thread thread : threads) {
+                thread.join();
+            }
 
-        assertThat(shardFailures.get(), emptyCollectionOf(DefaultShardOperationFailedException.class));
-        assertThat(executionFailures.get(), emptyCollectionOf(Exception.class));
+            assertThat(shardFailures.get(), emptyCollectionOf(DefaultShardOperationFailedException.class));
+            assertThat(executionFailures.get(), emptyCollectionOf(Exception.class));
+        } finally {
+            updateClusterSettings(Settings.builder().putNull(MAX_CONCURRENT_STATS_REQUESTS_PER_NODE.getKey()));
+        }
     }
 
     /**

+ 11 - 1
server/src/main/java/org/elasticsearch/action/admin/cluster/node/info/TransportNodesInfoAction.java

@@ -8,8 +8,10 @@
 
 package org.elasticsearch.action.admin.cluster.node.info;
 
+import org.elasticsearch.action.ActionListener;
 import org.elasticsearch.action.FailedNodeException;
 import org.elasticsearch.action.support.ActionFilters;
+import org.elasticsearch.action.support.StatsRequestLimiter;
 import org.elasticsearch.action.support.nodes.TransportNodesAction;
 import org.elasticsearch.cluster.node.DiscoveryNode;
 import org.elasticsearch.cluster.service.ClusterService;
@@ -33,6 +35,7 @@ public class TransportNodesInfoAction extends TransportNodesAction<
     NodeInfo> {
 
     private final NodeService nodeService;
+    private final StatsRequestLimiter statsRequestLimiter;
 
     @Inject
     public TransportNodesInfoAction(
@@ -40,7 +43,8 @@ public class TransportNodesInfoAction extends TransportNodesAction<
         ClusterService clusterService,
         TransportService transportService,
         NodeService nodeService,
-        ActionFilters actionFilters
+        ActionFilters actionFilters,
+        StatsRequestLimiter statsRequestLimiter
     ) {
         super(
             NodesInfoAction.NAME,
@@ -54,6 +58,7 @@ public class TransportNodesInfoAction extends TransportNodesAction<
             NodeInfo.class
         );
         this.nodeService = nodeService;
+        this.statsRequestLimiter = statsRequestLimiter;
     }
 
     @Override
@@ -94,6 +99,11 @@ public class TransportNodesInfoAction extends TransportNodesAction<
         );
     }
 
+    @Override
+    protected void doExecute(Task task, NodesInfoRequest request, ActionListener<NodesInfoResponse> listener) {
+        statsRequestLimiter.tryToExecute(task, request, listener, super::doExecute);
+    }
+
     public static class NodeInfoRequest extends TransportRequest {
 
         NodesInfoRequest request;

+ 22 - 1
server/src/main/java/org/elasticsearch/action/admin/cluster/node/stats/NodeStats.java

@@ -8,6 +8,8 @@
 
 package org.elasticsearch.action.admin.cluster.node.stats;
 
+import org.elasticsearch.Version;
+import org.elasticsearch.action.support.StatsRequestStats;
 import org.elasticsearch.action.support.nodes.BaseNodeResponse;
 import org.elasticsearch.cluster.node.DiscoveryNode;
 import org.elasticsearch.cluster.node.DiscoveryNodeRole;
@@ -87,6 +89,9 @@ public class NodeStats extends BaseNodeResponse implements ToXContentFragment {
     @Nullable
     private IndexingPressureStats indexingPressureStats;
 
+    @Nullable
+    private StatsRequestStats statsRequestStats;
+
     public NodeStats(StreamInput in) throws IOException {
         super(in);
         timestamp = in.readVLong();
@@ -107,6 +112,9 @@ public class NodeStats extends BaseNodeResponse implements ToXContentFragment {
         ingestStats = in.readOptionalWriteable(IngestStats::new);
         adaptiveSelectionStats = in.readOptionalWriteable(AdaptiveSelectionStats::new);
         indexingPressureStats = in.readOptionalWriteable(IndexingPressureStats::new);
+        if (in.getVersion().onOrAfter(Version.V_8_2_0)) {
+            statsRequestStats = in.readOptionalWriteable(StatsRequestStats::new);
+        }
     }
 
     public NodeStats(
@@ -126,7 +134,8 @@ public class NodeStats extends BaseNodeResponse implements ToXContentFragment {
         @Nullable IngestStats ingestStats,
         @Nullable AdaptiveSelectionStats adaptiveSelectionStats,
         @Nullable ScriptCacheStats scriptCacheStats,
-        @Nullable IndexingPressureStats indexingPressureStats
+        @Nullable IndexingPressureStats indexingPressureStats,
+        @Nullable StatsRequestStats statsRequestStats
     ) {
         super(node);
         this.timestamp = timestamp;
@@ -145,6 +154,7 @@ public class NodeStats extends BaseNodeResponse implements ToXContentFragment {
         this.adaptiveSelectionStats = adaptiveSelectionStats;
         this.scriptCacheStats = scriptCacheStats;
         this.indexingPressureStats = indexingPressureStats;
+        this.statsRequestStats = statsRequestStats;
     }
 
     public long getTimestamp() {
@@ -249,6 +259,11 @@ public class NodeStats extends BaseNodeResponse implements ToXContentFragment {
         return indexingPressureStats;
     }
 
+    @Nullable
+    public StatsRequestStats getStatsRequestStats() {
+        return statsRequestStats;
+    }
+
     @Override
     public void writeTo(StreamOutput out) throws IOException {
         super.writeTo(out);
@@ -272,6 +287,9 @@ public class NodeStats extends BaseNodeResponse implements ToXContentFragment {
         out.writeOptionalWriteable(ingestStats);
         out.writeOptionalWriteable(adaptiveSelectionStats);
         out.writeOptionalWriteable(indexingPressureStats);
+        if (out.getVersion().onOrAfter(Version.V_8_2_0)) {
+            out.writeOptionalWriteable(statsRequestStats);
+        }
     }
 
     @Override
@@ -341,6 +359,9 @@ public class NodeStats extends BaseNodeResponse implements ToXContentFragment {
         if (getIndexingPressureStats() != null) {
             getIndexingPressureStats().toXContent(builder, params);
         }
+        if (getStatsRequestStats() != null) {
+            getStatsRequestStats().toXContent(builder, params);
+        }
         return builder;
     }
 }

+ 2 - 1
server/src/main/java/org/elasticsearch/action/admin/cluster/node/stats/NodesStatsRequest.java

@@ -189,7 +189,8 @@ public class NodesStatsRequest extends BaseNodesRequest<NodesStatsRequest> {
         INGEST("ingest"),
         ADAPTIVE_SELECTION("adaptive_selection"),
         SCRIPT_CACHE("script_cache"),
-        INDEXING_PRESSURE("indexing_pressure"),;
+        INDEXING_PRESSURE("indexing_pressure"),
+        STATS_REQUESTS("stats_requests"),;
 
         private String metricName;
 

+ 13 - 2
server/src/main/java/org/elasticsearch/action/admin/cluster/node/stats/TransportNodesStatsAction.java

@@ -8,8 +8,10 @@
 
 package org.elasticsearch.action.admin.cluster.node.stats;
 
+import org.elasticsearch.action.ActionListener;
 import org.elasticsearch.action.FailedNodeException;
 import org.elasticsearch.action.support.ActionFilters;
+import org.elasticsearch.action.support.StatsRequestLimiter;
 import org.elasticsearch.action.support.nodes.TransportNodesAction;
 import org.elasticsearch.cluster.node.DiscoveryNode;
 import org.elasticsearch.cluster.service.ClusterService;
@@ -36,6 +38,7 @@ public class TransportNodesStatsAction extends TransportNodesAction<
     NodeStats> {
 
     private final NodeService nodeService;
+    private final StatsRequestLimiter statsRequestLimiter;
 
     @Inject
     public TransportNodesStatsAction(
@@ -43,7 +46,8 @@ public class TransportNodesStatsAction extends TransportNodesAction<
         ClusterService clusterService,
         TransportService transportService,
         NodeService nodeService,
-        ActionFilters actionFilters
+        ActionFilters actionFilters,
+        StatsRequestLimiter statsRequestLimiter
     ) {
         super(
             NodesStatsAction.NAME,
@@ -57,6 +61,7 @@ public class TransportNodesStatsAction extends TransportNodesAction<
             NodeStats.class
         );
         this.nodeService = nodeService;
+        this.statsRequestLimiter = statsRequestLimiter;
     }
 
     @Override
@@ -95,10 +100,16 @@ public class TransportNodesStatsAction extends TransportNodesAction<
             NodesStatsRequest.Metric.INGEST.containedIn(metrics),
             NodesStatsRequest.Metric.ADAPTIVE_SELECTION.containedIn(metrics),
             NodesStatsRequest.Metric.SCRIPT_CACHE.containedIn(metrics),
-            NodesStatsRequest.Metric.INDEXING_PRESSURE.containedIn(metrics)
+            NodesStatsRequest.Metric.INDEXING_PRESSURE.containedIn(metrics),
+            NodesStatsRequest.Metric.STATS_REQUESTS.containedIn(metrics)
         );
     }
 
+    @Override
+    protected void doExecute(Task task, NodesStatsRequest request, ActionListener<NodesStatsResponse> listener) {
+        statsRequestLimiter.tryToExecute(task, request, listener, super::doExecute);
+    }
+
     public static class NodeStatsRequest extends TransportRequest {
 
         NodesStatsRequest request;

+ 11 - 1
server/src/main/java/org/elasticsearch/action/admin/cluster/node/usage/TransportNodesUsageAction.java

@@ -8,8 +8,10 @@
 
 package org.elasticsearch.action.admin.cluster.node.usage;
 
+import org.elasticsearch.action.ActionListener;
 import org.elasticsearch.action.FailedNodeException;
 import org.elasticsearch.action.support.ActionFilters;
+import org.elasticsearch.action.support.StatsRequestLimiter;
 import org.elasticsearch.action.support.nodes.TransportNodesAction;
 import org.elasticsearch.cluster.node.DiscoveryNode;
 import org.elasticsearch.cluster.service.ClusterService;
@@ -36,6 +38,7 @@ public class TransportNodesUsageAction extends TransportNodesAction<
     private final UsageService restUsageService;
     private final AggregationUsageService aggregationUsageService;
     private final long sinceTime;
+    private final StatsRequestLimiter statsRequestLimiter;
 
     @Inject
     public TransportNodesUsageAction(
@@ -44,7 +47,8 @@ public class TransportNodesUsageAction extends TransportNodesAction<
         TransportService transportService,
         ActionFilters actionFilters,
         UsageService restUsageService,
-        AggregationUsageService aggregationUsageService
+        AggregationUsageService aggregationUsageService,
+        StatsRequestLimiter statsRequestLimiter
     ) {
         super(
             NodesUsageAction.NAME,
@@ -59,6 +63,7 @@ public class TransportNodesUsageAction extends TransportNodesAction<
         );
         this.restUsageService = restUsageService;
         this.aggregationUsageService = aggregationUsageService;
+        this.statsRequestLimiter = statsRequestLimiter;
         this.sinceTime = System.currentTimeMillis();
     }
 
@@ -85,6 +90,11 @@ public class TransportNodesUsageAction extends TransportNodesAction<
         return new NodeUsage(clusterService.localNode(), System.currentTimeMillis(), sinceTime, restUsage, aggsUsage);
     }
 
+    @Override
+    protected void doExecute(Task task, NodesUsageRequest request, ActionListener<NodesUsageResponse> listener) {
+        statsRequestLimiter.tryToExecute(task, request, listener, super::doExecute);
+    }
+
     public static class NodeUsageRequest extends TransportRequest {
 
         NodesUsageRequest request;

+ 11 - 1
server/src/main/java/org/elasticsearch/action/admin/cluster/stats/TransportClusterStatsAction.java

@@ -18,6 +18,7 @@ import org.elasticsearch.action.admin.indices.stats.CommonStats;
 import org.elasticsearch.action.admin.indices.stats.CommonStatsFlags;
 import org.elasticsearch.action.admin.indices.stats.ShardStats;
 import org.elasticsearch.action.support.ActionFilters;
+import org.elasticsearch.action.support.StatsRequestLimiter;
 import org.elasticsearch.action.support.nodes.TransportNodesAction;
 import org.elasticsearch.cluster.ClusterState;
 import org.elasticsearch.cluster.health.ClusterHealthStatus;
@@ -72,6 +73,7 @@ public class TransportClusterStatsAction extends TransportNodesAction<
 
     private final MetadataStatsCache<MappingStats> mappingStatsCache;
     private final MetadataStatsCache<AnalysisStats> analysisStatsCache;
+    private final StatsRequestLimiter statsRequestLimiter;
 
     @Inject
     public TransportClusterStatsAction(
@@ -80,7 +82,8 @@ public class TransportClusterStatsAction extends TransportNodesAction<
         TransportService transportService,
         NodeService nodeService,
         IndicesService indicesService,
-        ActionFilters actionFilters
+        ActionFilters actionFilters,
+        StatsRequestLimiter statsRequestLimiter
     ) {
         super(
             ClusterStatsAction.NAME,
@@ -98,6 +101,7 @@ public class TransportClusterStatsAction extends TransportNodesAction<
         this.indicesService = indicesService;
         this.mappingStatsCache = new MetadataStatsCache<>(threadPool.getThreadContext(), MappingStats::of);
         this.analysisStatsCache = new MetadataStatsCache<>(threadPool.getThreadContext(), AnalysisStats::of);
+        this.statsRequestLimiter = statsRequestLimiter;
     }
 
     @Override
@@ -183,6 +187,7 @@ public class TransportClusterStatsAction extends TransportNodesAction<
             true,
             false,
             false,
+            false,
             false
         );
         List<ShardStats> shardsStats = new ArrayList<>();
@@ -233,6 +238,11 @@ public class TransportClusterStatsAction extends TransportNodesAction<
 
     }
 
+    @Override
+    protected void doExecute(Task task, ClusterStatsRequest request, ActionListener<ClusterStatsResponse> listener) {
+        statsRequestLimiter.tryToExecute(task, request, listener, super::doExecute);
+    }
+
     public static class ClusterStatsNodeRequest extends TransportRequest {
 
         ClusterStatsRequest request;

+ 10 - 1
server/src/main/java/org/elasticsearch/action/admin/indices/recovery/TransportRecoveryAction.java

@@ -11,6 +11,7 @@ package org.elasticsearch.action.admin.indices.recovery;
 import org.elasticsearch.action.ActionListener;
 import org.elasticsearch.action.support.ActionFilters;
 import org.elasticsearch.action.support.DefaultShardOperationFailedException;
+import org.elasticsearch.action.support.StatsRequestLimiter;
 import org.elasticsearch.action.support.broadcast.node.TransportBroadcastByNodeAction;
 import org.elasticsearch.cluster.ClusterState;
 import org.elasticsearch.cluster.block.ClusterBlockException;
@@ -44,6 +45,7 @@ import java.util.Map;
 public class TransportRecoveryAction extends TransportBroadcastByNodeAction<RecoveryRequest, RecoveryResponse, RecoveryState> {
 
     private final IndicesService indicesService;
+    private final StatsRequestLimiter statsRequestLimiter;
 
     @Inject
     public TransportRecoveryAction(
@@ -51,7 +53,8 @@ public class TransportRecoveryAction extends TransportBroadcastByNodeAction<Reco
         TransportService transportService,
         IndicesService indicesService,
         ActionFilters actionFilters,
-        IndexNameExpressionResolver indexNameExpressionResolver
+        IndexNameExpressionResolver indexNameExpressionResolver,
+        StatsRequestLimiter statsRequestLimiter
     ) {
         super(
             RecoveryAction.NAME,
@@ -63,6 +66,7 @@ public class TransportRecoveryAction extends TransportBroadcastByNodeAction<Reco
             ThreadPool.Names.MANAGEMENT
         );
         this.indicesService = indicesService;
+        this.statsRequestLimiter = statsRequestLimiter;
     }
 
     @Override
@@ -131,6 +135,11 @@ public class TransportRecoveryAction extends TransportBroadcastByNodeAction<Reco
         return state.blocks().indicesBlockedException(ClusterBlockLevel.METADATA_READ, concreteIndices);
     }
 
+    @Override
+    protected void doExecute(Task task, RecoveryRequest request, ActionListener<RecoveryResponse> listener) {
+        statsRequestLimiter.tryToExecute(task, request, listener, super::doExecute);
+    }
+
     @Nullable // unless running tests that inject extra behaviour
     private volatile Runnable onShardOperation;
 

+ 10 - 1
server/src/main/java/org/elasticsearch/action/admin/indices/segments/TransportIndicesSegmentsAction.java

@@ -11,6 +11,7 @@ package org.elasticsearch.action.admin.indices.segments;
 import org.elasticsearch.action.ActionListener;
 import org.elasticsearch.action.support.ActionFilters;
 import org.elasticsearch.action.support.DefaultShardOperationFailedException;
+import org.elasticsearch.action.support.StatsRequestLimiter;
 import org.elasticsearch.action.support.broadcast.node.TransportBroadcastByNodeAction;
 import org.elasticsearch.cluster.ClusterState;
 import org.elasticsearch.cluster.block.ClusterBlockException;
@@ -38,6 +39,7 @@ public class TransportIndicesSegmentsAction extends TransportBroadcastByNodeActi
     ShardSegments> {
 
     private final IndicesService indicesService;
+    private final StatsRequestLimiter statsRequestLimiter;
 
     @Inject
     public TransportIndicesSegmentsAction(
@@ -45,7 +47,8 @@ public class TransportIndicesSegmentsAction extends TransportBroadcastByNodeActi
         TransportService transportService,
         IndicesService indicesService,
         ActionFilters actionFilters,
-        IndexNameExpressionResolver indexNameExpressionResolver
+        IndexNameExpressionResolver indexNameExpressionResolver,
+        StatsRequestLimiter statsRequestLimiter
     ) {
         super(
             IndicesSegmentsAction.NAME,
@@ -57,6 +60,7 @@ public class TransportIndicesSegmentsAction extends TransportBroadcastByNodeActi
             ThreadPool.Names.MANAGEMENT
         );
         this.indicesService = indicesService;
+        this.statsRequestLimiter = statsRequestLimiter;
     }
 
     /**
@@ -120,4 +124,9 @@ public class TransportIndicesSegmentsAction extends TransportBroadcastByNodeActi
             return new ShardSegments(indexShard.routingEntry(), indexShard.segments());
         });
     }
+
+    @Override
+    protected void doExecute(Task task, IndicesSegmentsRequest request, ActionListener<IndicesSegmentResponse> listener) {
+        statsRequestLimiter.tryToExecute(task, request, listener, super::doExecute);
+    }
 }

+ 10 - 1
server/src/main/java/org/elasticsearch/action/admin/indices/stats/TransportIndicesStatsAction.java

@@ -12,6 +12,7 @@ import org.apache.lucene.store.AlreadyClosedException;
 import org.elasticsearch.action.ActionListener;
 import org.elasticsearch.action.support.ActionFilters;
 import org.elasticsearch.action.support.DefaultShardOperationFailedException;
+import org.elasticsearch.action.support.StatsRequestLimiter;
 import org.elasticsearch.action.support.broadcast.node.TransportBroadcastByNodeAction;
 import org.elasticsearch.cluster.ClusterState;
 import org.elasticsearch.cluster.block.ClusterBlockException;
@@ -40,6 +41,7 @@ import java.util.List;
 public class TransportIndicesStatsAction extends TransportBroadcastByNodeAction<IndicesStatsRequest, IndicesStatsResponse, ShardStats> {
 
     private final IndicesService indicesService;
+    private final StatsRequestLimiter statsRequestLimiter;
 
     @Inject
     public TransportIndicesStatsAction(
@@ -47,7 +49,8 @@ public class TransportIndicesStatsAction extends TransportBroadcastByNodeAction<
         TransportService transportService,
         IndicesService indicesService,
         ActionFilters actionFilters,
-        IndexNameExpressionResolver indexNameExpressionResolver
+        IndexNameExpressionResolver indexNameExpressionResolver,
+        StatsRequestLimiter statsRequestLimiter
     ) {
         super(
             IndicesStatsAction.NAME,
@@ -59,6 +62,7 @@ public class TransportIndicesStatsAction extends TransportBroadcastByNodeAction<
             ThreadPool.Names.MANAGEMENT
         );
         this.indicesService = indicesService;
+        this.statsRequestLimiter = statsRequestLimiter;
     }
 
     /**
@@ -144,4 +148,9 @@ public class TransportIndicesStatsAction extends TransportBroadcastByNodeAction<
             );
         });
     }
+
+    @Override
+    protected void doExecute(Task task, IndicesStatsRequest request, ActionListener<IndicesStatsResponse> listener) {
+        statsRequestLimiter.tryToExecute(task, request, listener, super::doExecute);
+    }
 }

+ 124 - 0
server/src/main/java/org/elasticsearch/action/support/StatsRequestLimiter.java

@@ -0,0 +1,124 @@
+/*
+ * Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
+ * or more contributor license agreements. Licensed under the Elastic License
+ * 2.0 and the Server Side Public License, v 1; you may not use this file except
+ * in compliance with, at your election, the Elastic License 2.0 or the Server
+ * Side Public License, v 1.
+ */
+
+package org.elasticsearch.action.support;
+
+import org.elasticsearch.action.ActionListener;
+import org.elasticsearch.common.TriConsumer;
+import org.elasticsearch.common.metrics.CounterMetric;
+import org.elasticsearch.common.settings.ClusterSettings;
+import org.elasticsearch.common.settings.Setting;
+import org.elasticsearch.common.settings.Settings;
+import org.elasticsearch.common.util.concurrent.AdjustableSemaphore;
+import org.elasticsearch.common.util.concurrent.EsRejectedExecutionException;
+import org.elasticsearch.common.util.concurrent.RunOnce;
+import org.elasticsearch.tasks.Task;
+
+import java.util.Map;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.stream.Collectors;
+
+/**
+ * This class guards the amount of stats requests a node can concurrently coordinate.
+ */
+public class StatsRequestLimiter {
+
+    public static final Setting<Integer> MAX_CONCURRENT_STATS_REQUESTS_PER_NODE = Setting.intSetting(
+        "node.stats.max_concurrent_requests",
+        100,
+        1,
+        Setting.Property.NodeScope,
+        Setting.Property.Dynamic
+    );
+
+    private final AdjustableSemaphore maxConcurrentStatsRequestsPerNodeSemaphore;
+    private volatile int maxConcurrentStatsRequestsPerNode;
+    private final Map<String, StatsHolder> stats = new ConcurrentHashMap<>();
+
+    public StatsRequestLimiter(Settings settings, ClusterSettings clusterSettings) {
+        maxConcurrentStatsRequestsPerNode = MAX_CONCURRENT_STATS_REQUESTS_PER_NODE.get(settings);
+        this.maxConcurrentStatsRequestsPerNodeSemaphore = new AdjustableSemaphore(maxConcurrentStatsRequestsPerNode, false);
+        clusterSettings.addSettingsUpdateConsumer(MAX_CONCURRENT_STATS_REQUESTS_PER_NODE, this::setMaxConcurrentStatsRequestsPerNode);
+    }
+
+    private void setMaxConcurrentStatsRequestsPerNode(int maxConcurrentStatsRequestsPerNode) {
+        this.maxConcurrentStatsRequestsPerNode = maxConcurrentStatsRequestsPerNode;
+        this.maxConcurrentStatsRequestsPerNodeSemaphore.setMaxPermits(maxConcurrentStatsRequestsPerNode);
+    }
+
+    /**
+     * Checks if executing the action will remain within the limits of the max concurrent requests the node can handle. If the limit is
+     * respected the action will be executed otherwise it will throw an EsRejectedExecutionException. The method keeps track of current,
+     * completed and rejected requests per action type.
+     */
+    public <Request, Response> void tryToExecute(
+        Task task,
+        Request request,
+        ActionListener<Response> listener,
+        TriConsumer<Task, Request, ActionListener<Response>> executeAction
+    ) {
+        StatsHolder statsHolder = stats.computeIfAbsent(task.getAction(), ignored -> new StatsHolder(task.getAction()));
+        if (tryAcquire()) {
+            statsHolder.current.inc();
+            final Runnable release = new RunOnce(() -> {
+                release();
+                statsHolder.current.dec();
+                statsHolder.completed.inc();
+            });
+            boolean success = false;
+            try {
+                executeAction.apply(task, request, ActionListener.runBefore(listener, release::run));
+                success = true;
+            } finally {
+                if (success == false) {
+                    release.run();
+                }
+            }
+        } else {
+            listener.onFailure(
+                new EsRejectedExecutionException(
+                    "this node is already coordinating ["
+                        + maxConcurrentStatsRequestsPerNode
+                        + "] stats requests and has reached the limit set by ["
+                        + MAX_CONCURRENT_STATS_REQUESTS_PER_NODE.getKey()
+                        + "]"
+                )
+            );
+            statsHolder.rejected.inc();
+        }
+    }
+
+    public StatsRequestStats stats() {
+        return new StatsRequestStats(stats.values().stream().map(StatsHolder::stats).collect(Collectors.toList()));
+    }
+
+    // visible for testing
+    boolean tryAcquire() {
+        return maxConcurrentStatsRequestsPerNodeSemaphore.tryAcquire();
+    }
+
+    // visible for testing
+    void release() {
+        maxConcurrentStatsRequestsPerNodeSemaphore.release();
+    }
+
+    static final class StatsHolder {
+        String request;
+        final CounterMetric current = new CounterMetric();
+        final CounterMetric completed = new CounterMetric();
+        final CounterMetric rejected = new CounterMetric();
+
+        StatsHolder(String request) {
+            this.request = request;
+        }
+
+        StatsRequestStats.Stats stats() {
+            return new StatsRequestStats.Stats(request, current.count(), completed.count(), rejected.count());
+        }
+    }
+}

+ 139 - 0
server/src/main/java/org/elasticsearch/action/support/StatsRequestStats.java

@@ -0,0 +1,139 @@
+/*
+ * Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
+ * or more contributor license agreements. Licensed under the Elastic License
+ * 2.0 and the Server Side Public License, v 1; you may not use this file except
+ * in compliance with, at your election, the Elastic License 2.0 or the Server
+ * Side Public License, v 1.
+ */
+
+package org.elasticsearch.action.support;
+
+import org.elasticsearch.common.io.stream.StreamInput;
+import org.elasticsearch.common.io.stream.StreamOutput;
+import org.elasticsearch.common.io.stream.Writeable;
+import org.elasticsearch.xcontent.ToXContentFragment;
+import org.elasticsearch.xcontent.XContentBuilder;
+
+import java.io.IOException;
+import java.util.Collections;
+import java.util.Iterator;
+import java.util.List;
+
+public class StatsRequestStats implements Writeable, ToXContentFragment, Iterable<StatsRequestStats.Stats> {
+
+    public static class Stats implements Writeable, ToXContentFragment, Comparable<Stats> {
+
+        private final String request;
+        private final long current;
+        private final long completed;
+        private final long rejected;
+
+        public Stats(String request, long current, long completed, long rejected) {
+            this.request = request;
+            this.current = current;
+            this.completed = completed;
+            this.rejected = rejected;
+        }
+
+        public Stats(StreamInput in) throws IOException {
+            request = in.readString();
+            current = in.readLong();
+            completed = in.readLong();
+            rejected = in.readLong();
+        }
+
+        @Override
+        public void writeTo(StreamOutput out) throws IOException {
+            out.writeString(request);
+            out.writeLong(current);
+            out.writeLong(completed);
+            out.writeLong(rejected);
+        }
+
+        public String getRequest() {
+            return this.request;
+        }
+
+        public long getCurrent() {
+            return this.current;
+        }
+
+        public long getCompleted() {
+            return this.completed;
+        }
+
+        public long getRejected() {
+            return rejected;
+        }
+
+        @Override
+        public XContentBuilder toXContent(XContentBuilder builder, Params params) throws IOException {
+            builder.startObject(request);
+            if (current != -1) {
+                builder.field(Fields.CURRENT, current);
+            }
+            if (completed != -1) {
+                builder.field(Fields.COMPLETED, completed);
+            }
+            if (rejected != -1) {
+                builder.field(Fields.REJECTED, rejected);
+            }
+            builder.endObject();
+            return builder;
+        }
+
+        @Override
+        public int compareTo(Stats other) {
+            if ((getRequest() == null) && (other.getRequest() == null)) {
+                return 0;
+            } else if ((getRequest() != null) && (other.getRequest() == null)) {
+                return 1;
+            } else if (getRequest() == null) {
+                return -1;
+            } else {
+                int compare = getRequest().compareTo(other.getRequest());
+                if (compare == 0) {
+                    compare = Long.compare(getCompleted(), other.getCompleted());
+                }
+                return compare;
+            }
+        }
+    }
+
+    private List<Stats> stats;
+
+    public StatsRequestStats(List<Stats> stats) {
+        Collections.sort(stats);
+        this.stats = stats;
+    }
+
+    public StatsRequestStats(StreamInput in) throws IOException {
+        stats = in.readList(Stats::new);
+    }
+
+    @Override
+    public void writeTo(StreamOutput out) throws IOException {
+        out.writeList(stats);
+    }
+
+    @Override
+    public Iterator<Stats> iterator() {
+        return stats.iterator();
+    }
+
+    static final class Fields {
+        static final String CURRENT = "current";
+        static final String COMPLETED = "completed";
+        static final String REJECTED = "rejected";
+    }
+
+    @Override
+    public XContentBuilder toXContent(XContentBuilder builder, Params params) throws IOException {
+        builder.startObject("stats_requests");
+        for (Stats stat : stats) {
+            stat.toXContent(builder, params);
+        }
+        builder.endObject();
+        return builder;
+    }
+}

+ 3 - 1
server/src/main/java/org/elasticsearch/common/settings/ClusterSettings.java

@@ -13,6 +13,7 @@ import org.elasticsearch.action.admin.indices.close.TransportCloseIndexAction;
 import org.elasticsearch.action.search.TransportSearchAction;
 import org.elasticsearch.action.support.AutoCreateIndex;
 import org.elasticsearch.action.support.DestructiveOperations;
+import org.elasticsearch.action.support.StatsRequestLimiter;
 import org.elasticsearch.action.support.replication.TransportReplicationAction;
 import org.elasticsearch.bootstrap.BootstrapSettings;
 import org.elasticsearch.client.internal.Client;
@@ -508,7 +509,8 @@ public final class ClusterSettings extends AbstractScopedSettings {
         FsHealthService.SLOW_PATH_LOGGING_THRESHOLD_SETTING,
         IndexingPressure.MAX_INDEXING_BYTES,
         ShardLimitValidator.SETTING_CLUSTER_MAX_SHARDS_PER_NODE_FROZEN,
-        DataTier.ENFORCE_DEFAULT_TIER_PREFERENCE_SETTING
+        DataTier.ENFORCE_DEFAULT_TIER_PREFERENCE_SETTING,
+        StatsRequestLimiter.MAX_CONCURRENT_STATS_REQUESTS_PER_NODE
     );
 
     static List<SettingUpgrader<?>> BUILT_IN_SETTING_UPGRADERS = Collections.emptyList();

+ 6 - 1
server/src/main/java/org/elasticsearch/node/Node.java

@@ -25,6 +25,7 @@ import org.elasticsearch.action.ActionType;
 import org.elasticsearch.action.search.SearchExecutionStatsCollector;
 import org.elasticsearch.action.search.SearchPhaseController;
 import org.elasticsearch.action.search.SearchTransportService;
+import org.elasticsearch.action.support.StatsRequestLimiter;
 import org.elasticsearch.action.support.TransportAction;
 import org.elasticsearch.action.update.UpdateHelper;
 import org.elasticsearch.bootstrap.BootstrapCheck;
@@ -809,6 +810,8 @@ public class Node implements Closeable {
             );
             clusterInfoService.addListener(diskThresholdMonitor::onNewInfo);
 
+            final StatsRequestLimiter statsRequestLimiter = new StatsRequestLimiter(settings, settingsModule.getClusterSettings());
+
             final DiscoveryModule discoveryModule = new DiscoveryModule(
                 settings,
                 transportService,
@@ -842,7 +845,8 @@ public class Node implements Closeable {
                 responseCollectorService,
                 searchTransportService,
                 indexingLimits,
-                searchModule.getValuesSourceRegistry().getUsageService()
+                searchModule.getValuesSourceRegistry().getUsageService(),
+                statsRequestLimiter
             );
 
             final SearchService searchService = newSearchService(
@@ -983,6 +987,7 @@ public class Node implements Closeable {
                 b.bind(IndexSettingProviders.class).toInstance(indexSettingProviders);
                 b.bind(DesiredNodesSettingsValidator.class).toInstance(desiredNodesSettingsValidator);
                 b.bind(HealthService.class).toInstance(healthService);
+                b.bind(StatsRequestLimiter.class).toInstance(statsRequestLimiter);
             });
             injector = modules.createInjector();
 

+ 9 - 3
server/src/main/java/org/elasticsearch/node/NodeService.java

@@ -14,6 +14,7 @@ import org.elasticsearch.action.admin.cluster.node.info.NodeInfo;
 import org.elasticsearch.action.admin.cluster.node.stats.NodeStats;
 import org.elasticsearch.action.admin.indices.stats.CommonStatsFlags;
 import org.elasticsearch.action.search.SearchTransportService;
+import org.elasticsearch.action.support.StatsRequestLimiter;
 import org.elasticsearch.cluster.coordination.Coordinator;
 import org.elasticsearch.cluster.service.ClusterService;
 import org.elasticsearch.common.settings.Settings;
@@ -52,6 +53,7 @@ public class NodeService implements Closeable {
     private final SearchTransportService searchTransportService;
     private final IndexingPressure indexingPressure;
     private final AggregationUsageService aggregationUsageService;
+    private final StatsRequestLimiter statsRequestLimiter;
 
     private final Coordinator coordinator;
 
@@ -72,7 +74,8 @@ public class NodeService implements Closeable {
         ResponseCollectorService responseCollectorService,
         SearchTransportService searchTransportService,
         IndexingPressure indexingPressure,
-        AggregationUsageService aggregationUsageService
+        AggregationUsageService aggregationUsageService,
+        StatsRequestLimiter statsRequestLimiter
     ) {
         this.settings = settings;
         this.threadPool = threadPool;
@@ -90,6 +93,7 @@ public class NodeService implements Closeable {
         this.searchTransportService = searchTransportService;
         this.indexingPressure = indexingPressure;
         this.aggregationUsageService = aggregationUsageService;
+        this.statsRequestLimiter = statsRequestLimiter;
         clusterService.addStateApplier(ingestService);
     }
 
@@ -139,7 +143,8 @@ public class NodeService implements Closeable {
         boolean ingest,
         boolean adaptiveSelection,
         boolean scriptCache,
-        boolean indexingPressure
+        boolean indexingPressure,
+        boolean statsRequests
     ) {
         // for indices stats we want to include previous allocated shards stats as well (it will
         // only be applied to the sensible ones to use, like refresh/merge/flush/indexing stats)
@@ -160,7 +165,8 @@ public class NodeService implements Closeable {
             ingest ? ingestService.stats() : null,
             adaptiveSelection ? responseCollectorService.getAdaptiveStats(searchTransportService.getPendingSearchRequests()) : null,
             scriptCache ? scriptService.cacheStats() : null,
-            indexingPressure ? this.indexingPressure.stats() : null
+            indexingPressure ? this.indexingPressure.stats() : null,
+            statsRequests ? this.statsRequestLimiter.stats() : null
         );
     }
 

+ 34 - 1
server/src/test/java/org/elasticsearch/action/admin/cluster/node/stats/NodeStatsTests.java

@@ -8,6 +8,7 @@
 
 package org.elasticsearch.action.admin.cluster.node.stats;
 
+import org.elasticsearch.action.support.StatsRequestStats;
 import org.elasticsearch.cluster.coordination.ClusterStateSerializationStats;
 import org.elasticsearch.cluster.coordination.PendingClusterStateStats;
 import org.elasticsearch.cluster.coordination.PublishClusterStateStats;
@@ -509,6 +510,21 @@ public class NodeStatsTests extends ESTestCase {
                     assertEquals(limited, sum.getCompilationLimitTriggered());
                     assertEquals(compilations, sum.getCompilations());
                 }
+                if (nodeStats.getStatsRequestStats() == null) {
+                    assertNull(deserializedNodeStats.getStatsRequestStats());
+                } else {
+                    Iterator<StatsRequestStats.Stats> statsRequestsStatsIterator = nodeStats.getStatsRequestStats().iterator();
+                    Iterator<StatsRequestStats.Stats> deserializedStatsRequestsStatsIterator = deserializedNodeStats.getStatsRequestStats()
+                        .iterator();
+                    while (statsRequestsStatsIterator.hasNext()) {
+                        StatsRequestStats.Stats stats = statsRequestsStatsIterator.next();
+                        StatsRequestStats.Stats deserializedStats = deserializedStatsRequestsStatsIterator.next();
+                        assertEquals(stats.getRequest(), deserializedStats.getRequest());
+                        assertEquals(stats.getCurrent(), deserializedStats.getCurrent());
+                        assertEquals(stats.getCompleted(), deserializedStats.getCompleted());
+                        assertEquals(stats.getRejected(), deserializedStats.getRejected());
+                    }
+                }
             }
         }
     }
@@ -885,6 +901,22 @@ public class NodeStatsTests extends ESTestCase {
                 randomLongBetween(0, maxStatValue)
             );
         }
+        StatsRequestStats statsRequestStats = null;
+        if (frequently()) {
+            int numStatsRequestsStats = randomIntBetween(0, 10);
+            List<StatsRequestStats.Stats> statsRequestsStatsList = new ArrayList<>();
+            for (int i = 0; i < numStatsRequestsStats; i++) {
+                statsRequestsStatsList.add(
+                    new StatsRequestStats.Stats(
+                        randomAlphaOfLengthBetween(3, 10),
+                        randomIntBetween(1, 10),
+                        randomIntBetween(1, 1000),
+                        randomIntBetween(1, 1000)
+                    )
+                );
+            }
+            statsRequestStats = new StatsRequestStats(statsRequestsStatsList);
+        }
         // TODO NodeIndicesStats are not tested here, way too complicated to create, also they need to be migrated to Writeable yet
         return new NodeStats(
             node,
@@ -903,7 +935,8 @@ public class NodeStatsTests extends ESTestCase {
             ingestStats,
             adaptiveSelectionStats,
             scriptCacheStats,
-            indexingPressureStats
+            indexingPressureStats,
+            statsRequestStats
         );
     }
 

+ 145 - 0
server/src/test/java/org/elasticsearch/action/support/StatsRequestLimiterTests.java

@@ -0,0 +1,145 @@
+/*
+ * Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
+ * or more contributor license agreements. Licensed under the Elastic License
+ * 2.0 and the Server Side Public License, v 1; you may not use this file except
+ * in compliance with, at your election, the Elastic License 2.0 or the Server
+ * Side Public License, v 1.
+ */
+
+package org.elasticsearch.action.support;
+
+import org.elasticsearch.action.ActionListener;
+import org.elasticsearch.common.TriConsumer;
+import org.elasticsearch.common.settings.ClusterSettings;
+import org.elasticsearch.common.settings.Settings;
+import org.elasticsearch.common.util.concurrent.EsRejectedExecutionException;
+import org.elasticsearch.tasks.Task;
+import org.elasticsearch.tasks.TaskId;
+import org.elasticsearch.test.ESTestCase;
+
+import java.util.ArrayList;
+import java.util.List;
+import java.util.concurrent.BrokenBarrierException;
+import java.util.concurrent.CyclicBarrier;
+
+import static java.util.Collections.emptyMap;
+import static org.elasticsearch.action.support.StatsRequestLimiter.MAX_CONCURRENT_STATS_REQUESTS_PER_NODE;
+
+public class StatsRequestLimiterTests extends ESTestCase {
+
+    public void testGrantsPermitsUpToMaxPermits() throws Exception {
+        final int maxPermits = randomIntBetween(1, 5);
+        final List<Thread> threads = new ArrayList<>(maxPermits);
+        final CyclicBarrier barrier = new CyclicBarrier(1 + maxPermits);
+        TriConsumer<Task, Integer, ActionListener<Integer>> execute = (task, i, actionListener) -> {
+            final Thread thread = new Thread(() -> {
+                try {
+                    barrier.await();
+                } catch (final BrokenBarrierException | InterruptedException e) {
+                    fail("Exception occurred while waiting for the barrier to be lifted");
+                }
+                actionListener.onResponse(i);
+            });
+            thread.setName("thread-" + i);
+            threads.add(thread);
+            thread.start();
+        };
+        ClusterSettings clusterSettings = new ClusterSettings(Settings.EMPTY, ClusterSettings.BUILT_IN_CLUSTER_SETTINGS);
+        Settings settings = Settings.builder().put(MAX_CONCURRENT_STATS_REQUESTS_PER_NODE.getKey(), maxPermits).build();
+        StatsRequestLimiter statsRequestLimiter = new StatsRequestLimiter(settings, clusterSettings);
+
+        for (int i = 0; i < maxPermits; i++) {
+            PlainActionFuture<Integer> listener = new PlainActionFuture<>();
+            statsRequestLimiter.tryToExecute(createTask(), i, listener, execute);
+        }
+        PlainActionFuture<Integer> listener = new PlainActionFuture<>();
+        statsRequestLimiter.tryToExecute(createTask(), maxPermits, listener, execute);
+        String expectedExceptionMessage = "this node is already coordinating ["
+            + MAX_CONCURRENT_STATS_REQUESTS_PER_NODE.get(settings)
+            + "] stats requests and has reached the limit set by ["
+            + MAX_CONCURRENT_STATS_REQUESTS_PER_NODE.getKey()
+            + "]";
+        expectThrows(EsRejectedExecutionException.class, expectedExceptionMessage, listener::actionGet);
+        StatsRequestStats.Stats stats = getStats(statsRequestLimiter);
+        assertEquals(maxPermits, stats.getCurrent());
+
+        barrier.await();
+        for (Thread thread : threads) {
+            thread.join();
+        }
+        assertBusy(() -> assertTrue(statsRequestLimiter.tryAcquire()));
+        stats = getStats(statsRequestLimiter);
+        assertEquals(0, stats.getCurrent());
+        assertEquals(maxPermits, stats.getCompleted());
+        assertEquals(1, stats.getRejected());
+    }
+
+    public void testStatsRequestPermitCanBeDynamicallyUpdated() {
+        ClusterSettings clusterSettings = new ClusterSettings(Settings.EMPTY, ClusterSettings.BUILT_IN_CLUSTER_SETTINGS);
+        StatsRequestLimiter statsRequestLimiter = new StatsRequestLimiter(
+            Settings.builder().put(MAX_CONCURRENT_STATS_REQUESTS_PER_NODE.getKey(), 1).build(),
+            clusterSettings
+        );
+
+        assertTrue(statsRequestLimiter.tryAcquire());
+        assertFalse(statsRequestLimiter.tryAcquire());
+
+        clusterSettings.applySettings(Settings.builder().put(MAX_CONCURRENT_STATS_REQUESTS_PER_NODE.getKey(), 2).build());
+
+        assertTrue(statsRequestLimiter.tryAcquire());
+
+        clusterSettings.applySettings(Settings.builder().put(MAX_CONCURRENT_STATS_REQUESTS_PER_NODE.getKey(), 1).build());
+
+        assertFalse(statsRequestLimiter.tryAcquire());
+        statsRequestLimiter.release();
+        statsRequestLimiter.release();
+
+        assertTrue(statsRequestLimiter.tryAcquire());
+        assertFalse(statsRequestLimiter.tryAcquire());
+    }
+
+    public void testMaxConcurrentStatsRequestsPerNodeIsValidated() {
+        ClusterSettings clusterSettings = new ClusterSettings(Settings.EMPTY, ClusterSettings.BUILT_IN_CLUSTER_SETTINGS);
+        Settings invalidSetting = Settings.builder().put(MAX_CONCURRENT_STATS_REQUESTS_PER_NODE.getKey(), 0).build();
+        expectThrows(IllegalArgumentException.class, () -> new StatsRequestLimiter(invalidSetting, clusterSettings));
+        new StatsRequestLimiter(Settings.builder().put(MAX_CONCURRENT_STATS_REQUESTS_PER_NODE.getKey(), 1).build(), clusterSettings);
+        expectThrows(
+            IllegalArgumentException.class,
+            () -> clusterSettings.applySettings(Settings.builder().put(MAX_CONCURRENT_STATS_REQUESTS_PER_NODE.getKey(), 0).build())
+        );
+    }
+
+    public void testReleasingAfterException() {
+        ClusterSettings clusterSettings = new ClusterSettings(Settings.EMPTY, ClusterSettings.BUILT_IN_CLUSTER_SETTINGS);
+        StatsRequestLimiter statsRequestLimiter = new StatsRequestLimiter(
+            Settings.builder().put(MAX_CONCURRENT_STATS_REQUESTS_PER_NODE.getKey(), 1).build(),
+            clusterSettings
+        );
+        PlainActionFuture<Integer> listener = new PlainActionFuture<>();
+        TriConsumer<Task, Integer, ActionListener<Integer>> execute = (task, input, actionListener) -> {
+            // Verify that we hold the last permit
+            assertFalse(statsRequestLimiter.tryAcquire());
+            throw new RuntimeException("simulated");
+        };
+        expectThrows(RuntimeException.class, () -> statsRequestLimiter.tryToExecute(createTask(), 10, listener, execute));
+        StatsRequestStats.Stats stats = getStats(statsRequestLimiter);
+        assertEquals(0, stats.getCurrent());
+        assertEquals(1, stats.getCompleted());
+        assertTrue(statsRequestLimiter.tryAcquire());
+    }
+
+    private StatsRequestStats.Stats getStats(StatsRequestLimiter statsRequestLimiter) {
+        return statsRequestLimiter.stats().iterator().next();
+    }
+
+    private Task createTask() {
+        return new Task(
+            randomLong(),
+            "transport",
+            "stats_action",
+            "description",
+            new TaskId(randomLong() + ":" + randomLong()),
+            emptyMap()
+        );
+    }
+}

+ 6 - 0
server/src/test/java/org/elasticsearch/cluster/DiskUsageTests.java

@@ -170,6 +170,7 @@ public class DiskUsageTests extends ESTestCase {
                 null,
                 null,
                 null,
+                null,
                 null
             ),
             new NodeStats(
@@ -189,6 +190,7 @@ public class DiskUsageTests extends ESTestCase {
                 null,
                 null,
                 null,
+                null,
                 null
             ),
             new NodeStats(
@@ -208,6 +210,7 @@ public class DiskUsageTests extends ESTestCase {
                 null,
                 null,
                 null,
+                null,
                 null
             )
         );
@@ -258,6 +261,7 @@ public class DiskUsageTests extends ESTestCase {
                 null,
                 null,
                 null,
+                null,
                 null
             ),
             new NodeStats(
@@ -277,6 +281,7 @@ public class DiskUsageTests extends ESTestCase {
                 null,
                 null,
                 null,
+                null,
                 null
             ),
             new NodeStats(
@@ -296,6 +301,7 @@ public class DiskUsageTests extends ESTestCase {
                 null,
                 null,
                 null,
+                null,
                 null
             )
         );

+ 2 - 1
test/framework/src/main/java/org/elasticsearch/cluster/MockInternalClusterInfoService.java

@@ -89,7 +89,8 @@ public class MockInternalClusterInfoService extends InternalClusterInfoService {
                 nodeStats.getIngestStats(),
                 nodeStats.getAdaptiveSelectionStats(),
                 nodeStats.getScriptCacheStats(),
-                nodeStats.getIndexingPressureStats()
+                nodeStats.getIndexingPressureStats(),
+                nodeStats.getStatsRequestStats()
             );
         }).collect(Collectors.toList());
     }

+ 1 - 0
test/framework/src/main/java/org/elasticsearch/test/InternalTestCluster.java

@@ -2458,6 +2458,7 @@ public final class InternalTestCluster extends TestCluster {
                     false,
                     false,
                     false,
+                    false,
                     false
                 );
                 assertThat(

+ 1 - 0
x-pack/plugin/autoscaling/src/test/java/org/elasticsearch/xpack/autoscaling/capacity/memory/AutoscalingMemoryInfoServiceTests.java

@@ -368,6 +368,7 @@ public class AutoscalingMemoryInfoServiceTests extends AutoscalingTestCase {
             null,
             null,
             null,
+            null,
             null
         );
     }

+ 1 - 0
x-pack/plugin/ml/src/test/java/org/elasticsearch/xpack/ml/action/TransportGetTrainedModelsStatsActionTests.java

@@ -358,6 +358,7 @@ public class TransportGetTrainedModelsStatsActionTests extends ESTestCase {
             ingestStats,
             null,
             null,
+            null,
             null
         );
 

+ 1 - 0
x-pack/plugin/monitoring/src/test/java/org/elasticsearch/xpack/monitoring/collector/node/NodeStatsMonitoringDocTests.java

@@ -445,6 +445,7 @@ public class NodeStatsMonitoringDocTests extends BaseFilteredMonitoringDocTestCa
             null,
             null,
             null,
+            null,
             null
         );
     }