Browse Source

Add repo throttle metrics to node stats api response (#96678)

* Add repo throttle metrics to node stats api response

* Update docs/changelog/96678.yaml

* Change x-content output structure

* Fix test after merge from main

* Follow PR comments

* minor fixes

* minor fixes 2

* Introduce new TransportVersion (V_8_500_010)

* Fix yaml test

* Follow PR comments

* Make stats datapoints human readable

* Follow common pattern for human readable output

* Bump up TransportVersion
Volodymyr Krasnikov 2 years ago
parent
commit
7abe8cb974
25 changed files with 361 additions and 20 deletions
  1. 6 0
      docs/changelog/96678.yaml
  2. 28 0
      docs/reference/cluster/nodes-stats.asciidoc
  3. 61 0
      rest-api-spec/src/yamlRestTest/resources/rest-api-spec/test/nodes.stats/70_repository_throttling_stats.yml
  4. 78 0
      server/src/internalClusterTest/java/org/elasticsearch/snapshots/RepositoryThrottlingStatsIT.java
  5. 2 1
      server/src/main/java/org/elasticsearch/TransportVersion.java
  6. 24 8
      server/src/main/java/org/elasticsearch/action/admin/cluster/node/stats/NodeStats.java
  7. 2 1
      server/src/main/java/org/elasticsearch/action/admin/cluster/node/stats/NodesStatsRequest.java
  8. 5 0
      server/src/main/java/org/elasticsearch/action/admin/cluster/node/stats/NodesStatsRequestBuilder.java
  9. 2 1
      server/src/main/java/org/elasticsearch/action/admin/cluster/node/stats/TransportNodesStatsAction.java
  10. 1 0
      server/src/main/java/org/elasticsearch/action/admin/cluster/stats/TransportClusterStatsAction.java
  11. 1 0
      server/src/main/java/org/elasticsearch/health/node/LocalHealthMonitor.java
  12. 2 1
      server/src/main/java/org/elasticsearch/node/Node.java
  13. 9 4
      server/src/main/java/org/elasticsearch/node/NodeService.java
  14. 14 0
      server/src/main/java/org/elasticsearch/repositories/RepositoriesService.java
  15. 84 0
      server/src/main/java/org/elasticsearch/repositories/RepositoriesStats.java
  16. 15 3
      server/src/test/java/org/elasticsearch/action/admin/cluster/node/stats/NodeStatsTests.java
  17. 6 0
      server/src/test/java/org/elasticsearch/cluster/DiskUsageTests.java
  18. 5 0
      server/src/test/java/org/elasticsearch/health/node/LocalHealthMonitorTests.java
  19. 9 0
      server/src/test/java/org/elasticsearch/repositories/RepositoriesServiceTests.java
  20. 1 0
      server/src/test/java/org/elasticsearch/rest/action/info/RestClusterInfoActionTests.java
  21. 2 1
      test/framework/src/main/java/org/elasticsearch/cluster/MockInternalClusterInfoService.java
  22. 1 0
      test/framework/src/main/java/org/elasticsearch/test/InternalTestCluster.java
  23. 1 0
      x-pack/plugin/autoscaling/src/test/java/org/elasticsearch/xpack/autoscaling/capacity/nodeinfo/AutoscalingNodesInfoServiceTests.java
  24. 1 0
      x-pack/plugin/ml/src/test/java/org/elasticsearch/xpack/ml/action/TransportGetTrainedModelsStatsActionTests.java
  25. 1 0
      x-pack/plugin/monitoring/src/test/java/org/elasticsearch/xpack/monitoring/collector/node/NodeStatsMonitoringDocTests.java

+ 6 - 0
docs/changelog/96678.yaml

@@ -0,0 +1,6 @@
+pr: 96678
+summary: Add repo throttle metrics to node stats api response
+area: Snapshot/Restore
+type: feature
+issues:
+ - 89385

+ 28 - 0
docs/reference/cluster/nodes-stats.asciidoc

@@ -84,6 +84,9 @@ using metrics.
       Process statistics, memory consumption, cpu usage, open
       file descriptors.
 
+  `repository`::
+      Statistics about snapshot repositories.
+
   `thread_pool`::
       Statistics about each thread pool, including current size, queue and
       rejected tasks.
@@ -1672,6 +1675,31 @@ Total number of classes unloaded since the JVM started.
 =======
 ======
 
+[[cluster-nodes-stats-api-response-body-repositories]]
+`repositories`::
+(object)
+Statistics about snapshot repositories.
++
+.Properties of `repositories`
+[%collapsible%open]
+======
+`<repository_name>`::
+(object)
+Contains repository throttling statistics for the node.
++
+.Properties of `<repository_name>`
+[%collapsible%open]
+=======
+`total_read_throttled_time_nanos`::
+(integer)
+Total time, in nanoseconds, that the node spent waiting because reads from the repository were throttled during recovery.
+
+`total_write_throttled_time_nanos`::
+(integer)
+Total time, in nanoseconds, that the node spent waiting because writes to the repository were throttled during snapshotting.
+=======
+======
+
 [[cluster-nodes-stats-api-response-body-threadpool]]
 `thread_pool`::
 (object)

+ 61 - 0
rest-api-spec/src/yamlRestTest/resources/rest-api-spec/test/nodes.stats/70_repository_throttling_stats.yml

@@ -0,0 +1,61 @@
+---
+"Repository throttling stats (no repository exists)":
+  - skip:
+      version: " - 8.8.99"
+      reason: "repository throttling stats was added in 8.9.0"
+      features: [arbitrary_key]
+
+  - do:
+      nodes.info: {}
+  - set:
+      nodes._arbitrary_key_: node_id
+
+  - do:
+      nodes.stats:
+        metric: [ repository ]
+
+  - is_true: nodes.$node_id.repositories
+  - match: { nodes.$node_id.repositories: {} }
+
+---
+"Repository throttling stats (some repositories exist)":
+  - skip:
+      version: " - 8.8.99"
+      reason: "repository throttling stats was added in 8.9.0"
+      features: [arbitrary_key]
+
+  - do:
+      nodes.info: {}
+  - set:
+      nodes._arbitrary_key_: node_id
+
+  - do:
+      snapshot.create_repository:
+        repository: test_repo_uuid_1
+        body:
+          type: fs
+          settings:
+            location: "test_repo_uuid_1_loc"
+
+  - do:
+      snapshot.create_repository:
+        repository: test_repo_uuid_2
+        body:
+          type: fs
+          settings:
+            location: "test_repo_uuid_2_loc"
+
+  - do:
+      nodes.stats:
+        metric: [ repository ]
+
+  - is_true: nodes.$node_id.repositories
+
+  - is_true: nodes.$node_id.repositories.test_repo_uuid_1
+  - gte: { nodes.$node_id.repositories.test_repo_uuid_1.total_read_throttled_time_nanos: 0 }
+  - gte: { nodes.$node_id.repositories.test_repo_uuid_1.total_write_throttled_time_nanos: 0 }
+
+  - is_true: nodes.$node_id.repositories.test_repo_uuid_2
+  - gte: { nodes.$node_id.repositories.test_repo_uuid_2.total_read_throttled_time_nanos: 0 }
+  - gte: { nodes.$node_id.repositories.test_repo_uuid_2.total_write_throttled_time_nanos: 0 }
+

+ 78 - 0
server/src/internalClusterTest/java/org/elasticsearch/snapshots/RepositoryThrottlingStatsIT.java

@@ -0,0 +1,78 @@
+/*
+ * Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
+ * or more contributor license agreements. Licensed under the Elastic License
+ * 2.0 and the Server Side Public License, v 1; you may not use this file except
+ * in compliance with, at your election, the Elastic License 2.0 or the Server
+ * Side Public License, v 1.
+ */
+
+package org.elasticsearch.snapshots;
+
+import org.elasticsearch.action.admin.cluster.node.stats.NodesStatsResponse;
+import org.elasticsearch.action.admin.cluster.snapshots.restore.RestoreSnapshotResponse;
+import org.elasticsearch.action.admin.indices.stats.IndexStats;
+import org.elasticsearch.action.admin.indices.stats.IndicesStatsResponse;
+import org.elasticsearch.action.admin.indices.stats.ShardStats;
+import org.elasticsearch.common.settings.Settings;
+import org.elasticsearch.common.unit.ByteSizeValue;
+import org.elasticsearch.repositories.RepositoriesStats;
+import org.elasticsearch.test.ESIntegTestCase;
+
+import java.util.Collections;
+
+import static org.hamcrest.Matchers.greaterThan;
+
+@ESIntegTestCase.ClusterScope(numDataNodes = 0, scope = ESIntegTestCase.Scope.TEST)
+public class RepositoryThrottlingStatsIT extends AbstractSnapshotIntegTestCase {
+
+    public void testRepositoryThrottlingStats() throws Exception {
+
+        logger.info("--> starting a node");
+        internalCluster().startNode();
+
+        logger.info("--> create index");
+        createIndexWithRandomDocs("test-idx", 100);
+
+        IndicesStatsResponse indicesStats = client().admin().indices().prepareStats("test-idx").get();
+        IndexStats indexStats = indicesStats.getIndex("test-idx");
+        long totalSizeInBytes = 0;
+        for (ShardStats shard : indexStats.getShards()) {
+            totalSizeInBytes += shard.getStats().getStore().getSizeInBytes();
+        }
+        logger.info("--> total shards size: {} bytes", totalSizeInBytes);
+
+        logger.info("--> create repository with really low snapshot/restore rate-limits");
+        createRepository(
+            "test-repo",
+            "fs",
+            Settings.builder()
+                .put("location", randomRepoPath())
+                .put("compress", false)
+                // set rate limits at ~25% of total size
+                .put("max_snapshot_bytes_per_sec", ByteSizeValue.ofBytes(totalSizeInBytes / 4))
+                .put("max_restore_bytes_per_sec", ByteSizeValue.ofBytes(totalSizeInBytes / 4))
+        );
+
+        logger.info("--> create snapshot");
+        createSnapshot("test-repo", "test-snap", Collections.singletonList("test-idx"));
+
+        logger.info("--> restore from snapshot");
+        RestoreSnapshotResponse restoreSnapshotResponse = clusterAdmin().prepareRestoreSnapshot("test-repo", "test-snap")
+            .setRenamePattern("test-")
+            .setRenameReplacement("test2-")
+            .setWaitForCompletion(true)
+            .execute()
+            .actionGet();
+        assertThat(restoreSnapshotResponse.getRestoreInfo().totalShards(), greaterThan(0));
+        assertDocCount("test-idx", 100);
+
+        logger.info("--> access repository throttling stats via _nodes/stats api");
+        NodesStatsResponse response = client().admin().cluster().prepareNodesStats().setRepositoryStats(true).get();
+        RepositoriesStats stats = response.getNodes().get(0).getRepositoriesStats();
+
+        assertTrue(stats.getRepositoryThrottlingStats().containsKey("test-repo"));
+        assertTrue(stats.getRepositoryThrottlingStats().get("test-repo").totalWriteThrottledNanos() > 0);
+        assertTrue(stats.getRepositoryThrottlingStats().get("test-repo").totalReadThrottledNanos() > 0);
+
+    }
+}

+ 2 - 1
server/src/main/java/org/elasticsearch/TransportVersion.java

@@ -134,12 +134,13 @@ public record TransportVersion(int id) implements Comparable<TransportVersion> {
     public static final TransportVersion V_8_500_008 = registerTransportVersion(8_500_008, "8884ab9d-94cd-4bac-aff8-01f2c394f47c");
     public static final TransportVersion V_8_500_009 = registerTransportVersion(8_500_009, "35091358-fd41-4106-a6e2-d2a1315494c1");
     public static final TransportVersion V_8_500_010 = registerTransportVersion(8_500_010, "9818C628-1EEC-439B-B943-468F61460675");
+    public static final TransportVersion V_8_500_011 = registerTransportVersion(8_500_011, "2209F28D-B52E-4BC4-9889-E780F291C32E");
 
     /**
      * Reference to the most recent transport version.
      * This should be the transport version with the highest id.
      */
-    public static final TransportVersion CURRENT = findCurrent(V_8_500_010);
+    public static final TransportVersion CURRENT = findCurrent(V_8_500_011);
 
     /**
      * Reference to the earliest compatible transport version to this version of the codebase.

+ 24 - 8
server/src/main/java/org/elasticsearch/action/admin/cluster/node/stats/NodeStats.java

@@ -8,6 +8,7 @@
 
 package org.elasticsearch.action.admin.cluster.node.stats;
 
+import org.elasticsearch.TransportVersion;
 import org.elasticsearch.action.support.nodes.BaseNodeResponse;
 import org.elasticsearch.cluster.node.DiscoveryNode;
 import org.elasticsearch.cluster.node.DiscoveryNodeRole;
@@ -27,6 +28,7 @@ import org.elasticsearch.monitor.jvm.JvmStats;
 import org.elasticsearch.monitor.os.OsStats;
 import org.elasticsearch.monitor.process.ProcessStats;
 import org.elasticsearch.node.AdaptiveSelectionStats;
+import org.elasticsearch.repositories.RepositoriesStats;
 import org.elasticsearch.script.ScriptCacheStats;
 import org.elasticsearch.script.ScriptStats;
 import org.elasticsearch.threadpool.ThreadPoolStats;
@@ -92,6 +94,9 @@ public class NodeStats extends BaseNodeResponse implements ChunkedToXContent {
     @Nullable
     private final IndexingPressureStats indexingPressureStats;
 
+    @Nullable
+    private final RepositoriesStats repositoriesStats;
+
     public NodeStats(StreamInput in) throws IOException {
         super(in);
         timestamp = in.readVLong();
@@ -112,6 +117,11 @@ public class NodeStats extends BaseNodeResponse implements ChunkedToXContent {
         ingestStats = in.readOptionalWriteable(IngestStats::read);
         adaptiveSelectionStats = in.readOptionalWriteable(AdaptiveSelectionStats::new);
         indexingPressureStats = in.readOptionalWriteable(IndexingPressureStats::new);
+        if (in.getTransportVersion().onOrAfter(TransportVersion.V_8_500_011)) {
+            repositoriesStats = in.readOptionalWriteable(RepositoriesStats::new);
+        } else {
+            repositoriesStats = null;
+        }
     }
 
     public NodeStats(
@@ -131,7 +141,8 @@ public class NodeStats extends BaseNodeResponse implements ChunkedToXContent {
         @Nullable IngestStats ingestStats,
         @Nullable AdaptiveSelectionStats adaptiveSelectionStats,
         @Nullable ScriptCacheStats scriptCacheStats,
-        @Nullable IndexingPressureStats indexingPressureStats
+        @Nullable IndexingPressureStats indexingPressureStats,
+        @Nullable RepositoriesStats repositoriesStats
     ) {
         super(node);
         this.timestamp = timestamp;
@@ -150,6 +161,7 @@ public class NodeStats extends BaseNodeResponse implements ChunkedToXContent {
         this.adaptiveSelectionStats = adaptiveSelectionStats;
         this.scriptCacheStats = scriptCacheStats;
         this.indexingPressureStats = indexingPressureStats;
+        this.repositoriesStats = repositoriesStats;
     }
 
     public long getTimestamp() {
@@ -254,6 +266,11 @@ public class NodeStats extends BaseNodeResponse implements ChunkedToXContent {
         return indexingPressureStats;
     }
 
+    @Nullable
+    public RepositoriesStats getRepositoriesStats() {
+        return repositoriesStats;
+    }
+
     @Override
     public void writeTo(StreamOutput out) throws IOException {
         super.writeTo(out);
@@ -277,6 +294,9 @@ public class NodeStats extends BaseNodeResponse implements ChunkedToXContent {
         out.writeOptionalWriteable(ingestStats);
         out.writeOptionalWriteable(adaptiveSelectionStats);
         out.writeOptionalWriteable(indexingPressureStats);
+        if (out.getTransportVersion().onOrAfter(TransportVersion.V_8_500_011)) {
+            out.writeOptionalWriteable(repositoriesStats);
+        }
     }
 
     @Override
@@ -309,12 +329,7 @@ public class NodeStats extends BaseNodeResponse implements ChunkedToXContent {
 
             ifPresent(getIndices()).toXContentChunked(outerParams),
 
-            Iterators.single((builder, params) -> {
-                ifPresent(getOs()).toXContent(builder, params);
-                ifPresent(getProcess()).toXContent(builder, params);
-                ifPresent(getJvm()).toXContent(builder, params);
-                return builder;
-            }),
+            singleChunk(ifPresent(getOs()), ifPresent(getProcess()), ifPresent(getJvm())),
 
             ifPresent(getThreadPool()).toXContentChunked(outerParams),
             singleChunk(ifPresent(getFs())),
@@ -326,7 +341,8 @@ public class NodeStats extends BaseNodeResponse implements ChunkedToXContent {
             ifPresent(getIngestStats()).toXContentChunked(outerParams),
             singleChunk(ifPresent(getAdaptiveSelectionStats())),
             ifPresent(getScriptCacheStats()).toXContentChunked(outerParams),
-            singleChunk(ifPresent(getIndexingPressureStats()))
+            singleChunk(ifPresent(getIndexingPressureStats())),
+            singleChunk(ifPresent(getRepositoriesStats()))
         );
     }
 

+ 2 - 1
server/src/main/java/org/elasticsearch/action/admin/cluster/node/stats/NodesStatsRequest.java

@@ -178,7 +178,8 @@ public class NodesStatsRequest extends BaseNodesRequest<NodesStatsRequest> {
         INGEST("ingest"),
         ADAPTIVE_SELECTION("adaptive_selection"),
         SCRIPT_CACHE("script_cache"),
-        INDEXING_PRESSURE("indexing_pressure");
+        INDEXING_PRESSURE("indexing_pressure"),
+        REPOSITORY("repository");
 
         private String metricName;
 

+ 5 - 0
server/src/main/java/org/elasticsearch/action/admin/cluster/node/stats/NodesStatsRequestBuilder.java

@@ -153,6 +153,11 @@ public class NodesStatsRequestBuilder extends NodesOperationRequestBuilder<
         return this;
     }
 
+    public NodesStatsRequestBuilder setRepositoryStats(boolean repositoryStats) {
+        addOrRemoveMetric(repositoryStats, NodesStatsRequest.Metric.REPOSITORY);
+        return this;
+    }
+
     /**
      * Helper method for adding metrics to a request
      */

+ 2 - 1
server/src/main/java/org/elasticsearch/action/admin/cluster/node/stats/TransportNodesStatsAction.java

@@ -95,7 +95,8 @@ public class TransportNodesStatsAction extends TransportNodesAction<
             NodesStatsRequest.Metric.INGEST.containedIn(metrics),
             NodesStatsRequest.Metric.ADAPTIVE_SELECTION.containedIn(metrics),
             NodesStatsRequest.Metric.SCRIPT_CACHE.containedIn(metrics),
-            NodesStatsRequest.Metric.INDEXING_PRESSURE.containedIn(metrics)
+            NodesStatsRequest.Metric.INDEXING_PRESSURE.containedIn(metrics),
+            NodesStatsRequest.Metric.REPOSITORY.containedIn(metrics)
         );
     }
 

+ 1 - 0
server/src/main/java/org/elasticsearch/action/admin/cluster/stats/TransportClusterStatsAction.java

@@ -196,6 +196,7 @@ public class TransportClusterStatsAction extends TransportNodesAction<
             true,
             false,
             false,
+            false,
             false
         );
         List<ShardStats> shardsStats = new ArrayList<>();

+ 1 - 0
server/src/main/java/org/elasticsearch/health/node/LocalHealthMonitor.java

@@ -466,6 +466,7 @@ public class LocalHealthMonitor implements ClusterStateListener {
                 false,
                 false,
                 false,
+                false,
                 false
             );
             return DiskUsage.findLeastAvailablePath(nodeStats);

+ 2 - 1
server/src/main/java/org/elasticsearch/node/Node.java

@@ -953,7 +953,8 @@ public class Node implements Closeable {
                 responseCollectorService,
                 searchTransportService,
                 indexingLimits,
-                searchModule.getValuesSourceRegistry().getUsageService()
+                searchModule.getValuesSourceRegistry().getUsageService(),
+                repositoryService
             );
 
             final SearchService searchService = newSearchService(

+ 9 - 4
server/src/main/java/org/elasticsearch/node/NodeService.java

@@ -28,6 +28,7 @@ import org.elasticsearch.indices.breaker.CircuitBreakerService;
 import org.elasticsearch.ingest.IngestService;
 import org.elasticsearch.monitor.MonitorService;
 import org.elasticsearch.plugins.PluginsService;
+import org.elasticsearch.repositories.RepositoriesService;
 import org.elasticsearch.script.ScriptService;
 import org.elasticsearch.search.aggregations.support.AggregationUsageService;
 import org.elasticsearch.threadpool.ThreadPool;
@@ -53,8 +54,8 @@ public class NodeService implements Closeable {
     private final SearchTransportService searchTransportService;
     private final IndexingPressure indexingPressure;
     private final AggregationUsageService aggregationUsageService;
-
     private final Coordinator coordinator;
+    private final RepositoriesService repositoriesService;
 
     NodeService(
         Settings settings,
@@ -73,7 +74,8 @@ public class NodeService implements Closeable {
         ResponseCollectorService responseCollectorService,
         SearchTransportService searchTransportService,
         IndexingPressure indexingPressure,
-        AggregationUsageService aggregationUsageService
+        AggregationUsageService aggregationUsageService,
+        RepositoriesService repositoriesService
     ) {
         this.settings = settings;
         this.threadPool = threadPool;
@@ -91,6 +93,7 @@ public class NodeService implements Closeable {
         this.searchTransportService = searchTransportService;
         this.indexingPressure = indexingPressure;
         this.aggregationUsageService = aggregationUsageService;
+        this.repositoriesService = repositoriesService;
         clusterService.addStateApplier(ingestService);
     }
 
@@ -143,7 +146,8 @@ public class NodeService implements Closeable {
         boolean ingest,
         boolean adaptiveSelection,
         boolean scriptCache,
-        boolean indexingPressure
+        boolean indexingPressure,
+        boolean repositoriesStats
     ) {
         // for indices stats we want to include previous allocated shards stats as well (it will
         // only be applied to the sensible ones to use, like refresh/merge/flush/indexing stats)
@@ -164,7 +168,8 @@ public class NodeService implements Closeable {
             ingest ? ingestService.stats() : null,
             adaptiveSelection ? responseCollectorService.getAdaptiveStats(searchTransportService.getPendingSearchRequests()) : null,
             scriptCache ? scriptService.cacheStats() : null,
-            indexingPressure ? this.indexingPressure.stats() : null
+            indexingPressure ? this.indexingPressure.stats() : null,
+            repositoriesStats ? this.repositoriesService.getRepositoriesThrottlingStats() : null
         );
     }
 

+ 14 - 0
server/src/main/java/org/elasticsearch/repositories/RepositoriesService.java

@@ -59,6 +59,7 @@ import java.util.Objects;
 import java.util.Set;
 import java.util.function.BiConsumer;
 import java.util.function.Function;
+import java.util.stream.Collectors;
 import java.util.stream.Stream;
 
 import static java.util.Collections.unmodifiableMap;
@@ -677,6 +678,19 @@ public class RepositoriesService extends AbstractLifecycleComponent implements C
         return repositoriesStats;
     }
 
+    public RepositoriesStats getRepositoriesThrottlingStats() {
+        return new RepositoriesStats(
+            repositories.values()
+                .stream()
+                .collect(
+                    Collectors.toMap(
+                        r -> r.getMetadata().name(),
+                        r -> new RepositoriesStats.ThrottlingStats(r.getRestoreThrottleTimeInNanos(), r.getSnapshotThrottleTimeInNanos())
+                    )
+                )
+        );
+    }
+
     private List<RepositoryStatsSnapshot> getRepositoryStatsForActiveRepositories() {
         return Stream.concat(repositories.values().stream(), internalRepositories.values().stream())
             .filter(r -> r instanceof MeteredBlobStoreRepository)

+ 84 - 0
server/src/main/java/org/elasticsearch/repositories/RepositoriesStats.java

@@ -0,0 +1,84 @@
+/*
+ * Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
+ * or more contributor license agreements. Licensed under the Elastic License
+ * 2.0 and the Server Side Public License, v 1; you may not use this file except
+ * in compliance with, at your election, the Elastic License 2.0 or the Server
+ * Side Public License, v 1.
+ */
+
+package org.elasticsearch.repositories;
+
+import org.elasticsearch.TransportVersion;
+import org.elasticsearch.common.io.stream.StreamInput;
+import org.elasticsearch.common.io.stream.StreamOutput;
+import org.elasticsearch.common.io.stream.Writeable;
+import org.elasticsearch.core.TimeValue;
+import org.elasticsearch.xcontent.ToXContentFragment;
+import org.elasticsearch.xcontent.ToXContentObject;
+import org.elasticsearch.xcontent.XContentBuilder;
+
+import java.io.IOException;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.Map;
+import java.util.concurrent.TimeUnit;
+
+public class RepositoriesStats implements Writeable, ToXContentFragment {
+
+    private final Map<String, ThrottlingStats> repositoryThrottlingStats;
+
+    public RepositoriesStats(StreamInput in) throws IOException {
+        if (in.getTransportVersion().onOrAfter(TransportVersion.V_8_500_011)) {
+            repositoryThrottlingStats = in.readMap(StreamInput::readString, ThrottlingStats::new);
+        } else {
+            repositoryThrottlingStats = new HashMap<>();
+        }
+    }
+
+    public RepositoriesStats(Map<String, ThrottlingStats> repositoryThrottlingStats) {
+        this.repositoryThrottlingStats = new HashMap<>(repositoryThrottlingStats);
+    }
+
+    @Override
+    public void writeTo(StreamOutput out) throws IOException {
+        if (out.getTransportVersion().onOrAfter(TransportVersion.V_8_500_011)) {
+            out.writeMap(repositoryThrottlingStats, StreamOutput::writeString, (o, v) -> v.writeTo(o));
+        }
+    }
+
+    @Override
+    public XContentBuilder toXContent(XContentBuilder builder, Params params) throws IOException {
+        builder.field("repositories", repositoryThrottlingStats);
+        return builder;
+    }
+
+    public Map<String, ThrottlingStats> getRepositoryThrottlingStats() {
+        return Collections.unmodifiableMap(repositoryThrottlingStats);
+    }
+
+    public record ThrottlingStats(long totalReadThrottledNanos, long totalWriteThrottledNanos) implements ToXContentObject, Writeable {
+
+        ThrottlingStats(StreamInput in) throws IOException {
+            this(in.readVLong(), in.readVLong());
+        }
+
+        @Override
+        public XContentBuilder toXContent(XContentBuilder builder, Params params) throws IOException {
+            builder.startObject();
+            if (builder.humanReadable()) {
+                builder.field("total_read_throttled_time", new TimeValue(totalReadThrottledNanos, TimeUnit.NANOSECONDS));
+                builder.field("total_write_throttled_time", new TimeValue(totalWriteThrottledNanos, TimeUnit.NANOSECONDS));
+            }
+            builder.field("total_read_throttled_time_nanos", totalReadThrottledNanos);
+            builder.field("total_write_throttled_time_nanos", totalWriteThrottledNanos);
+            builder.endObject();
+            return builder;
+        }
+
+        @Override
+        public void writeTo(StreamOutput out) throws IOException {
+            out.writeVLong(totalReadThrottledNanos);
+            out.writeVLong(totalWriteThrottledNanos);
+        }
+    }
+}

+ 15 - 3
server/src/test/java/org/elasticsearch/action/admin/cluster/node/stats/NodeStatsTests.java

@@ -64,6 +64,7 @@ import org.elasticsearch.monitor.os.OsStats;
 import org.elasticsearch.monitor.process.ProcessStats;
 import org.elasticsearch.node.AdaptiveSelectionStats;
 import org.elasticsearch.node.ResponseCollectorService;
+import org.elasticsearch.repositories.RepositoriesStats;
 import org.elasticsearch.script.ScriptCacheStats;
 import org.elasticsearch.script.ScriptContextStats;
 import org.elasticsearch.script.ScriptStats;
@@ -457,6 +458,12 @@ public class NodeStatsTests extends ESTestCase {
                     assertEquals(scriptCacheStats, deserializedScriptCacheStats);
                     assertNotSame(scriptCacheStats, deserializedScriptCacheStats);
                 }
+
+                RepositoriesStats repoThrottlingStats = deserializedNodeStats.getRepositoriesStats();
+                assertTrue(repoThrottlingStats.getRepositoryThrottlingStats().containsKey("test-repository"));
+                assertEquals(100, repoThrottlingStats.getRepositoryThrottlingStats().get("test-repository").totalReadThrottledNanos());
+                assertEquals(200, repoThrottlingStats.getRepositoryThrottlingStats().get("test-repository").totalWriteThrottledNanos());
+
             }
         }
     }
@@ -480,8 +487,8 @@ public class NodeStatsTests extends ESTestCase {
     }
 
     private static int expectedChunks(NodeStats nodeStats, NodeStatsLevel level) {
-        return 7 // one per each chunkeable object
-            + expectedChunks(nodeStats.getHttp()) //
+        // expectedChunks = number of static chunks (8 at the moment, see NodeStats#toXContentChunked) + number of variable chunks
+        return 8 + expectedChunks(nodeStats.getHttp()) //
             + expectedChunks(nodeStats.getIndices(), level) //
             + expectedChunks(nodeStats.getTransport()) //
             + expectedChunks(nodeStats.getIngestStats()) //
@@ -1022,6 +1029,10 @@ public class NodeStatsTests extends ESTestCase {
                 randomLongBetween(0, maxStatValue)
             );
         }
+        RepositoriesStats repositoriesStats = new RepositoriesStats(
+            Map.of("test-repository", new RepositoriesStats.ThrottlingStats(100, 200))
+        );
+
         return new NodeStats(
             node,
             randomNonNegativeLong(),
@@ -1039,7 +1050,8 @@ public class NodeStatsTests extends ESTestCase {
             ingestStats,
             adaptiveSelectionStats,
             scriptCacheStats,
-            indexingPressureStats
+            indexingPressureStats,
+            repositoriesStats
         );
     }
 

+ 6 - 0
server/src/test/java/org/elasticsearch/cluster/DiskUsageTests.java

@@ -190,6 +190,7 @@ public class DiskUsageTests extends ESTestCase {
                 null,
                 null,
                 null,
+                null,
                 null
             );
             DiskUsage leastNode = DiskUsage.findLeastAvailablePath(nodeStats);
@@ -217,6 +218,7 @@ public class DiskUsageTests extends ESTestCase {
                 null,
                 null,
                 null,
+                null,
                 null
             );
             DiskUsage leastNode = DiskUsage.findLeastAvailablePath(nodeStats);
@@ -246,6 +248,7 @@ public class DiskUsageTests extends ESTestCase {
                 null,
                 null,
                 null,
+                null,
                 null
             );
             DiskUsage leastNode = DiskUsage.findLeastAvailablePath(nodeStats);
@@ -279,6 +282,7 @@ public class DiskUsageTests extends ESTestCase {
                 null,
                 null,
                 null,
+                null,
                 null
             );
             DiskUsage leastNode = DiskUsage.findLeastAvailablePath(nodeStats);
@@ -307,6 +311,7 @@ public class DiskUsageTests extends ESTestCase {
                 null,
                 null,
                 null,
+                null,
                 null
             );
             DiskUsage leastNode = DiskUsage.findLeastAvailablePath(nodeStats);
@@ -336,6 +341,7 @@ public class DiskUsageTests extends ESTestCase {
                 null,
                 null,
                 null,
+                null,
                 null
             );
 

+ 5 - 0
server/src/test/java/org/elasticsearch/health/node/LocalHealthMonitorTests.java

@@ -249,6 +249,7 @@ public class LocalHealthMonitorTests extends ESTestCase {
                 eq(false),
                 eq(false),
                 eq(false),
+                eq(false),
                 eq(false)
             )
         ).thenReturn(nodeStats());
@@ -350,6 +351,7 @@ public class LocalHealthMonitorTests extends ESTestCase {
                 eq(false),
                 eq(false),
                 eq(false),
+                eq(false),
                 eq(false)
             )
         ).thenReturn(nodeStats(1000, 10));
@@ -372,6 +374,7 @@ public class LocalHealthMonitorTests extends ESTestCase {
                 eq(false),
                 eq(false),
                 eq(false),
+                eq(false),
                 eq(false)
             )
         ).thenReturn(nodeStats(1000, 80));
@@ -394,6 +397,7 @@ public class LocalHealthMonitorTests extends ESTestCase {
                 eq(false),
                 eq(false),
                 eq(false),
+                eq(false),
                 eq(false)
             )
         ).thenReturn(nodeStats(1000, 110));
@@ -426,6 +430,7 @@ public class LocalHealthMonitorTests extends ESTestCase {
             null,
             null,
             null,
+            null,
             null
         );
     }

+ 9 - 0
server/src/test/java/org/elasticsearch/repositories/RepositoriesServiceTests.java

@@ -253,6 +253,15 @@ public class RepositoriesServiceTests extends ESTestCase {
         );
     }
 
+    public void testRepositoriesThrottlingStats() {
+        var repoName = randomAlphaOfLengthBetween(10, 25);
+        var clusterState = createClusterStateWithRepo(repoName, TestRepository.TYPE);
+        repositoriesService.applyClusterState(new ClusterChangedEvent("put test repository", clusterState, emptyState()));
+        RepositoriesStats throttlingStats = repositoriesService.getRepositoriesThrottlingStats();
+        assertTrue(throttlingStats.getRepositoryThrottlingStats().containsKey(repoName));
+        assertNotNull(throttlingStats.getRepositoryThrottlingStats().get(repoName));
+    }
+
     // InvalidRepository is created when current node is non-master node and failed to create repository by applying cluster state from
     // master. When current node become master node later and same repository is put again, current node can create repository successfully
     // and replace previous InvalidRepository

+ 1 - 0
server/src/test/java/org/elasticsearch/rest/action/info/RestClusterInfoActionTests.java

@@ -112,6 +112,7 @@ public class RestClusterInfoActionTests extends ESTestCase {
             null,
             null,
             null,
+            null,
             null
         );
     }

+ 2 - 1
test/framework/src/main/java/org/elasticsearch/cluster/MockInternalClusterInfoService.java

@@ -88,7 +88,8 @@ public class MockInternalClusterInfoService extends InternalClusterInfoService {
                 nodeStats.getIngestStats(),
                 nodeStats.getAdaptiveSelectionStats(),
                 nodeStats.getScriptCacheStats(),
-                nodeStats.getIndexingPressureStats()
+                nodeStats.getIndexingPressureStats(),
+                nodeStats.getRepositoriesStats()
             );
         }).collect(Collectors.toList());
     }

+ 1 - 0
test/framework/src/main/java/org/elasticsearch/test/InternalTestCluster.java

@@ -2449,6 +2449,7 @@ public final class InternalTestCluster extends TestCluster {
                     false,
                     false,
                     false,
+                    false,
                     false
                 );
                 assertThat(

+ 1 - 0
x-pack/plugin/autoscaling/src/test/java/org/elasticsearch/xpack/autoscaling/capacity/nodeinfo/AutoscalingNodesInfoServiceTests.java

@@ -442,6 +442,7 @@ public class AutoscalingNodesInfoServiceTests extends AutoscalingTestCase {
             null,
             null,
             null,
+            null,
             null
         );
     }

+ 1 - 0
x-pack/plugin/ml/src/test/java/org/elasticsearch/xpack/ml/action/TransportGetTrainedModelsStatsActionTests.java

@@ -271,6 +271,7 @@ public class TransportGetTrainedModelsStatsActionTests extends ESTestCase {
             ingestStats,
             null,
             null,
+            null,
             null
         );
 

+ 1 - 0
x-pack/plugin/monitoring/src/test/java/org/elasticsearch/xpack/monitoring/collector/node/NodeStatsMonitoringDocTests.java

@@ -463,6 +463,7 @@ public class NodeStatsMonitoringDocTests extends BaseFilteredMonitoringDocTestCa
             null,
             null,
             null,
+            null,
             null
         );
     }