Browse Source

Tie snapshot speed to node bandwidth settings (#91021)

If the recovery node bandwidth settings exist,
then the default value for max snapshot speed will
be infinite, and the speed will be rate limited
by the recovery rate limit as well.

Fixes #57023
Iraklis Psaroudakis 2 years ago
parent
commit
7bfc672eb4

+ 6 - 0
docs/changelog/91021.yaml

@@ -0,0 +1,6 @@
+pr: 91021
+summary: Tie snapshot speed to node bandwidth settings
+area: Snapshot/Restore
+type: enhancement
+issues:
+ - 57023

+ 3 - 0
docs/plugins/repository-shared-settings.asciidoc

@@ -6,6 +6,9 @@
 `max_snapshot_bytes_per_sec`::
 
     Throttles per node snapshot rate. Defaults to `40mb` per second.
+    Note that if the {ref}/recovery.html[recovery settings for managed services]
+    are set, then it defaults to unlimited, and the rate is additionally
+    throttled through {ref}/recovery.html[recovery settings].
 
 `readonly`::
 

+ 8 - 4
docs/reference/snapshot-restore/apis/repo-analysis-api.asciidoc

@@ -507,13 +507,17 @@ blob was not overwritten.
 
 `write_throttled`::
 (string)
-The length of time spent waiting for the `max_snapshot_bytes_per_sec` throttle
-while writing this blob.
+The length of time spent waiting for the `max_snapshot_bytes_per_sec` (or
+`indices.recovery.max_bytes_per_sec` if the
+<<recovery-settings-for-managed-services,recovery settings for managed services>>
+are set) throttle while writing this blob.
 
 `write_throttled_nanos`::
 (long)
-The length of time spent waiting for the `max_snapshot_bytes_per_sec` throttle
-while writing this blob, in nanoseconds.
+The length of time spent waiting for the `max_snapshot_bytes_per_sec` (or
+`indices.recovery.max_bytes_per_sec` if the
+<<recovery-settings-for-managed-services,recovery settings for managed services>>
+are set) throttle while writing this blob, in nanoseconds.
 
 `reads`::
 (array)

+ 3 - 0
docs/reference/snapshot-restore/repository-shared-settings.asciidoc

@@ -6,6 +6,9 @@ that restores are also throttled through <<recovery,recovery settings>>.
 `max_snapshot_bytes_per_sec`::
 (Optional, <<byte-units,byte value>>)
 Maximum snapshot creation rate per node. Defaults to `40mb` per second.
+Note that if the <<recovery-settings-for-managed-services,recovery settings for managed services>>
+are set, then it defaults to unlimited, and the rate is additionally
+throttled through <<recovery,recovery settings>>.
 
 //tag::readonly-repo-setting[]
 `readonly`::

+ 0 - 69
server/src/internalClusterTest/java/org/elasticsearch/snapshots/SharedClusterSnapshotRestoreIT.java

@@ -55,7 +55,6 @@ import org.elasticsearch.index.shard.IndexShard;
 import org.elasticsearch.index.shard.ShardId;
 import org.elasticsearch.indices.IndicesService;
 import org.elasticsearch.repositories.IndexId;
-import org.elasticsearch.repositories.RepositoriesService;
 import org.elasticsearch.repositories.RepositoryData;
 import org.elasticsearch.repositories.RepositoryException;
 import org.elasticsearch.repositories.blobstore.BlobStoreRepository;
@@ -83,7 +82,6 @@ import java.util.stream.IntStream;
 import static org.elasticsearch.cluster.metadata.IndexMetadata.SETTING_NUMBER_OF_SHARDS;
 import static org.elasticsearch.cluster.routing.allocation.decider.MaxRetryAllocationDecider.SETTING_ALLOCATION_MAX_RETRY;
 import static org.elasticsearch.index.shard.IndexShardTests.getEngineFromShard;
-import static org.elasticsearch.indices.recovery.RecoverySettings.INDICES_RECOVERY_MAX_BYTES_PER_SEC_SETTING;
 import static org.elasticsearch.repositories.blobstore.BlobStoreRepository.READONLY_SETTING_KEY;
 import static org.elasticsearch.test.hamcrest.ElasticsearchAssertions.assertAcked;
 import static org.elasticsearch.test.hamcrest.ElasticsearchAssertions.assertAllSuccessful;
@@ -1078,73 +1076,6 @@ public class SharedClusterSnapshotRestoreIT extends AbstractSnapshotIntegTestCas
         );
     }
 
-    public void testThrottling() throws Exception {
-        Client client = client();
-
-        boolean throttleSnapshot = randomBoolean();
-        boolean throttleRestore = randomBoolean();
-        boolean throttleRestoreViaRecoverySettings = throttleRestore && randomBoolean();
-        createRepository(
-            "test-repo",
-            "fs",
-            Settings.builder()
-                .put("location", randomRepoPath())
-                .put("compress", randomBoolean())
-                .put("chunk_size", randomIntBetween(1000, 10000), ByteSizeUnit.BYTES)
-                .put("max_restore_bytes_per_sec", throttleRestore && (throttleRestoreViaRecoverySettings == false) ? "10k" : "0")
-                .put("max_snapshot_bytes_per_sec", throttleSnapshot ? "10k" : "0")
-        );
-
-        createIndexWithRandomDocs("test-idx", 100);
-        createSnapshot("test-repo", "test-snap", Collections.singletonList("test-idx"));
-
-        logger.info("--> delete index");
-        cluster().wipeIndices("test-idx");
-
-        logger.info("--> restore index");
-        client.admin()
-            .cluster()
-            .prepareUpdateSettings()
-            .setPersistentSettings(
-                Settings.builder()
-                    .put(INDICES_RECOVERY_MAX_BYTES_PER_SEC_SETTING.getKey(), throttleRestoreViaRecoverySettings ? "10k" : "0")
-                    .build()
-            )
-            .get();
-        RestoreSnapshotResponse restoreSnapshotResponse = client.admin()
-            .cluster()
-            .prepareRestoreSnapshot("test-repo", "test-snap")
-            .setWaitForCompletion(true)
-            .execute()
-            .actionGet();
-        assertThat(restoreSnapshotResponse.getRestoreInfo().totalShards(), greaterThan(0));
-        assertDocCount("test-idx", 100L);
-
-        long snapshotPause = 0L;
-        long restorePause = 0L;
-        for (RepositoriesService repositoriesService : internalCluster().getDataNodeInstances(RepositoriesService.class)) {
-            snapshotPause += repositoriesService.repository("test-repo").getSnapshotThrottleTimeInNanos();
-            restorePause += repositoriesService.repository("test-repo").getRestoreThrottleTimeInNanos();
-        }
-
-        if (throttleSnapshot) {
-            assertThat(snapshotPause, greaterThan(0L));
-        } else {
-            assertThat(snapshotPause, equalTo(0L));
-        }
-
-        if (throttleRestore) {
-            assertThat(restorePause, greaterThan(0L));
-        } else {
-            assertThat(restorePause, equalTo(0L));
-        }
-        client.admin()
-            .cluster()
-            .prepareUpdateSettings()
-            .setPersistentSettings(Settings.builder().putNull(INDICES_RECOVERY_MAX_BYTES_PER_SEC_SETTING.getKey()).build())
-            .get();
-    }
-
     public void testSnapshotStatus() throws Exception {
         Client client = client();
         createRepository(

+ 186 - 0
server/src/internalClusterTest/java/org/elasticsearch/snapshots/SnapshotThrottlingIT.java

@@ -0,0 +1,186 @@
+/*
+ * Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
+ * or more contributor license agreements. Licensed under the Elastic License
+ * 2.0 and the Server Side Public License, v 1; you may not use this file except
+ * in compliance with, at your election, the Elastic License 2.0 or the Server
+ * Side Public License, v 1.
+ */
+
+package org.elasticsearch.snapshots;
+
+import org.apache.logging.log4j.Level;
+import org.apache.logging.log4j.LogManager;
+import org.elasticsearch.action.admin.cluster.snapshots.restore.RestoreSnapshotResponse;
+import org.elasticsearch.common.logging.Loggers;
+import org.elasticsearch.common.settings.Settings;
+import org.elasticsearch.common.unit.ByteSizeUnit;
+import org.elasticsearch.core.Tuple;
+import org.elasticsearch.repositories.RepositoriesService;
+import org.elasticsearch.repositories.blobstore.BlobStoreRepository;
+import org.elasticsearch.test.ESIntegTestCase;
+import org.elasticsearch.test.MockLogAppender;
+import org.elasticsearch.test.junit.annotations.TestLogging;
+
+import java.util.Collections;
+
+import static org.elasticsearch.indices.recovery.RecoverySettings.INDICES_RECOVERY_MAX_BYTES_PER_SEC_SETTING;
+import static org.elasticsearch.indices.recovery.RecoverySettings.NODE_BANDWIDTH_RECOVERY_DISK_READ_SETTING;
+import static org.elasticsearch.indices.recovery.RecoverySettings.NODE_BANDWIDTH_RECOVERY_DISK_WRITE_SETTING;
+import static org.elasticsearch.indices.recovery.RecoverySettings.NODE_BANDWIDTH_RECOVERY_NETWORK_SETTING;
+import static org.hamcrest.Matchers.greaterThan;
+
+@ESIntegTestCase.ClusterScope(numDataNodes = 0, scope = ESIntegTestCase.Scope.TEST)
+public class SnapshotThrottlingIT extends AbstractSnapshotIntegTestCase {
+
+    private Tuple<Long, Long> testThrottledRepository(String maxSnapshotBytesPerSec, String maxRestoreBytesPerSec, boolean compressRepo) {
+        logger.info(
+            "--> testing throttled repository (maxSnapshotBytesPerSec=[{}], maxRestoreBytesPerSec=[{}], compressRepo=[{}])",
+            maxSnapshotBytesPerSec,
+            maxRestoreBytesPerSec,
+            compressRepo
+        );
+        createRepository(
+            "test-repo",
+            "fs",
+            Settings.builder()
+                .put("location", randomRepoPath())
+                .put("compress", compressRepo)
+                .put("chunk_size", randomIntBetween(1000, 4000), ByteSizeUnit.BYTES)
+                .put("max_snapshot_bytes_per_sec", maxSnapshotBytesPerSec)
+                .put("max_restore_bytes_per_sec", maxRestoreBytesPerSec)
+        );
+        createSnapshot("test-repo", "test-snap", Collections.singletonList("test-idx"));
+        RestoreSnapshotResponse restoreSnapshotResponse = client().admin()
+            .cluster()
+            .prepareRestoreSnapshot("test-repo", "test-snap")
+            .setRenamePattern("test-")
+            .setRenameReplacement("test2-")
+            .setWaitForCompletion(true)
+            .execute()
+            .actionGet();
+        assertThat(restoreSnapshotResponse.getRestoreInfo().totalShards(), greaterThan(0));
+        assertDocCount("test-idx", 50L);
+        long snapshotPause = 0L;
+        long restorePause = 0L;
+        for (RepositoriesService repositoriesService : internalCluster().getDataNodeInstances(RepositoriesService.class)) {
+            snapshotPause += repositoriesService.repository("test-repo").getSnapshotThrottleTimeInNanos();
+            restorePause += repositoriesService.repository("test-repo").getRestoreThrottleTimeInNanos();
+        }
+        cluster().wipeIndices("test2-idx");
+        logger.warn("--> tested throttled repository with snapshot pause [{}] and restore pause [{}]", snapshotPause, restorePause);
+        return new Tuple<>(snapshotPause, restorePause);
+    }
+
+    public void testThrottling() throws Exception {
+        boolean compressRepo = randomBoolean();
+        boolean throttleSnapshotViaRecovery = randomBoolean();
+        boolean throttleRestoreViaRecovery = throttleSnapshotViaRecovery || randomBoolean();
+
+        Settings.Builder primaryNodeSettings = Settings.builder()
+            .put(
+                INDICES_RECOVERY_MAX_BYTES_PER_SEC_SETTING.getKey(),
+                (throttleSnapshotViaRecovery || throttleRestoreViaRecovery) ? "25k" : "0"
+            );
+
+        if (throttleSnapshotViaRecovery) {
+            primaryNodeSettings = primaryNodeSettings.put(NODE_BANDWIDTH_RECOVERY_NETWORK_SETTING.getKey(), "25k")
+                .put(NODE_BANDWIDTH_RECOVERY_DISK_READ_SETTING.getKey(), "25k")
+                .put(NODE_BANDWIDTH_RECOVERY_DISK_WRITE_SETTING.getKey(), "25k");
+        }
+        final String primaryNode = internalCluster().startNode(primaryNodeSettings);
+
+        logger.info("--> create index");
+        createIndexWithRandomDocs("test-idx", 50);
+
+        long snapshotPauseViaRecovery = 0L;
+        long restorePauseViaRecovery = 0L;
+
+        // Throttle snapshot and/or restore only via recovery 25kb rate limit
+        if (throttleSnapshotViaRecovery || throttleRestoreViaRecovery) {
+            logger.info("--> testing throttling via recovery settings only");
+            Tuple<Long, Long> pauses = testThrottledRepository("0", "0", compressRepo);
+            snapshotPauseViaRecovery += pauses.v1();
+            restorePauseViaRecovery += pauses.v2();
+            if (throttleSnapshotViaRecovery) assertThat(snapshotPauseViaRecovery, greaterThan(0L));
+            if (throttleRestoreViaRecovery) assertThat(restorePauseViaRecovery, greaterThan(0L));
+        }
+
+        // Throttle snapshot and/or restore separately with 5kb rate limit, which is much less than half of the potential recovery rate
+        // limit. For this reason, we assert that the separately throttled speeds incur a pause time which is at least double of the
+        // pause time detected in the recovery-only throttling run above.
+        boolean throttleSnapshot = randomBoolean();
+        boolean throttleRestore = randomBoolean();
+
+        if (throttleSnapshot || throttleRestore) {
+            Tuple<Long, Long> pauses = testThrottledRepository(throttleSnapshot ? "5k" : "0", throttleRestore ? "5k" : "0", compressRepo);
+            long snapshotPause = pauses.v1();
+            long restorePause = pauses.v2();
+            if (throttleSnapshot) {
+                assertThat(snapshotPause, greaterThan(0L));
+                if (throttleSnapshotViaRecovery) assertThat(snapshotPause, greaterThan(snapshotPauseViaRecovery * 2));
+            }
+            if (throttleRestore) {
+                assertThat(restorePause, greaterThan(0L));
+                if (throttleRestoreViaRecovery) assertThat(restorePause, greaterThan(restorePauseViaRecovery * 2));
+            }
+        }
+    }
+
+    @TestLogging(
+        reason = "testing warning that speed is over recovery speed",
+        value = "org.elasticsearch.repositories.blobstore.BlobStoreRepository:WARN"
+    )
+    public void testWarningSpeedOverRecovery() throws Exception {
+        boolean nodeBandwidthSettingsSet = randomBoolean();
+        Settings.Builder primaryNodeSettings = Settings.builder().put(INDICES_RECOVERY_MAX_BYTES_PER_SEC_SETTING.getKey(), "100m");
+        if (nodeBandwidthSettingsSet) {
+            primaryNodeSettings = primaryNodeSettings.put(NODE_BANDWIDTH_RECOVERY_NETWORK_SETTING.getKey(), "100m")
+                .put(NODE_BANDWIDTH_RECOVERY_DISK_READ_SETTING.getKey(), "100m")
+                .put(NODE_BANDWIDTH_RECOVERY_DISK_WRITE_SETTING.getKey(), "100m");
+        }
+        final String primaryNode = internalCluster().startNode(primaryNodeSettings);
+
+        final MockLogAppender mockLogAppender = new MockLogAppender();
+        try {
+            mockLogAppender.start();
+            Loggers.addAppender(LogManager.getLogger(BlobStoreRepository.class), mockLogAppender);
+
+            MockLogAppender.EventuallySeenEventExpectation snapshotExpectation = new MockLogAppender.EventuallySeenEventExpectation(
+                "snapshot speed over recovery speed",
+                "org.elasticsearch.repositories.blobstore.BlobStoreRepository",
+                Level.WARN,
+                "repository [test-repo] has a rate limit [max_snapshot_bytes_per_sec=1gb] per second which is above "
+                    + "the effective recovery rate limit [indices.recovery.max_bytes_per_sec=100mb] per second, thus the repository "
+                    + "rate limit will be superseded by the recovery rate limit"
+            );
+            if (nodeBandwidthSettingsSet) snapshotExpectation.setExpectSeen();
+            mockLogAppender.addExpectation(snapshotExpectation);
+
+            MockLogAppender.SeenEventExpectation restoreExpectation = new MockLogAppender.SeenEventExpectation(
+                "snapshot restore speed over recovery speed",
+                "org.elasticsearch.repositories.blobstore.BlobStoreRepository",
+                Level.WARN,
+                "repository [test-repo] has a rate limit [max_restore_bytes_per_sec=2gb] per second which is above "
+                    + "the effective recovery rate limit [indices.recovery.max_bytes_per_sec=100mb] per second, thus the repository "
+                    + "rate limit will be superseded by the recovery rate limit"
+            );
+            mockLogAppender.addExpectation(restoreExpectation);
+
+            createRepository(
+                "test-repo",
+                "fs",
+                Settings.builder()
+                    .put("location", randomRepoPath())
+                    .put("max_snapshot_bytes_per_sec", "1g")
+                    .put("max_restore_bytes_per_sec", "2g")
+            );
+
+            deleteRepository("test-repo");
+            mockLogAppender.assertAllExpectationsMatched();
+        } finally {
+            Loggers.removeAppender(LogManager.getLogger(BlobStoreRepository.class), mockLogAppender);
+            mockLogAppender.stop();
+        }
+    }
+
+}

+ 17 - 1
server/src/main/java/org/elasticsearch/indices/recovery/RecoverySettings.java

@@ -388,6 +388,7 @@ public class RecoverySettings {
     private volatile TimeValue internalActionRetryTimeout;
     private volatile TimeValue internalActionLongTimeout;
     private volatile boolean useSnapshotsDuringRecovery;
+    private final boolean nodeBandwidthSettingsExist;
     private volatile int maxConcurrentSnapshotFileDownloads;
     private volatile int maxConcurrentSnapshotFileDownloadsPerNode;
 
@@ -419,6 +420,7 @@ public class RecoverySettings {
         this.availableDiskReadBandwidth = NODE_BANDWIDTH_RECOVERY_DISK_READ_SETTING.get(settings);
         this.availableDiskWriteBandwidth = NODE_BANDWIDTH_RECOVERY_DISK_WRITE_SETTING.get(settings);
         validateNodeBandwidthRecoverySettings(settings);
+        this.nodeBandwidthSettingsExist = hasNodeBandwidthRecoverySettings(settings);
         computeMaxBytesPerSec(settings);
         if (DiscoveryNode.canContainData(settings)) {
             clusterSettings.addSettingsUpdateConsumer(
@@ -490,6 +492,7 @@ public class RecoverySettings {
         }
 
         final long availableBytesPerSec = Math.min(readBytesPerSec, writeBytesPerSec);
+        assert nodeBandwidthSettingsExist == (availableBytesPerSec != 0L);
 
         long maxBytesPerSec;
         if (availableBytesPerSec == 0L                                      // no node recovery bandwidths
@@ -607,7 +610,7 @@ public class RecoverySettings {
         }
     }
 
-    ByteSizeValue getMaxBytesPerSec() {
+    public ByteSizeValue getMaxBytesPerSec() {
         return maxBytesPerSec;
     }
 
@@ -627,6 +630,10 @@ public class RecoverySettings {
         this.maxConcurrentOperations = maxConcurrentOperations;
     }
 
+    public boolean nodeBandwidthSettingsExist() {
+        return nodeBandwidthSettingsExist;
+    }
+
     public boolean getUseSnapshotsDuringRecovery() {
         return useSnapshotsDuringRecovery;
     }
@@ -688,4 +695,13 @@ public class RecoverySettings {
             );
         }
     }
+
+    /**
+     * Whether the node bandwidth recovery settings are set.
+     */
+    public static boolean hasNodeBandwidthRecoverySettings(Settings settings) {
+        return NODE_BANDWIDTH_RECOVERY_SETTINGS.stream()
+            .filter(setting -> setting.get(settings) != ByteSizeValue.MINUS_ONE)
+            .count() == NODE_BANDWIDTH_RECOVERY_SETTINGS.size();
+    }
 }

+ 70 - 17
server/src/main/java/org/elasticsearch/repositories/blobstore/BlobStoreRepository.java

@@ -62,7 +62,6 @@ import org.elasticsearch.common.lucene.store.InputStreamIndexInput;
 import org.elasticsearch.common.metrics.CounterMetric;
 import org.elasticsearch.common.settings.Setting;
 import org.elasticsearch.common.settings.Settings;
-import org.elasticsearch.common.unit.ByteSizeUnit;
 import org.elasticsearch.common.unit.ByteSizeValue;
 import org.elasticsearch.common.util.BigArrays;
 import org.elasticsearch.common.util.concurrent.AbstractRunnable;
@@ -152,6 +151,7 @@ import java.util.stream.Stream;
 import static org.elasticsearch.core.Strings.format;
 import static org.elasticsearch.index.snapshots.blobstore.BlobStoreIndexShardSnapshot.FileInfo;
 import static org.elasticsearch.index.snapshots.blobstore.BlobStoreIndexShardSnapshot.FileInfo.canonicalName;
+import static org.elasticsearch.indices.recovery.RecoverySettings.INDICES_RECOVERY_MAX_BYTES_PER_SEC_SETTING;
 
 /**
  * BlobStore - based implementation of Snapshot Repository
@@ -304,7 +304,13 @@ public abstract class BlobStoreRepository extends AbstractLifecycleComponent imp
 
     public static final Setting<ByteSizeValue> MAX_SNAPSHOT_BYTES_PER_SEC = Setting.byteSizeSetting(
         "max_snapshot_bytes_per_sec",
-        new ByteSizeValue(40, ByteSizeUnit.MB),
+        (settings) -> {
+            if (RecoverySettings.hasNodeBandwidthRecoverySettings(settings)) {
+                return "0";
+            } else {
+                return "40mb";
+            }
+        },
         Setting.Property.Dynamic,
         Setting.Property.NodeScope
     );
@@ -399,8 +405,8 @@ public abstract class BlobStoreRepository extends AbstractLifecycleComponent imp
         this.recoverySettings = recoverySettings;
         this.compress = COMPRESS_SETTING.get(metadata.settings());
         this.supportURLRepo = SUPPORT_URL_REPO.get(metadata.settings());
-        snapshotRateLimiter = getRateLimiter(metadata.settings(), MAX_SNAPSHOT_BYTES_PER_SEC);
-        restoreRateLimiter = getRateLimiter(metadata.settings(), MAX_RESTORE_BYTES_PER_SEC);
+        snapshotRateLimiter = getSnapshotRateLimiter();
+        restoreRateLimiter = getRestoreRateLimiter();
         readOnly = metadata.settings().getAsBoolean(READONLY_SETTING_KEY, false);
         cacheRepositoryData = CACHE_REPOSITORY_DATA.get(metadata.settings());
         bufferSize = Math.toIntExact(BUFFER_SIZE_SETTING.get(metadata.settings()).getBytes());
@@ -648,8 +654,8 @@ public abstract class BlobStoreRepository extends AbstractLifecycleComponent imp
         metadata = getRepoMetadata(state);
         final Settings updatedSettings = metadata.settings();
         if (updatedSettings.equals(previousSettings) == false) {
-            snapshotRateLimiter = getRateLimiter(metadata.settings(), MAX_SNAPSHOT_BYTES_PER_SEC);
-            restoreRateLimiter = getRateLimiter(metadata.settings(), MAX_RESTORE_BYTES_PER_SEC);
+            snapshotRateLimiter = getSnapshotRateLimiter();
+            restoreRateLimiter = getRestoreRateLimiter();
         }
 
         uncleanStart = uncleanStart && metadata.generation() != metadata.pendingGeneration();
@@ -1639,19 +1645,59 @@ public abstract class BlobStoreRepository extends AbstractLifecycleComponent imp
     /**
      * Configures RateLimiter based on repository and global settings
      *
+     * @param rateLimiter        the existing rate limiter to configure (or null if no throttling was previously needed)
      * @param repositorySettings repository settings
      * @param setting            setting to use to configure rate limiter
-     * @return rate limiter or null of no throttling is needed
+     * @param warnIfOverRecovery log a warning if rate limit setting is over the effective recovery rate limit
+     * @return the newly configured rate limiter or null if no throttling is needed
      */
-    private static RateLimiter getRateLimiter(Settings repositorySettings, Setting<ByteSizeValue> setting) {
-        ByteSizeValue maxSnapshotBytesPerSec = setting.get(repositorySettings);
-        if (maxSnapshotBytesPerSec.getBytes() <= 0) {
+    private RateLimiter getRateLimiter(
+        RateLimiter rateLimiter,
+        Settings repositorySettings,
+        Setting<ByteSizeValue> setting,
+        boolean warnIfOverRecovery
+    ) {
+        ByteSizeValue maxConfiguredBytesPerSec = setting.get(repositorySettings);
+        if (maxConfiguredBytesPerSec.getBytes() <= 0) {
             return null;
         } else {
-            return new RateLimiter.SimpleRateLimiter(maxSnapshotBytesPerSec.getMbFrac());
+            ByteSizeValue effectiveRecoverySpeed = recoverySettings.getMaxBytesPerSec();
+            if (warnIfOverRecovery && effectiveRecoverySpeed.getBytes() > 0) {
+                if (maxConfiguredBytesPerSec.getBytes() > effectiveRecoverySpeed.getBytes()) {
+                    logger.warn(
+                        "repository [{}] has a rate limit [{}={}] per second which is above the effective recovery rate limit "
+                            + "[{}={}] per second, thus the repository rate limit will be superseded by the recovery rate limit",
+                        metadata.name(),
+                        setting.getKey(),
+                        maxConfiguredBytesPerSec,
+                        INDICES_RECOVERY_MAX_BYTES_PER_SEC_SETTING.getKey(),
+                        effectiveRecoverySpeed
+                    );
+                }
+            }
+
+            if (rateLimiter != null) {
+                rateLimiter.setMBPerSec(maxConfiguredBytesPerSec.getMbFrac());
+                return rateLimiter;
+            } else {
+                return new RateLimiter.SimpleRateLimiter(maxConfiguredBytesPerSec.getMbFrac());
+            }
         }
     }
 
+    private RateLimiter getSnapshotRateLimiter() {
+        return getRateLimiter(
+            snapshotRateLimiter,
+            metadata.settings(),
+            MAX_SNAPSHOT_BYTES_PER_SEC,
+            recoverySettings.nodeBandwidthSettingsExist()
+        );
+    }
+
+    private RateLimiter getRestoreRateLimiter() {
+        return getRateLimiter(restoreRateLimiter, metadata.settings(), MAX_RESTORE_BYTES_PER_SEC, true);
+    }
+
     @Override
     public long getSnapshotThrottleTimeInNanos() {
         return snapshotRateLimitingTimeInNanos.count();
@@ -3155,20 +3201,27 @@ public abstract class BlobStoreRepository extends AbstractLifecycleComponent imp
     }
 
     /**
-     * Wrap the snapshot rate limiter (controlled by the repository setting `max_snapshot_bytes_per_sec`) around the given stream. Any
-     * throttling is recorded in the value returned by {@link BlobStoreRepository#getSnapshotThrottleTimeInNanos()}.
+     * Wrap the snapshot rate limiter around the given stream. Any throttling is recorded in the value returned by
+     * {@link BlobStoreRepository#getSnapshotThrottleTimeInNanos()}. Note that speed is throttled by the repository setting
+     * `max_snapshot_bytes_per_sec` and, if recovery node bandwidth settings have been set, additionally by the
+     * `indices.recovery.max_bytes_per_sec` speed.
      */
     public InputStream maybeRateLimitSnapshots(InputStream stream) {
         return maybeRateLimitSnapshots(stream, snapshotRateLimitingTimeInNanos::inc);
     }
 
     /**
-     * Wrap the snapshot rate limiter (controlled by the repository setting `max_snapshot_bytes_per_sec`) around the given stream. Any
-     * throttling is reported to the given listener and not otherwise recorded in the value returned by {@link
-     * BlobStoreRepository#getSnapshotThrottleTimeInNanos()}.
+     * Wrap the snapshot rate limiter around the given stream. Any throttling is recorded in the value returned by
+     * {@link BlobStoreRepository#getSnapshotThrottleTimeInNanos()}. Note that speed is throttled by the repository setting
+     * `max_snapshot_bytes_per_sec` and, if recovery node bandwidth settings have been set, additionally by the
+     * `indices.recovery.max_bytes_per_sec` speed.
      */
     public InputStream maybeRateLimitSnapshots(InputStream stream, RateLimitingInputStream.Listener throttleListener) {
-        return maybeRateLimit(stream, () -> snapshotRateLimiter, throttleListener);
+        InputStream rateLimitStream = maybeRateLimit(stream, () -> snapshotRateLimiter, throttleListener);
+        if (recoverySettings.nodeBandwidthSettingsExist()) {
+            rateLimitStream = maybeRateLimit(rateLimitStream, recoverySettings::rateLimiter, throttleListener);
+        }
+        return rateLimitStream;
     }
 
     @Override

+ 75 - 53
server/src/test/java/org/elasticsearch/indices/recovery/RecoverySettingsTests.java

@@ -154,14 +154,30 @@ public class RecoverySettingsTests extends ESTestCase {
         );
     }
 
+    public void testNodeBandwidthSettingsExist() {
+        final NodeRecoverySettings recoverySettings = nodeRecoverySettings();
+        recoverySettings.withRandomIndicesRecoveryMaxBytesPerSec();
+        recoverySettings.withRoles(randomDataNodeRoles());
+        recoverySettings.withRandomMemory();
+
+        if (randomBoolean()) {
+            recoverySettings.withNetworkBandwidth(randomNonZeroByteSizeValue());
+            recoverySettings.withDiskReadBandwidth(randomNonZeroByteSizeValue());
+            recoverySettings.withDiskWriteBandwidth(randomNonZeroByteSizeValue());
+            assertTrue(recoverySettings.build().nodeBandwidthSettingsExist());
+        } else {
+            assertFalse(recoverySettings.build().nodeBandwidthSettingsExist());
+        }
+    }
+
     public void testDefaultMaxBytesPerSecOnNonDataNode() {
+        RecoverySettings recoverySettings = nodeRecoverySettings().withRole(randomFrom("master", "ingest", "ml"))
+            .withRandomBandwidths()
+            .withRandomMemory()
+            .build();
         assertThat(
             "Non-data nodes have a default 40mb rate limit",
-            nodeRecoverySettings().withRole(randomFrom("master", "ingest", "ml"))
-                .withRandomBandwidths()
-                .withRandomMemory()
-                .build()
-                .getMaxBytesPerSec(),
+            recoverySettings.getMaxBytesPerSec(),
             equalTo(DEFAULT_MAX_BYTES_PER_SEC)
         );
     }
@@ -211,17 +227,18 @@ public class RecoverySettingsTests extends ESTestCase {
             case 2 -> ByteSizeValue.ofGb(between(100, 1000));
             default -> throw new AssertionError();
         };
+        RecoverySettings recoverySettings = nodeRecoverySettings().withIndicesRecoveryMaxBytesPerSec(indicesRecoveryMaxBytesPerSec)
+            .withNetworkBandwidth(ByteSizeValue.ofGb(1))
+            .withDiskReadBandwidth(ByteSizeValue.ofMb(500))
+            .withDiskWriteBandwidth(ByteSizeValue.ofMb(250))
+            .withMaxOvercommitFactor(maxOvercommitFactor)
+            .withRoles(randomDataNodeRoles())
+            .withRandomMemory()
+            .build();
+        assertThat("Node bandwidth settings should all exist", recoverySettings.nodeBandwidthSettingsExist(), equalTo(true));
         assertThat(
             "Data nodes should not exceed the max. allowed overcommit when 'indices.recovery.max_bytes_per_sec' is too large",
-            nodeRecoverySettings().withIndicesRecoveryMaxBytesPerSec(indicesRecoveryMaxBytesPerSec)
-                .withNetworkBandwidth(ByteSizeValue.ofGb(1))
-                .withDiskReadBandwidth(ByteSizeValue.ofMb(500))
-                .withDiskWriteBandwidth(ByteSizeValue.ofMb(250))
-                .withMaxOvercommitFactor(maxOvercommitFactor)
-                .withRoles(randomDataNodeRoles())
-                .withRandomMemory()
-                .build()
-                .getMaxBytesPerSec(),
+            recoverySettings.getMaxBytesPerSec(),
             equalTo(
                 ByteSizeValue.ofBytes(
                     Math.round(Objects.requireNonNullElse(maxOvercommitFactor, 100.d) * ByteSizeValue.ofMb(250).getBytes())
@@ -231,77 +248,82 @@ public class RecoverySettingsTests extends ESTestCase {
     }
 
     public void testMaxBytesPerSecOnDataNodeWithAvailableBandwidths() {
+        RecoverySettings recoverySettings = nodeRecoverySettings().withRoles(randomDataNodeRoles())
+            .withRandomMemory()
+            .withNetworkBandwidth(ByteSizeValue.ofGb(between(1, 10)))
+            .withDiskReadBandwidth(ByteSizeValue.ofMb(between(10, 50)))
+            .withDiskWriteBandwidth(ByteSizeValue.ofMb(between(10, 50)))
+            .build();
+        assertThat("Node bandwidth settings should all exist", recoverySettings.nodeBandwidthSettingsExist(), equalTo(true));
         assertThat(
             "Data node should use pre 8.1.0 default because available bandwidths are lower",
-            nodeRecoverySettings().withRoles(randomDataNodeRoles())
-                .withRandomMemory()
-                .withNetworkBandwidth(ByteSizeValue.ofGb(between(1, 10)))
-                .withDiskReadBandwidth(ByteSizeValue.ofMb(between(10, 50)))
-                .withDiskWriteBandwidth(ByteSizeValue.ofMb(between(10, 50)))
-                .build()
-                .getMaxBytesPerSec(),
+            recoverySettings.getMaxBytesPerSec(),
             equalTo(DEFAULT_MAX_BYTES_PER_SEC)
         );
 
         final ByteSizeValue indicesRecoveryMaxBytesPerSec = ByteSizeValue.ofMb(randomFrom(100, 250));
+        recoverySettings = nodeRecoverySettings().withRoles(randomDataNodeRoles())
+            .withRandomMemory()
+            .withNetworkBandwidth(ByteSizeValue.ofGb(between(1, 10)))
+            .withDiskReadBandwidth(ByteSizeValue.ofMb(between(10, 50)))
+            .withDiskWriteBandwidth(ByteSizeValue.ofMb(between(10, 50)))
+            .withIndicesRecoveryMaxBytesPerSec(indicesRecoveryMaxBytesPerSec)
+            .build();
+        assertThat("Node bandwidth settings should all exist", recoverySettings.nodeBandwidthSettingsExist(), equalTo(true));
         assertThat(
             "Data node should use 'indices.recovery.max_bytes_per_sec' setting because available bandwidths are lower",
-            nodeRecoverySettings().withRoles(randomDataNodeRoles())
-                .withRandomMemory()
-                .withNetworkBandwidth(ByteSizeValue.ofGb(between(1, 10)))
-                .withDiskReadBandwidth(ByteSizeValue.ofMb(between(10, 50)))
-                .withDiskWriteBandwidth(ByteSizeValue.ofMb(between(10, 50)))
-                .withIndicesRecoveryMaxBytesPerSec(indicesRecoveryMaxBytesPerSec)
-                .build()
-                .getMaxBytesPerSec(),
+            recoverySettings.getMaxBytesPerSec(),
             equalTo(indicesRecoveryMaxBytesPerSec)
         );
 
         final Double factor = randomBoolean() ? randomDoubleBetween(0.5d, 1.0d, true) : null;
 
         final ByteSizeValue networkBandwidth = ByteSizeValue.ofMb(randomFrom(100, 250));
+        recoverySettings = nodeRecoverySettings().withRoles(randomDataNodeRoles())
+            .withRandomMemory()
+            .withNetworkBandwidth(networkBandwidth)
+            .withDiskReadBandwidth(ByteSizeValue.ofMb(between(250, 500)))
+            .withDiskWriteBandwidth(ByteSizeValue.ofMb(between(250, 500)))
+            .withOperatorDefaultFactor(factor)
+            .build();
+        assertThat("Node bandwidth settings should all exist", recoverySettings.nodeBandwidthSettingsExist(), equalTo(true));
         assertThat(
             "Data node should use available disk read bandwidth",
-            nodeRecoverySettings().withRoles(randomDataNodeRoles())
-                .withRandomMemory()
-                .withNetworkBandwidth(networkBandwidth)
-                .withDiskReadBandwidth(ByteSizeValue.ofMb(between(250, 500)))
-                .withDiskWriteBandwidth(ByteSizeValue.ofMb(between(250, 500)))
-                .withOperatorDefaultFactor(factor)
-                .build()
-                .getMaxBytesPerSec(),
+            recoverySettings.getMaxBytesPerSec(),
             equalTo(
                 ByteSizeValue.ofBytes(Math.round(Objects.requireNonNullElse(factor, DEFAULT_FACTOR_VALUE) * networkBandwidth.getBytes()))
             )
         );
 
         final ByteSizeValue diskReadBandwidth = ByteSizeValue.ofMb(randomFrom(100, 250));
+        recoverySettings = nodeRecoverySettings().withRoles(randomDataNodeRoles())
+            .withRandomMemory()
+            .withNetworkBandwidth(ByteSizeValue.ofGb(between(1, 10)))
+            .withDiskReadBandwidth(diskReadBandwidth)
+            .withDiskWriteBandwidth(ByteSizeValue.ofMb(between(250, 500)))
+            .withOperatorDefaultFactor(factor)
+            .build();
+        assertThat("Node bandwidth settings should all exist", recoverySettings.nodeBandwidthSettingsExist(), equalTo(true));
         assertThat(
             "Data node should use available disk read bandwidth",
-            nodeRecoverySettings().withRoles(randomDataNodeRoles())
-                .withRandomMemory()
-                .withNetworkBandwidth(ByteSizeValue.ofGb(between(1, 10)))
-                .withDiskReadBandwidth(diskReadBandwidth)
-                .withDiskWriteBandwidth(ByteSizeValue.ofMb(between(250, 500)))
-                .withOperatorDefaultFactor(factor)
-                .build()
-                .getMaxBytesPerSec(),
+            recoverySettings.getMaxBytesPerSec(),
             equalTo(
                 ByteSizeValue.ofBytes(Math.round(Objects.requireNonNullElse(factor, DEFAULT_FACTOR_VALUE) * diskReadBandwidth.getBytes()))
             )
         );
 
         final ByteSizeValue diskWriteBandwidth = ByteSizeValue.ofMb(randomFrom(100, 250));
+        recoverySettings = nodeRecoverySettings().withRoles(randomDataNodeRoles())
+            .withRandomMemory()
+            .withNetworkBandwidth(ByteSizeValue.ofGb(between(1, 10)))
+            .withDiskReadBandwidth(ByteSizeValue.ofMb(between(250, 500)))
+            .withDiskWriteBandwidth(diskWriteBandwidth)
+            .withOperatorDefaultFactor(factor)
+            .build();
+        assertThat("Node bandwidth settings should all exist", recoverySettings.nodeBandwidthSettingsExist(), equalTo(true));
         assertThat(
             "Data node should use available disk write bandwidth",
-            nodeRecoverySettings().withRoles(randomDataNodeRoles())
-                .withRandomMemory()
-                .withNetworkBandwidth(ByteSizeValue.ofGb(between(1, 10)))
-                .withDiskReadBandwidth(ByteSizeValue.ofMb(between(250, 500)))
-                .withDiskWriteBandwidth(diskWriteBandwidth)
-                .withOperatorDefaultFactor(factor)
-                .build()
-                .getMaxBytesPerSec(),
+            recoverySettings.getMaxBytesPerSec(),
             equalTo(
                 ByteSizeValue.ofBytes(Math.round(Objects.requireNonNullElse(factor, DEFAULT_FACTOR_VALUE) * diskWriteBandwidth.getBytes()))
             )