Browse Source

Core: increase default rate limiting for snapshot, restore and recovery to 40 MB/sec

This also fixes a possible issue that may cause over-throttling when
there are many small files being copied, which should be rare.

Closed #10185
Michael McCandless 10 years ago
parent
commit
34b397597c

+ 1 - 1
docs/reference/modules/indices.asciidoc

@@ -59,5 +59,5 @@ The following settings can be set to manage the recovery policy:
     defaults to `true`.
     defaults to `true`.
 
 
 `indices.recovery.max_bytes_per_sec`::
 `indices.recovery.max_bytes_per_sec`::
-    defaults to `20mb`.
+    defaults to `40mb`.
 
 

+ 2 - 2
docs/reference/modules/snapshots.asciidoc

@@ -69,8 +69,8 @@ on all data and master nodes. The following settings are supported:
 `compress`:: Turns on compression of the snapshot files. Compression is applied only to metadata files (index mapping and settings). Data files are not compressed. Defaults to `true`.
 `compress`:: Turns on compression of the snapshot files. Compression is applied only to metadata files (index mapping and settings). Data files are not compressed. Defaults to `true`.
 `chunk_size`:: Big files can be broken down into chunks during snapshotting if needed. The chunk size can be specified in bytes or by
 `chunk_size`:: Big files can be broken down into chunks during snapshotting if needed. The chunk size can be specified in bytes or by
  using size value notation, i.e. 1g, 10m, 5k. Defaults to `null` (unlimited chunk size).
  using size value notation, i.e. 1g, 10m, 5k. Defaults to `null` (unlimited chunk size).
-`max_restore_bytes_per_sec`:: Throttles per node restore rate. Defaults to `20mb` per second.
-`max_snapshot_bytes_per_sec`:: Throttles per node snapshot rate. Defaults to `20mb` per second.
+`max_restore_bytes_per_sec`:: Throttles per node restore rate. Defaults to `40mb` per second.
+`max_snapshot_bytes_per_sec`:: Throttles per node snapshot rate. Defaults to `40mb` per second.
 `verify`:: Verify repository upon creation. Defaults to `true`.
 `verify`:: Verify repository upon creation. Defaults to `true`.
 
 
 [float]
 [float]

+ 1 - 1
src/main/java/org/elasticsearch/indices/recovery/RecoverySettings.java

@@ -129,7 +129,7 @@ public class RecoverySettings extends AbstractComponent implements Closeable {
         this.concurrentSmallFileStreams = settings.getAsInt("indices.recovery.concurrent_small_file_streams", settings.getAsInt("index.shard.recovery.concurrent_small_file_streams", 2));
         this.concurrentSmallFileStreams = settings.getAsInt("indices.recovery.concurrent_small_file_streams", settings.getAsInt("index.shard.recovery.concurrent_small_file_streams", 2));
         this.concurrentSmallFileStreamPool = EsExecutors.newScaling(0, concurrentSmallFileStreams, 60, TimeUnit.SECONDS, EsExecutors.daemonThreadFactory(settings, "[small_file_recovery_stream]"));
         this.concurrentSmallFileStreamPool = EsExecutors.newScaling(0, concurrentSmallFileStreams, 60, TimeUnit.SECONDS, EsExecutors.daemonThreadFactory(settings, "[small_file_recovery_stream]"));
 
 
-        this.maxBytesPerSec = settings.getAsBytesSize("indices.recovery.max_bytes_per_sec", settings.getAsBytesSize("indices.recovery.max_size_per_sec", new ByteSizeValue(20, ByteSizeUnit.MB)));
+        this.maxBytesPerSec = settings.getAsBytesSize("indices.recovery.max_bytes_per_sec", settings.getAsBytesSize("indices.recovery.max_size_per_sec", new ByteSizeValue(40, ByteSizeUnit.MB)));
         if (maxBytesPerSec.bytes() <= 0) {
         if (maxBytesPerSec.bytes() <= 0) {
             rateLimiter = null;
             rateLimiter = null;
         } else {
         } else {

+ 15 - 3
src/main/java/org/elasticsearch/indices/recovery/RecoverySourceHandler.java

@@ -24,6 +24,7 @@ import com.google.common.collect.Lists;
 import org.apache.lucene.index.CorruptIndexException;
 import org.apache.lucene.index.CorruptIndexException;
 import org.apache.lucene.store.IOContext;
 import org.apache.lucene.store.IOContext;
 import org.apache.lucene.store.IndexInput;
 import org.apache.lucene.store.IndexInput;
+import org.apache.lucene.store.RateLimiter;
 import org.apache.lucene.util.ArrayUtil;
 import org.apache.lucene.util.ArrayUtil;
 import org.elasticsearch.ElasticsearchException;
 import org.elasticsearch.ElasticsearchException;
 import org.elasticsearch.ExceptionsHelper;
 import org.elasticsearch.ExceptionsHelper;
@@ -67,6 +68,7 @@ import java.io.IOException;
 import java.util.Comparator;
 import java.util.Comparator;
 import java.util.List;
 import java.util.List;
 import java.util.concurrent.*;
 import java.util.concurrent.*;
+import java.util.concurrent.atomic.AtomicLong;
 import java.util.concurrent.atomic.AtomicReference;
 import java.util.concurrent.atomic.AtomicReference;
 
 
 /**
 /**
@@ -214,6 +216,10 @@ public class RecoverySourceHandler implements Engine.RecoveryHandler {
             final AtomicReference<Throwable> corruptedEngine = new AtomicReference<>();
             final AtomicReference<Throwable> corruptedEngine = new AtomicReference<>();
             int fileIndex = 0;
             int fileIndex = 0;
             ThreadPoolExecutor pool;
             ThreadPoolExecutor pool;
+
+            // How many bytes we've copied since we last called RateLimiter.pause
+            final AtomicLong bytesSinceLastPause = new AtomicLong();
+
             for (final String name : response.phase1FileNames) {
             for (final String name : response.phase1FileNames) {
                 long fileSize = response.phase1FileSizes.get(fileIndex);
                 long fileSize = response.phase1FileSizes.get(fileIndex);
 
 
@@ -276,11 +282,17 @@ public class RecoverySourceHandler implements Engine.RecoveryHandler {
                                 final long position = indexInput.getFilePointer();
                                 final long position = indexInput.getFilePointer();
 
 
                                 // Pause using the rate limiter, if desired, to throttle the recovery
                                 // Pause using the rate limiter, if desired, to throttle the recovery
+                                RateLimiter rl = recoverySettings.rateLimiter();
                                 long throttleTimeInNanos = 0;
                                 long throttleTimeInNanos = 0;
-                                if (recoverySettings.rateLimiter() != null) {
-                                    throttleTimeInNanos = recoverySettings.rateLimiter().pause(toRead);
+                                if (rl != null) {
+                                    long bytes = bytesSinceLastPause.addAndGet(toRead);
+                                    if (bytes > rl.getMinPauseCheckBytes()) {
+                                        // Time to pause
+                                        bytesSinceLastPause.addAndGet(-bytes);
+                                        throttleTimeInNanos = rl.pause(bytes);
+                                        shard.recoveryStats().addThrottleTime(throttleTimeInNanos);
+                                    }
                                 }
                                 }
-                                shard.recoveryStats().addThrottleTime(throttleTimeInNanos);
                                 indexInput.readBytes(buf, 0, toRead, false);
                                 indexInput.readBytes(buf, 0, toRead, false);
                                 final BytesArray content = new BytesArray(buf, 0, toRead);
                                 final BytesArray content = new BytesArray(buf, 0, toRead);
                                 readCount += toRead;
                                 readCount += toRead;

+ 16 - 5
src/main/java/org/elasticsearch/indices/recovery/RecoveryTarget.java

@@ -24,6 +24,7 @@ import org.apache.lucene.index.IndexFormatTooNewException;
 import org.apache.lucene.index.IndexFormatTooOldException;
 import org.apache.lucene.index.IndexFormatTooOldException;
 import org.apache.lucene.store.AlreadyClosedException;
 import org.apache.lucene.store.AlreadyClosedException;
 import org.apache.lucene.store.IndexOutput;
 import org.apache.lucene.store.IndexOutput;
+import org.apache.lucene.store.RateLimiter;
 import org.elasticsearch.ElasticsearchException;
 import org.elasticsearch.ElasticsearchException;
 import org.elasticsearch.ExceptionsHelper;
 import org.elasticsearch.ExceptionsHelper;
 import org.elasticsearch.cluster.ClusterService;
 import org.elasticsearch.cluster.ClusterService;
@@ -54,6 +55,7 @@ import java.io.IOException;
 import java.util.Arrays;
 import java.util.Arrays;
 import java.util.Collections;
 import java.util.Collections;
 import java.util.Map;
 import java.util.Map;
+import java.util.concurrent.atomic.AtomicLong;
 import java.util.concurrent.atomic.AtomicReference;
 import java.util.concurrent.atomic.AtomicReference;
 
 
 import static org.elasticsearch.common.unit.TimeValue.timeValueMillis;
 import static org.elasticsearch.common.unit.TimeValue.timeValueMillis;
@@ -417,6 +419,9 @@ public class RecoveryTarget extends AbstractComponent {
 
 
     class FileChunkTransportRequestHandler extends BaseTransportRequestHandler<RecoveryFileChunkRequest> {
     class FileChunkTransportRequestHandler extends BaseTransportRequestHandler<RecoveryFileChunkRequest> {
 
 
+        // How many bytes we've copied since we last called RateLimiter.pause
+        final AtomicLong bytesSinceLastPause = new AtomicLong();
+
         @Override
         @Override
         public RecoveryFileChunkRequest newInstance() {
         public RecoveryFileChunkRequest newInstance() {
             return new RecoveryFileChunkRequest();
             return new RecoveryFileChunkRequest();
@@ -443,15 +448,21 @@ public class RecoveryTarget extends AbstractComponent {
                 } else {
                 } else {
                     indexOutput = recoveryStatus.getOpenIndexOutput(request.name());
                     indexOutput = recoveryStatus.getOpenIndexOutput(request.name());
                 }
                 }
-                if (recoverySettings.rateLimiter() != null) {
-                    long targetThrottling = recoverySettings.rateLimiter().pause(request.content().length());
-                    indexState.addTargetThrottling(targetThrottling);
-                    recoveryStatus.indexShard().recoveryStats().addThrottleTime(targetThrottling);
-                }
                 BytesReference content = request.content();
                 BytesReference content = request.content();
                 if (!content.hasArray()) {
                 if (!content.hasArray()) {
                     content = content.toBytesArray();
                     content = content.toBytesArray();
                 }
                 }
+                RateLimiter rl = recoverySettings.rateLimiter();
+                if (rl != null) {
+                    long bytes = bytesSinceLastPause.addAndGet(content.length());
+                    if (bytes > rl.getMinPauseCheckBytes()) {
+                        // Time to pause
+                        bytesSinceLastPause.addAndGet(-bytes);
+                        long throttleTimeInNanos = rl.pause(bytes);
+                        indexState.addTargetThrottling(throttleTimeInNanos);
+                        recoveryStatus.indexShard().recoveryStats().addThrottleTime(throttleTimeInNanos);
+                    }
+                }
                 indexOutput.writeBytes(content.array(), content.arrayOffset(), content.length());
                 indexOutput.writeBytes(content.array(), content.arrayOffset(), content.length());
                 indexState.addRecoveredBytesToFile(request.name(), content.length());
                 indexState.addRecoveredBytesToFile(request.name(), content.length());
                 if (indexOutput.getFilePointer() >= request.length() || request.lastChunk()) {
                 if (indexOutput.getFilePointer() >= request.length() || request.lastChunk()) {

+ 2 - 2
src/main/java/org/elasticsearch/repositories/blobstore/BlobStoreRepository.java

@@ -151,8 +151,8 @@ public abstract class BlobStoreRepository extends AbstractLifecycleComponent<Rep
         Map<String, String> snpashotOnlyParams = Maps.newHashMap();
         Map<String, String> snpashotOnlyParams = Maps.newHashMap();
         snpashotOnlyParams.put(MetaData.CONTEXT_MODE_PARAM, MetaData.CONTEXT_MODE_SNAPSHOT);
         snpashotOnlyParams.put(MetaData.CONTEXT_MODE_PARAM, MetaData.CONTEXT_MODE_SNAPSHOT);
         snapshotOnlyFormatParams = new ToXContent.MapParams(snpashotOnlyParams);
         snapshotOnlyFormatParams = new ToXContent.MapParams(snpashotOnlyParams);
-        snapshotRateLimiter = getRateLimiter(repositorySettings, "max_snapshot_bytes_per_sec", new ByteSizeValue(20, ByteSizeUnit.MB));
-        restoreRateLimiter = getRateLimiter(repositorySettings, "max_restore_bytes_per_sec", new ByteSizeValue(20, ByteSizeUnit.MB));
+        snapshotRateLimiter = getRateLimiter(repositorySettings, "max_snapshot_bytes_per_sec", new ByteSizeValue(40, ByteSizeUnit.MB));
+        restoreRateLimiter = getRateLimiter(repositorySettings, "max_restore_bytes_per_sec", new ByteSizeValue(40, ByteSizeUnit.MB));
     }
     }
 
 
     /**
     /**