Browse Source

Make Ccr recovery file chunk size configurable (#38370)

This commit adds a byte-size setting `ccr.indices.recovery.chunk_size`. This
setting configures the size of the file chunks requested while recovering
from a remote cluster.
Tim Brooks 6 years ago
parent
commit
4a15e2b29e

+ 20 - 0
x-pack/plugin/ccr/src/main/java/org/elasticsearch/xpack/ccr/CcrSettings.java

@@ -49,6 +49,14 @@ public final class CcrSettings {
         Setting.byteSizeSetting("ccr.indices.recovery.max_bytes_per_sec", new ByteSizeValue(40, ByteSizeUnit.MB),
         Setting.byteSizeSetting("ccr.indices.recovery.max_bytes_per_sec", new ByteSizeValue(40, ByteSizeUnit.MB),
             Setting.Property.Dynamic, Setting.Property.NodeScope);
             Setting.Property.Dynamic, Setting.Property.NodeScope);
 
 
+    /**
+     * File chunk size to send during recovery
+     */
+    public static final Setting<ByteSizeValue> RECOVERY_CHUNK_SIZE =
+        Setting.byteSizeSetting("ccr.indices.recovery.chunk_size", new ByteSizeValue(1, ByteSizeUnit.MB),
+            new ByteSizeValue(1, ByteSizeUnit.KB), new ByteSizeValue(1, ByteSizeUnit.GB), Setting.Property.Dynamic,
+            Setting.Property.NodeScope);
+
     /**
     /**
      * The leader must open resources for a ccr recovery. If there is no activity for this interval of time,
      * The leader must open resources for a ccr recovery. If there is no activity for this interval of time,
      * the leader will close the restore session.
      * the leader will close the restore session.
@@ -77,22 +85,30 @@ public final class CcrSettings {
                 INDICES_RECOVERY_ACTION_TIMEOUT_SETTING,
                 INDICES_RECOVERY_ACTION_TIMEOUT_SETTING,
                 INDICES_RECOVERY_ACTIVITY_TIMEOUT_SETTING,
                 INDICES_RECOVERY_ACTIVITY_TIMEOUT_SETTING,
                 CCR_AUTO_FOLLOW_WAIT_FOR_METADATA_TIMEOUT,
                 CCR_AUTO_FOLLOW_WAIT_FOR_METADATA_TIMEOUT,
+                RECOVERY_CHUNK_SIZE,
                 CCR_WAIT_FOR_METADATA_TIMEOUT);
                 CCR_WAIT_FOR_METADATA_TIMEOUT);
     }
     }
 
 
     private final CombinedRateLimiter ccrRateLimiter;
     private final CombinedRateLimiter ccrRateLimiter;
     private volatile TimeValue recoveryActivityTimeout;
     private volatile TimeValue recoveryActivityTimeout;
     private volatile TimeValue recoveryActionTimeout;
     private volatile TimeValue recoveryActionTimeout;
+    private volatile ByteSizeValue chunkSize;
 
 
     public CcrSettings(Settings settings, ClusterSettings clusterSettings) {
     public CcrSettings(Settings settings, ClusterSettings clusterSettings) {
         this.recoveryActivityTimeout = INDICES_RECOVERY_ACTIVITY_TIMEOUT_SETTING.get(settings);
         this.recoveryActivityTimeout = INDICES_RECOVERY_ACTIVITY_TIMEOUT_SETTING.get(settings);
         this.recoveryActionTimeout = INDICES_RECOVERY_ACTION_TIMEOUT_SETTING.get(settings);
         this.recoveryActionTimeout = INDICES_RECOVERY_ACTION_TIMEOUT_SETTING.get(settings);
         this.ccrRateLimiter = new CombinedRateLimiter(RECOVERY_MAX_BYTES_PER_SECOND.get(settings));
         this.ccrRateLimiter = new CombinedRateLimiter(RECOVERY_MAX_BYTES_PER_SECOND.get(settings));
+        this.chunkSize = RECOVERY_CHUNK_SIZE.get(settings);
         clusterSettings.addSettingsUpdateConsumer(RECOVERY_MAX_BYTES_PER_SECOND, this::setMaxBytesPerSec);
         clusterSettings.addSettingsUpdateConsumer(RECOVERY_MAX_BYTES_PER_SECOND, this::setMaxBytesPerSec);
+        clusterSettings.addSettingsUpdateConsumer(RECOVERY_CHUNK_SIZE, this::setChunkSize);
         clusterSettings.addSettingsUpdateConsumer(INDICES_RECOVERY_ACTIVITY_TIMEOUT_SETTING, this::setRecoveryActivityTimeout);
         clusterSettings.addSettingsUpdateConsumer(INDICES_RECOVERY_ACTIVITY_TIMEOUT_SETTING, this::setRecoveryActivityTimeout);
         clusterSettings.addSettingsUpdateConsumer(INDICES_RECOVERY_ACTION_TIMEOUT_SETTING, this::setRecoveryActionTimeout);
         clusterSettings.addSettingsUpdateConsumer(INDICES_RECOVERY_ACTION_TIMEOUT_SETTING, this::setRecoveryActionTimeout);
     }
     }
 
 
+    private void setChunkSize(ByteSizeValue chunkSize) {
+        this.chunkSize = chunkSize;
+    }
+
     private void setMaxBytesPerSec(ByteSizeValue maxBytesPerSec) {
     private void setMaxBytesPerSec(ByteSizeValue maxBytesPerSec) {
         ccrRateLimiter.setMBPerSec(maxBytesPerSec);
         ccrRateLimiter.setMBPerSec(maxBytesPerSec);
     }
     }
@@ -105,6 +121,10 @@ public final class CcrSettings {
         this.recoveryActionTimeout = recoveryActionTimeout;
         this.recoveryActionTimeout = recoveryActionTimeout;
     }
     }
 
 
+    public ByteSizeValue getChunkSize() {
+        return chunkSize;
+    }
+
     public CombinedRateLimiter getRateLimiter() {
     public CombinedRateLimiter getRateLimiter() {
         return ccrRateLimiter;
         return ccrRateLimiter;
     }
     }

+ 1 - 3
x-pack/plugin/ccr/src/main/java/org/elasticsearch/xpack/ccr/repository/CcrRepository.java

@@ -329,8 +329,6 @@ public class CcrRepository extends AbstractLifecycleComponent implements Reposit
 
 
     private static class RestoreSession extends FileRestoreContext implements Closeable {
     private static class RestoreSession extends FileRestoreContext implements Closeable {
 
 
-        private static final int BUFFER_SIZE = 1 << 16;
-
         private final Client remoteClient;
         private final Client remoteClient;
         private final String sessionUUID;
         private final String sessionUUID;
         private final DiscoveryNode node;
         private final DiscoveryNode node;
@@ -342,7 +340,7 @@ public class CcrRepository extends AbstractLifecycleComponent implements Reposit
         RestoreSession(String repositoryName, Client remoteClient, String sessionUUID, DiscoveryNode node, IndexShard indexShard,
         RestoreSession(String repositoryName, Client remoteClient, String sessionUUID, DiscoveryNode node, IndexShard indexShard,
                        RecoveryState recoveryState, Store.MetadataSnapshot sourceMetaData, long mappingVersion,
                        RecoveryState recoveryState, Store.MetadataSnapshot sourceMetaData, long mappingVersion,
                        CcrSettings ccrSettings, LongConsumer throttleListener) {
                        CcrSettings ccrSettings, LongConsumer throttleListener) {
-            super(repositoryName, indexShard, SNAPSHOT_ID, recoveryState, BUFFER_SIZE);
+            super(repositoryName, indexShard, SNAPSHOT_ID, recoveryState, Math.toIntExact(ccrSettings.getChunkSize().getBytes()));
             this.remoteClient = remoteClient;
             this.remoteClient = remoteClient;
             this.sessionUUID = sessionUUID;
             this.sessionUUID = sessionUUID;
             this.node = node;
             this.node = node;

+ 10 - 1
x-pack/plugin/ccr/src/test/java/org/elasticsearch/xpack/ccr/CcrRepositoryIT.java

@@ -169,8 +169,12 @@ public class CcrRepositoryIT extends CcrIntegTestCase {
         assertNotEquals(leaderMetadata.getIndexUUID(), followerMetadata.getIndexUUID());
         assertNotEquals(leaderMetadata.getIndexUUID(), followerMetadata.getIndexUUID());
     }
     }
 
 
-    @AwaitsFix(bugUrl = "https://github.com/elastic/elasticsearch/issues/38100")
     public void testDocsAreRecovered() throws Exception {
     public void testDocsAreRecovered() throws Exception {
+        ClusterUpdateSettingsRequest settingsRequest = new ClusterUpdateSettingsRequest();
+        String chunkSize = randomFrom("4KB", "128KB", "1MB");
+        settingsRequest.persistentSettings(Settings.builder().put(CcrSettings.RECOVERY_CHUNK_SIZE.getKey(), chunkSize));
+        assertAcked(followerClient().admin().cluster().updateSettings(settingsRequest).actionGet());
+
         String leaderClusterRepoName = CcrRepository.NAME_PREFIX + "leader_cluster";
         String leaderClusterRepoName = CcrRepository.NAME_PREFIX + "leader_cluster";
         String leaderIndex = "index1";
         String leaderIndex = "index1";
         String followerIndex = "index2";
         String followerIndex = "index2";
@@ -243,6 +247,11 @@ public class CcrRepositoryIT extends CcrIntegTestCase {
 
 
         isRunning.set(false);
         isRunning.set(false);
         thread.join();
         thread.join();
+
+        settingsRequest = new ClusterUpdateSettingsRequest();
+        ByteSizeValue defaultValue = CcrSettings.RECOVERY_CHUNK_SIZE.getDefault(Settings.EMPTY);
+        settingsRequest.persistentSettings(Settings.builder().put(CcrSettings.RECOVERY_CHUNK_SIZE.getKey(), defaultValue));
+        assertAcked(followerClient().admin().cluster().updateSettings(settingsRequest).actionGet());
     }
     }
 
 
     public void testRateLimitingIsEmployed() throws Exception {
     public void testRateLimitingIsEmployed() throws Exception {

+ 2 - 1
x-pack/plugin/ccr/src/test/java/org/elasticsearch/xpack/ccr/repository/CcrRestoreSourceServiceTests.java

@@ -40,7 +40,8 @@ public class CcrRestoreSourceServiceTests extends IndexShardTestCase {
         Settings settings = Settings.builder().put(NODE_NAME_SETTING.getKey(), "node").build();
         Settings settings = Settings.builder().put(NODE_NAME_SETTING.getKey(), "node").build();
         taskQueue = new DeterministicTaskQueue(settings, random());
         taskQueue = new DeterministicTaskQueue(settings, random());
         Set<Setting<?>> registeredSettings = Sets.newHashSet(CcrSettings.INDICES_RECOVERY_ACTIVITY_TIMEOUT_SETTING,
         Set<Setting<?>> registeredSettings = Sets.newHashSet(CcrSettings.INDICES_RECOVERY_ACTIVITY_TIMEOUT_SETTING,
-            CcrSettings.RECOVERY_MAX_BYTES_PER_SECOND, CcrSettings.INDICES_RECOVERY_ACTION_TIMEOUT_SETTING);
+            CcrSettings.RECOVERY_MAX_BYTES_PER_SECOND, CcrSettings.INDICES_RECOVERY_ACTION_TIMEOUT_SETTING,
+            CcrSettings.RECOVERY_CHUNK_SIZE);
         ClusterSettings clusterSettings = new ClusterSettings(Settings.EMPTY, registeredSettings);
         ClusterSettings clusterSettings = new ClusterSettings(Settings.EMPTY, registeredSettings);
         restoreSourceService = new CcrRestoreSourceService(taskQueue.getThreadPool(), new CcrSettings(Settings.EMPTY, clusterSettings));
         restoreSourceService = new CcrRestoreSourceService(taskQueue.getThreadPool(), new CcrSettings(Settings.EMPTY, clusterSettings));
     }
     }