Browse Source

Improve searchable snapshot mount time (#66198)

Reduce the range sizes we fetch during mounting to speed up mount time
until shard started.
On resource constrained setups (rate limiter, disk or network), the time
to mount multiple shards is proportional to the amount of data to fetch
and for most files in a snapshot, we need to fetch only a small piece of
the files to start the shard.
Henning Andersen 4 years ago
parent
commit
6377c5e66f

+ 15 - 1
x-pack/plugin/searchable-snapshots/src/main/java/org/elasticsearch/index/store/SearchableSnapshotDirectory.java

@@ -374,7 +374,14 @@ public class SearchableSnapshotDirectory extends BaseDirectory {
 
         final IndexInputStats inputStats = stats.computeIfAbsent(name, n -> createIndexInputStats(fileInfo.length()));
         if (useCache && isExcludedFromCache(name) == false) {
-            return new CachedBlobContainerIndexInput(this, fileInfo, context, inputStats, cacheService.getRangeSize());
+            return new CachedBlobContainerIndexInput(
+                this,
+                fileInfo,
+                context,
+                inputStats,
+                cacheService.getRangeSize(),
+                cacheService.getRecoveryRangeSize()
+            );
         } else {
             return new DirectBlobContainerIndexInput(
                 blobContainer(),
@@ -400,6 +407,13 @@ public class SearchableSnapshotDirectory extends BaseDirectory {
         return ext != null && excludedFileTypes.contains(ext);
     }
 
+    public boolean isRecoveryFinalized() {
+        SearchableSnapshotRecoveryState recoveryState = this.recoveryState;
+        if (recoveryState == null) return false;
+        RecoveryState.Stage stage = recoveryState.getStage();
+        return stage == RecoveryState.Stage.DONE || stage == RecoveryState.Stage.FINALIZE;
+    }
+
     @Override
     public String toString() {
         return this.getClass().getSimpleName() + "@snapshotId=" + snapshotId + " lockFactory=" + lockFactory + " shard=" + shardId;

+ 13 - 5
x-pack/plugin/searchable-snapshots/src/main/java/org/elasticsearch/index/store/cache/CachedBlobContainerIndexInput.java

@@ -62,6 +62,7 @@ public class CachedBlobContainerIndexInput extends BaseSearchableSnapshotIndexIn
     private final SearchableSnapshotDirectory directory;
     private final CacheFileReference cacheFileReference;
     private final int defaultRangeSize;
+    private final int recoveryRangeSize;
 
     // last read position is kept around in order to detect (non)contiguous reads for stats
     private long lastReadPosition;
@@ -73,7 +74,8 @@ public class CachedBlobContainerIndexInput extends BaseSearchableSnapshotIndexIn
         FileInfo fileInfo,
         IOContext context,
         IndexInputStats stats,
-        int rangeSize
+        int rangeSize,
+        int recoveryRangeSize
     ) {
         this(
             "CachedBlobContainerIndexInput(" + fileInfo.physicalName() + ")",
@@ -84,7 +86,8 @@ public class CachedBlobContainerIndexInput extends BaseSearchableSnapshotIndexIn
             0L,
             fileInfo.length(),
             new CacheFileReference(directory, fileInfo.physicalName(), fileInfo.length()),
-            rangeSize
+            rangeSize,
+            recoveryRangeSize
         );
         assert getBufferSize() <= BlobStoreCacheService.DEFAULT_CACHED_BLOB_SIZE; // must be able to cache at least one buffer's worth
         stats.incrementOpenCount();
@@ -99,7 +102,8 @@ public class CachedBlobContainerIndexInput extends BaseSearchableSnapshotIndexIn
         long offset,
         long length,
         CacheFileReference cacheFileReference,
-        int rangeSize
+        int rangeSize,
+        int recoveryRangeSize
     ) {
         super(resourceDesc, directory.blobContainer(), fileInfo, context, stats, offset, length);
         this.directory = directory;
@@ -107,6 +111,7 @@ public class CachedBlobContainerIndexInput extends BaseSearchableSnapshotIndexIn
         this.lastReadPosition = this.offset;
         this.lastSeekPosition = this.offset;
         this.defaultRangeSize = rangeSize;
+        this.recoveryRangeSize = recoveryRangeSize;
     }
 
     @Override
@@ -124,7 +129,9 @@ public class CachedBlobContainerIndexInput extends BaseSearchableSnapshotIndexIn
     }
 
     private long getDefaultRangeSize() {
-        return (context != CACHE_WARMING_CONTEXT) ? defaultRangeSize : fileInfo.partSize().getBytes();
+        return (context != CACHE_WARMING_CONTEXT)
+            ? (directory.isRecoveryFinalized() ? defaultRangeSize : recoveryRangeSize)
+            : fileInfo.partSize().getBytes();
     }
 
     private Tuple<Long, Long> computeRange(long position) {
@@ -729,7 +736,8 @@ public class CachedBlobContainerIndexInput extends BaseSearchableSnapshotIndexIn
             this.offset + offset,
             length,
             cacheFileReference,
-            defaultRangeSize
+            defaultRangeSize,
+            recoveryRangeSize
         );
         slice.isClone = true;
         return slice;

+ 30 - 0
x-pack/plugin/searchable-snapshots/src/main/java/org/elasticsearch/xpack/searchablesnapshots/cache/CacheService.java

@@ -73,6 +73,13 @@ public class CacheService extends AbstractLifecycleComponent {
 
     public static final ByteSizeValue MIN_SNAPSHOT_CACHE_RANGE_SIZE = new ByteSizeValue(4, ByteSizeUnit.KB);
     public static final ByteSizeValue MAX_SNAPSHOT_CACHE_RANGE_SIZE = new ByteSizeValue(Integer.MAX_VALUE, ByteSizeUnit.BYTES);
+
+    /**
+     * If a search needs data from the repository then we expand it to a larger contiguous range whose size is determined by this setting,
+     * in anticipation of needing nearby data in subsequent reads. Repository reads typically have quite high latency (think ~100ms) and
+     * the default of 32MB for this setting represents the approximate point at which size starts to matter. In other words, reads of
+     * ranges smaller than 32MB don't usually happen much quicker, so we may as well expand all the way to 32MB ranges.
+     */
     public static final Setting<ByteSizeValue> SNAPSHOT_CACHE_RANGE_SIZE_SETTING = Setting.byteSizeSetting(
         SETTINGS_PREFIX + "range_size",
         new ByteSizeValue(32, ByteSizeUnit.MB),                 // default
@@ -81,6 +88,20 @@ public class CacheService extends AbstractLifecycleComponent {
         Setting.Property.NodeScope
     );
 
+    /**
+     * Starting up a shard involves reading small parts of some files from the repository, independently of the pre-warming process. If we
+     * expand those ranges using {@link CacheService#SNAPSHOT_CACHE_RANGE_SIZE_SETTING} then we end up reading quite a few 32MB ranges. If
+     * we read enough of these ranges for the restore throttling rate limiter to kick in then all the read threads will end up waiting on
+     * the throttle, blocking subsequent reads. By using a smaller read size during restore we avoid clogging up the rate limiter so much.
+     */
+    public static final Setting<ByteSizeValue> SNAPSHOT_CACHE_RECOVERY_RANGE_SIZE_SETTING = Setting.byteSizeSetting(
+        SETTINGS_PREFIX + "recovery_range_size",
+        new ByteSizeValue(128, ByteSizeUnit.KB),                // default
+        MIN_SNAPSHOT_CACHE_RANGE_SIZE,                          // min
+        MAX_SNAPSHOT_CACHE_RANGE_SIZE,                          // max
+        Setting.Property.NodeScope
+    );
+
     public static final TimeValue MIN_SNAPSHOT_CACHE_SYNC_INTERVAL = TimeValue.timeValueSeconds(1L);
     public static final Setting<TimeValue> SNAPSHOT_CACHE_SYNC_INTERVAL_SETTING = Setting.timeSetting(
         SETTINGS_PREFIX + "sync.interval",
@@ -118,6 +139,7 @@ public class CacheService extends AbstractLifecycleComponent {
     private final Cache<CacheKey, CacheFile> cache;
     private final ByteSizeValue cacheSize;
     private final ByteSizeValue rangeSize;
+    private final ByteSizeValue recoveryRangeSize;
     private final KeyedLock<ShardEviction> shardsEvictionLock;
     private final Set<ShardEviction> evictedShards;
 
@@ -132,6 +154,7 @@ public class CacheService extends AbstractLifecycleComponent {
         this.threadPool = Objects.requireNonNull(threadPool);
         this.cacheSize = SNAPSHOT_CACHE_SIZE_SETTING.get(settings);
         this.rangeSize = SNAPSHOT_CACHE_RANGE_SIZE_SETTING.get(settings);
+        this.recoveryRangeSize = SNAPSHOT_CACHE_RECOVERY_RANGE_SIZE_SETTING.get(settings);
         this.cache = CacheBuilder.<CacheKey, CacheFile>builder()
             .setMaximumWeight(cacheSize.getBytes())
             .weigher((key, entry) -> entry.getLength())
@@ -227,6 +250,13 @@ public class CacheService extends AbstractLifecycleComponent {
         return toIntBytes(rangeSize.getBytes());
     }
 
+    /**
+     * @return the cache range size (in bytes) to use during recovery (until post_recovery)
+     */
+    public int getRecoveryRangeSize() {
+        return toIntBytes(recoveryRangeSize.getBytes());
+    }
+
     /**
      * Retrieves the {@link CacheFile} instance associated with the specified {@link CacheKey} in the cache. If the key is not already
      * associated with a {@link CacheFile}, this method creates a new instance using the given file length and cache directory.

+ 6 - 6
x-pack/plugin/searchable-snapshots/src/test/java/org/elasticsearch/index/store/SearchableSnapshotDirectoryTests.java

@@ -461,7 +461,7 @@ public class SearchableSnapshotDirectoryTests extends AbstractSearchableSnapshot
         final boolean prewarmCache,
         final CheckedBiConsumer<Directory, SearchableSnapshotDirectory, Exception> consumer
     ) throws Exception {
-        testDirectories(enableCache, prewarmCache, createRecoveryState(), Settings.EMPTY, consumer);
+        testDirectories(enableCache, prewarmCache, createRecoveryState(randomBoolean()), Settings.EMPTY, consumer);
     }
 
     private void testDirectories(
@@ -710,7 +710,7 @@ public class SearchableSnapshotDirectoryTests extends AbstractSearchableSnapshot
                     threadPool
                 )
             ) {
-                final RecoveryState recoveryState = createRecoveryState();
+                final RecoveryState recoveryState = createRecoveryState(randomBoolean());
                 final PlainActionFuture<Void> f = PlainActionFuture.newFuture();
                 final boolean loaded = directory.loadSnapshot(recoveryState, f);
                 f.get();
@@ -779,7 +779,7 @@ public class SearchableSnapshotDirectoryTests extends AbstractSearchableSnapshot
         PathUtilsForTesting.installMock(fileSystem);
 
         try {
-            SearchableSnapshotRecoveryState recoveryState = createRecoveryState();
+            SearchableSnapshotRecoveryState recoveryState = createRecoveryState(true);
             testDirectories(true, true, recoveryState, Settings.EMPTY, (directory, snapshotDirectory) -> {
                 boolean areAllFilesReused = snapshotDirectory.snapshot()
                     .indexFiles()
@@ -800,7 +800,7 @@ public class SearchableSnapshotDirectoryTests extends AbstractSearchableSnapshot
     }
 
     public void testRecoveryStateIsEmptyWhenTheCacheIsNotPreWarmed() throws Exception {
-        SearchableSnapshotRecoveryState recoveryState = createRecoveryState();
+        SearchableSnapshotRecoveryState recoveryState = createRecoveryState(true);
         testDirectories(true, false, recoveryState, Settings.EMPTY, (directory, snapshotDirectory) -> {
             assertThat(recoveryState.getStage(), equalTo(RecoveryState.Stage.DONE));
             assertThat(recoveryState.getIndex().recoveredBytes(), equalTo(0L));
@@ -809,7 +809,7 @@ public class SearchableSnapshotDirectoryTests extends AbstractSearchableSnapshot
     }
 
     public void testNonCachedFilesAreExcludedFromRecoveryState() throws Exception {
-        SearchableSnapshotRecoveryState recoveryState = createRecoveryState();
+        SearchableSnapshotRecoveryState recoveryState = createRecoveryState(true);
 
         List<String> allFileExtensions = List.of(
             "fdt",
@@ -845,7 +845,7 @@ public class SearchableSnapshotDirectoryTests extends AbstractSearchableSnapshot
     }
 
     public void testFilesWithHashEqualsContentsAreMarkedAsReusedOnRecoveryState() throws Exception {
-        SearchableSnapshotRecoveryState recoveryState = createRecoveryState();
+        SearchableSnapshotRecoveryState recoveryState = createRecoveryState(true);
 
         testDirectories(true, true, recoveryState, Settings.EMPTY, (directory, snapshotDirectory) -> {
             assertBusy(() -> assertTrue(recoveryState.isPreWarmComplete()));

+ 10 - 9
x-pack/plugin/searchable-snapshots/src/test/java/org/elasticsearch/index/store/cache/CachedBlobContainerIndexInputTests.java

@@ -83,11 +83,12 @@ public class CachedBlobContainerIndexInputTests extends AbstractSearchableSnapsh
                 final BlobContainer singleBlobContainer = singleSplitBlobContainer(blobName, input, partSize);
                 final BlobContainer blobContainer;
                 if (input.length == partSize && input.length <= cacheService.getCacheSize() && prewarmEnabled == false) {
-                    blobContainer = new CountingBlobContainer(singleBlobContainer, cacheService.getRangeSize());
+                    blobContainer = new CountingBlobContainer(singleBlobContainer);
                 } else {
                     blobContainer = singleBlobContainer;
                 }
 
+                final boolean recoveryFinalizedDone = randomBoolean();
                 final Path shardDir;
                 try {
                     shardDir = new NodeEnvironment.NodePath(createTempDir()).resolve(shardId);
@@ -116,7 +117,7 @@ public class CachedBlobContainerIndexInputTests extends AbstractSearchableSnapsh
                         threadPool
                     )
                 ) {
-                    RecoveryState recoveryState = createRecoveryState();
+                    RecoveryState recoveryState = createRecoveryState(recoveryFinalizedDone);
                     final PlainActionFuture<Void> future = PlainActionFuture.newFuture();
                     final boolean loaded = directory.loadSnapshot(recoveryState, future);
                     if (randomBoolean()) {
@@ -136,7 +137,10 @@ public class CachedBlobContainerIndexInputTests extends AbstractSearchableSnapsh
                 }
 
                 if (blobContainer instanceof CountingBlobContainer) {
-                    long numberOfRanges = TestUtils.numberOfRanges(input.length, cacheService.getRangeSize());
+                    long numberOfRanges = TestUtils.numberOfRanges(
+                        input.length,
+                        recoveryFinalizedDone ? cacheService.getRangeSize() : cacheService.getRecoveryRangeSize()
+                    );
                     assertThat(
                         "Expected at most " + numberOfRanges + " ranges fetched from the source",
                         ((CountingBlobContainer) blobContainer).totalOpens.sum(),
@@ -211,7 +215,7 @@ public class CachedBlobContainerIndexInputTests extends AbstractSearchableSnapsh
                     threadPool
                 )
             ) {
-                RecoveryState recoveryState = createRecoveryState();
+                RecoveryState recoveryState = createRecoveryState(randomBoolean());
                 final PlainActionFuture<Void> f = PlainActionFuture.newFuture();
                 final boolean loaded = searchableSnapshotDirectory.loadSnapshot(recoveryState, f);
                 try {
@@ -262,11 +266,8 @@ public class CachedBlobContainerIndexInputTests extends AbstractSearchableSnapsh
 
         private final AtomicInteger openStreams = new AtomicInteger(0);
 
-        private final int rangeSize;
-
-        CountingBlobContainer(BlobContainer in, int rangeSize) {
+        CountingBlobContainer(BlobContainer in) {
             super(in);
-            this.rangeSize = rangeSize;
         }
 
         @Override
@@ -276,7 +277,7 @@ public class CachedBlobContainerIndexInputTests extends AbstractSearchableSnapsh
 
         @Override
         protected BlobContainer wrapChild(BlobContainer child) {
-            return new CountingBlobContainer(child, this.rangeSize);
+            return new CountingBlobContainer(child);
         }
 
         @Override

+ 7 - 3
x-pack/plugin/searchable-snapshots/src/test/java/org/elasticsearch/xpack/searchablesnapshots/AbstractSearchableSnapshotsTestCase.java

@@ -101,6 +101,9 @@ public abstract class AbstractSearchableSnapshotsTestCase extends ESIndexInputTe
         if (randomBoolean()) {
             cacheSettings.put(CacheService.SNAPSHOT_CACHE_RANGE_SIZE_SETTING.getKey(), randomCacheRangeSize());
         }
+        if (randomBoolean()) {
+            cacheSettings.put(CacheService.SNAPSHOT_CACHE_RECOVERY_RANGE_SIZE_SETTING.getKey(), randomCacheRangeSize());
+        }
         if (randomBoolean()) {
             cacheSettings.put(
                 CacheService.SNAPSHOT_CACHE_SYNC_INTERVAL_SETTING.getKey(),
@@ -149,7 +152,7 @@ public abstract class AbstractSearchableSnapshotsTestCase extends ESIndexInputTe
         );
     }
 
-    protected static SearchableSnapshotRecoveryState createRecoveryState() {
+    protected static SearchableSnapshotRecoveryState createRecoveryState(boolean finalizedDone) {
         ShardRouting shardRouting = TestShardRouting.newShardRouting(
             new ShardId(randomAlphaOfLength(10), randomAlphaOfLength(10), 0),
             randomAlphaOfLength(10),
@@ -170,8 +173,9 @@ public abstract class AbstractSearchableSnapshotsTestCase extends ESIndexInputTe
             .setStage(RecoveryState.Stage.VERIFY_INDEX)
             .setStage(RecoveryState.Stage.TRANSLOG);
         recoveryState.getIndex().setFileDetailsComplete();
-        recoveryState.setStage(RecoveryState.Stage.FINALIZE).setStage(RecoveryState.Stage.DONE);
-
+        if (finalizedDone) {
+            recoveryState.setStage(RecoveryState.Stage.FINALIZE).setStage(RecoveryState.Stage.DONE);
+        }
         return recoveryState;
     }