Browse Source

Allow prewarming the cache for searchable snapshot shards (#55322)

Relates #50999
Tanguy Leroux 5 years ago
parent
commit
bd40d06648
13 changed files with 596 additions and 51 deletions
  1. 2 0
      x-pack/plugin/core/src/main/java/org/elasticsearch/xpack/searchablesnapshots/SearchableSnapshotsConstants.java
  2. 4 0
      x-pack/plugin/searchable-snapshots/src/main/java/org/elasticsearch/index/store/BaseSearchableSnapshotIndexInput.java
  3. 99 5
      x-pack/plugin/searchable-snapshots/src/main/java/org/elasticsearch/index/store/SearchableSnapshotDirectory.java
  4. 14 22
      x-pack/plugin/searchable-snapshots/src/main/java/org/elasticsearch/index/store/cache/CacheFile.java
  5. 97 10
      x-pack/plugin/searchable-snapshots/src/main/java/org/elasticsearch/index/store/cache/CachedBlobContainerIndexInput.java
  6. 32 1
      x-pack/plugin/searchable-snapshots/src/main/java/org/elasticsearch/xpack/searchablesnapshots/SearchableSnapshots.java
  7. 1 1
      x-pack/plugin/searchable-snapshots/src/main/java/org/elasticsearch/xpack/searchablesnapshots/cache/CacheService.java
  8. 7 1
      x-pack/plugin/searchable-snapshots/src/test/java/org/elasticsearch/index/store/SearchableSnapshotDirectoryStatsTests.java
  9. 18 5
      x-pack/plugin/searchable-snapshots/src/test/java/org/elasticsearch/index/store/SearchableSnapshotDirectoryTests.java
  10. 4 4
      x-pack/plugin/searchable-snapshots/src/test/java/org/elasticsearch/index/store/cache/CacheFileTests.java
  11. 311 0
      x-pack/plugin/searchable-snapshots/src/test/java/org/elasticsearch/index/store/cache/CachePreWarmingTests.java
  12. 4 2
      x-pack/plugin/searchable-snapshots/src/test/java/org/elasticsearch/index/store/cache/CachedBlobContainerIndexInputTests.java
  13. 3 0
      x-pack/plugin/searchable-snapshots/src/test/java/org/elasticsearch/xpack/searchablesnapshots/SearchableSnapshotsIntegTests.java

+ 2 - 0
x-pack/plugin/core/src/main/java/org/elasticsearch/xpack/searchablesnapshots/SearchableSnapshotsConstants.java

@@ -34,4 +34,6 @@ public class SearchableSnapshotsConstants {
         return SEARCHABLE_SNAPSHOTS_FEATURE_ENABLED
         return SEARCHABLE_SNAPSHOTS_FEATURE_ENABLED
             && SNAPSHOT_DIRECTORY_FACTORY_KEY.equals(INDEX_STORE_TYPE_SETTING.get(indexSettings));
             && SNAPSHOT_DIRECTORY_FACTORY_KEY.equals(INDEX_STORE_TYPE_SETTING.get(indexSettings));
     }
     }
+
+    public static final String SEARCHABLE_SNAPSHOTS_THREAD_POOL_NAME = "searchable_snapshots";
 }
 }

+ 4 - 0
x-pack/plugin/searchable-snapshots/src/main/java/org/elasticsearch/index/store/BaseSearchableSnapshotIndexInput.java

@@ -12,6 +12,7 @@ import org.elasticsearch.common.blobstore.BlobContainer;
 import org.elasticsearch.index.snapshots.blobstore.BlobStoreIndexShardSnapshot.FileInfo;
 import org.elasticsearch.index.snapshots.blobstore.BlobStoreIndexShardSnapshot.FileInfo;
 import org.elasticsearch.index.snapshots.blobstore.SlicedInputStream;
 import org.elasticsearch.index.snapshots.blobstore.SlicedInputStream;
 import org.elasticsearch.threadpool.ThreadPool;
 import org.elasticsearch.threadpool.ThreadPool;
+import org.elasticsearch.xpack.searchablesnapshots.SearchableSnapshotsConstants;
 
 
 import java.io.IOException;
 import java.io.IOException;
 import java.io.InputStream;
 import java.io.InputStream;
@@ -136,6 +137,9 @@ public abstract class BaseSearchableSnapshotIndexInput extends BufferedIndexInpu
             || threadName.contains('[' + ThreadPool.Names.SEARCH + ']')
             || threadName.contains('[' + ThreadPool.Names.SEARCH + ']')
             || threadName.contains('[' + ThreadPool.Names.SEARCH_THROTTLED + ']')
             || threadName.contains('[' + ThreadPool.Names.SEARCH_THROTTLED + ']')
 
 
+            // Cache prewarming runs on a dedicated thread pool.
+            || threadName.contains('[' + SearchableSnapshotsConstants.SEARCHABLE_SNAPSHOTS_THREAD_POOL_NAME + ']')
+
             // Today processExistingRecoveries considers all shards and constructs a shard store snapshot on this thread, this needs
             // Today processExistingRecoveries considers all shards and constructs a shard store snapshot on this thread, this needs
             // addressing. TODO NORELEASE
             // addressing. TODO NORELEASE
             || threadName.contains('[' + ThreadPool.Names.FETCH_SHARD_STORE + ']')
             || threadName.contains('[' + ThreadPool.Names.FETCH_SHARD_STORE + ']')

+ 99 - 5
x-pack/plugin/searchable-snapshots/src/main/java/org/elasticsearch/index/store/SearchableSnapshotDirectory.java

@@ -5,6 +5,9 @@
  */
  */
 package org.elasticsearch.index.store;
 package org.elasticsearch.index.store;
 
 
+import org.apache.logging.log4j.LogManager;
+import org.apache.logging.log4j.Logger;
+import org.apache.logging.log4j.message.ParameterizedMessage;
 import org.apache.lucene.index.IndexFileNames;
 import org.apache.lucene.index.IndexFileNames;
 import org.apache.lucene.store.BaseDirectory;
 import org.apache.lucene.store.BaseDirectory;
 import org.apache.lucene.store.Directory;
 import org.apache.lucene.store.Directory;
@@ -18,8 +21,12 @@ import org.elasticsearch.common.Nullable;
 import org.elasticsearch.common.blobstore.BlobContainer;
 import org.elasticsearch.common.blobstore.BlobContainer;
 import org.elasticsearch.common.lucene.store.ByteArrayIndexInput;
 import org.elasticsearch.common.lucene.store.ByteArrayIndexInput;
 import org.elasticsearch.common.settings.Settings;
 import org.elasticsearch.common.settings.Settings;
+import org.elasticsearch.common.unit.TimeValue;
 import org.elasticsearch.common.util.LazyInitializable;
 import org.elasticsearch.common.util.LazyInitializable;
+import org.elasticsearch.common.util.concurrent.AbstractRunnable;
 import org.elasticsearch.common.util.concurrent.ConcurrentCollections;
 import org.elasticsearch.common.util.concurrent.ConcurrentCollections;
+import org.elasticsearch.common.util.concurrent.CountDown;
+import org.elasticsearch.core.internal.io.IOUtils;
 import org.elasticsearch.index.IndexSettings;
 import org.elasticsearch.index.IndexSettings;
 import org.elasticsearch.index.shard.ShardId;
 import org.elasticsearch.index.shard.ShardId;
 import org.elasticsearch.index.shard.ShardPath;
 import org.elasticsearch.index.shard.ShardPath;
@@ -47,18 +54,22 @@ import java.util.List;
 import java.util.Map;
 import java.util.Map;
 import java.util.Objects;
 import java.util.Objects;
 import java.util.Set;
 import java.util.Set;
+import java.util.concurrent.Executor;
 import java.util.concurrent.atomic.AtomicBoolean;
 import java.util.concurrent.atomic.AtomicBoolean;
 import java.util.function.LongSupplier;
 import java.util.function.LongSupplier;
 import java.util.function.Supplier;
 import java.util.function.Supplier;
+import java.util.stream.Collectors;
 
 
 import static org.apache.lucene.store.BufferedIndexInput.bufferSize;
 import static org.apache.lucene.store.BufferedIndexInput.bufferSize;
 import static org.elasticsearch.xpack.searchablesnapshots.SearchableSnapshots.SNAPSHOT_CACHE_ENABLED_SETTING;
 import static org.elasticsearch.xpack.searchablesnapshots.SearchableSnapshots.SNAPSHOT_CACHE_ENABLED_SETTING;
 import static org.elasticsearch.xpack.searchablesnapshots.SearchableSnapshots.SNAPSHOT_CACHE_EXCLUDED_FILE_TYPES_SETTING;
 import static org.elasticsearch.xpack.searchablesnapshots.SearchableSnapshots.SNAPSHOT_CACHE_EXCLUDED_FILE_TYPES_SETTING;
+import static org.elasticsearch.xpack.searchablesnapshots.SearchableSnapshots.SNAPSHOT_CACHE_PREWARM_ENABLED_SETTING;
 import static org.elasticsearch.xpack.searchablesnapshots.SearchableSnapshots.SNAPSHOT_INDEX_ID_SETTING;
 import static org.elasticsearch.xpack.searchablesnapshots.SearchableSnapshots.SNAPSHOT_INDEX_ID_SETTING;
 import static org.elasticsearch.xpack.searchablesnapshots.SearchableSnapshots.SNAPSHOT_REPOSITORY_SETTING;
 import static org.elasticsearch.xpack.searchablesnapshots.SearchableSnapshots.SNAPSHOT_REPOSITORY_SETTING;
 import static org.elasticsearch.xpack.searchablesnapshots.SearchableSnapshots.SNAPSHOT_SNAPSHOT_ID_SETTING;
 import static org.elasticsearch.xpack.searchablesnapshots.SearchableSnapshots.SNAPSHOT_SNAPSHOT_ID_SETTING;
 import static org.elasticsearch.xpack.searchablesnapshots.SearchableSnapshots.SNAPSHOT_SNAPSHOT_NAME_SETTING;
 import static org.elasticsearch.xpack.searchablesnapshots.SearchableSnapshots.SNAPSHOT_SNAPSHOT_NAME_SETTING;
 import static org.elasticsearch.xpack.searchablesnapshots.SearchableSnapshots.SNAPSHOT_UNCACHED_CHUNK_SIZE_SETTING;
 import static org.elasticsearch.xpack.searchablesnapshots.SearchableSnapshots.SNAPSHOT_UNCACHED_CHUNK_SIZE_SETTING;
+import static org.elasticsearch.xpack.searchablesnapshots.SearchableSnapshotsConstants.SEARCHABLE_SNAPSHOTS_THREAD_POOL_NAME;
 
 
 /**
 /**
  * Implementation of {@link Directory} that exposes files from a snapshot as a Lucene directory. Because snapshots are immutable this
  * Implementation of {@link Directory} that exposes files from a snapshot as a Lucene directory. Because snapshots are immutable this
@@ -73,6 +84,8 @@ import static org.elasticsearch.xpack.searchablesnapshots.SearchableSnapshots.SN
  */
  */
 public class SearchableSnapshotDirectory extends BaseDirectory {
 public class SearchableSnapshotDirectory extends BaseDirectory {
 
 
+    private static final Logger logger = LogManager.getLogger(SearchableSnapshotDirectory.class);
+
     private final Supplier<BlobContainer> blobContainerSupplier;
     private final Supplier<BlobContainer> blobContainerSupplier;
     private final Supplier<BlobStoreIndexShardSnapshot> snapshotSupplier;
     private final Supplier<BlobStoreIndexShardSnapshot> snapshotSupplier;
     private final SnapshotId snapshotId;
     private final SnapshotId snapshotId;
@@ -80,8 +93,10 @@ public class SearchableSnapshotDirectory extends BaseDirectory {
     private final ShardId shardId;
     private final ShardId shardId;
     private final LongSupplier statsCurrentTimeNanosSupplier;
     private final LongSupplier statsCurrentTimeNanosSupplier;
     private final Map<String, IndexInputStats> stats;
     private final Map<String, IndexInputStats> stats;
+    private final ThreadPool threadPool;
     private final CacheService cacheService;
     private final CacheService cacheService;
     private final boolean useCache;
     private final boolean useCache;
+    private final boolean prewarmCache;
     private final Set<String> excludedFileTypes;
     private final Set<String> excludedFileTypes;
     private final long uncachedChunkSize; // if negative use BlobContainer#readBlobPreferredLength, see #getUncachedChunkSize()
     private final long uncachedChunkSize; // if negative use BlobContainer#readBlobPreferredLength, see #getUncachedChunkSize()
     private final Path cacheDir;
     private final Path cacheDir;
@@ -101,7 +116,8 @@ public class SearchableSnapshotDirectory extends BaseDirectory {
         Settings indexSettings,
         Settings indexSettings,
         LongSupplier currentTimeNanosSupplier,
         LongSupplier currentTimeNanosSupplier,
         CacheService cacheService,
         CacheService cacheService,
-        Path cacheDir
+        Path cacheDir,
+        ThreadPool threadPool
     ) {
     ) {
         super(new SingleInstanceLockFactory());
         super(new SingleInstanceLockFactory());
         this.snapshotSupplier = Objects.requireNonNull(snapshot);
         this.snapshotSupplier = Objects.requireNonNull(snapshot);
@@ -115,8 +131,10 @@ public class SearchableSnapshotDirectory extends BaseDirectory {
         this.cacheDir = Objects.requireNonNull(cacheDir);
         this.cacheDir = Objects.requireNonNull(cacheDir);
         this.closed = new AtomicBoolean(false);
         this.closed = new AtomicBoolean(false);
         this.useCache = SNAPSHOT_CACHE_ENABLED_SETTING.get(indexSettings);
         this.useCache = SNAPSHOT_CACHE_ENABLED_SETTING.get(indexSettings);
+        this.prewarmCache = useCache ? SNAPSHOT_CACHE_PREWARM_ENABLED_SETTING.get(indexSettings) : false;
         this.excludedFileTypes = new HashSet<>(SNAPSHOT_CACHE_EXCLUDED_FILE_TYPES_SETTING.get(indexSettings));
         this.excludedFileTypes = new HashSet<>(SNAPSHOT_CACHE_EXCLUDED_FILE_TYPES_SETTING.get(indexSettings));
         this.uncachedChunkSize = SNAPSHOT_UNCACHED_CHUNK_SIZE_SETTING.get(indexSettings).getBytes();
         this.uncachedChunkSize = SNAPSHOT_UNCACHED_CHUNK_SIZE_SETTING.get(indexSettings).getBytes();
+        this.threadPool = threadPool;
         this.loaded = false;
         this.loaded = false;
         assert invariant();
         assert invariant();
     }
     }
@@ -142,6 +160,7 @@ public class SearchableSnapshotDirectory extends BaseDirectory {
      * @return true if the snapshot was loaded by executing this method, false otherwise
      * @return true if the snapshot was loaded by executing this method, false otherwise
      */
      */
     public boolean loadSnapshot() {
     public boolean loadSnapshot() {
+        assert assertCurrentThreadMayLoadSnapshot();
         boolean alreadyLoaded = this.loaded;
         boolean alreadyLoaded = this.loaded;
         if (alreadyLoaded == false) {
         if (alreadyLoaded == false) {
             synchronized (this) {
             synchronized (this) {
@@ -150,10 +169,10 @@ public class SearchableSnapshotDirectory extends BaseDirectory {
                     this.blobContainer = blobContainerSupplier.get();
                     this.blobContainer = blobContainerSupplier.get();
                     this.snapshot = snapshotSupplier.get();
                     this.snapshot = snapshotSupplier.get();
                     this.loaded = true;
                     this.loaded = true;
+                    prewarmCache();
                 }
                 }
             }
             }
         }
         }
-        assert assertCurrentThreadMayLoadSnapshot();
         assert invariant();
         assert invariant();
         return alreadyLoaded == false;
         return alreadyLoaded == false;
     }
     }
@@ -300,7 +319,7 @@ public class SearchableSnapshotDirectory extends BaseDirectory {
 
 
         final IndexInputStats inputStats = stats.computeIfAbsent(name, n -> createIndexInputStats(fileInfo.length()));
         final IndexInputStats inputStats = stats.computeIfAbsent(name, n -> createIndexInputStats(fileInfo.length()));
         if (useCache && isExcludedFromCache(name) == false) {
         if (useCache && isExcludedFromCache(name) == false) {
-            return new CachedBlobContainerIndexInput(this, fileInfo, context, inputStats);
+            return new CachedBlobContainerIndexInput(this, fileInfo, context, inputStats, cacheService.getRangeSize());
         } else {
         } else {
             return new DirectBlobContainerIndexInput(
             return new DirectBlobContainerIndexInput(
                 blobContainer(),
                 blobContainer(),
@@ -331,12 +350,86 @@ public class SearchableSnapshotDirectory extends BaseDirectory {
         return this.getClass().getSimpleName() + "@snapshotId=" + snapshotId + " lockFactory=" + lockFactory;
         return this.getClass().getSimpleName() + "@snapshotId=" + snapshotId + " lockFactory=" + lockFactory;
     }
     }
 
 
+    private void prewarmCache() {
+        if (prewarmCache) {
+            final List<BlobStoreIndexShardSnapshot.FileInfo> cacheFiles = snapshot().indexFiles()
+                .stream()
+                .filter(file -> file.metadata().hashEqualsContents() == false)
+                .filter(file -> isExcludedFromCache(file.physicalName()) == false)
+                .collect(Collectors.toList());
+
+            final Executor executor = threadPool.executor(SEARCHABLE_SNAPSHOTS_THREAD_POOL_NAME);
+            logger.debug("{} warming shard cache for [{}] files", shardId, cacheFiles.size());
+
+            for (BlobStoreIndexShardSnapshot.FileInfo cacheFile : cacheFiles) {
+                final String fileName = cacheFile.physicalName();
+                try {
+                    final IndexInput input = openInput(fileName, CachedBlobContainerIndexInput.CACHE_WARMING_CONTEXT);
+                    assert input instanceof CachedBlobContainerIndexInput : "expected cached index input but got " + input.getClass();
+
+                    final long numberOfParts = cacheFile.numberOfParts();
+                    final CountDown countDown = new CountDown(Math.toIntExact(numberOfParts));
+                    for (long p = 0; p < numberOfParts; p++) {
+                        final int part = Math.toIntExact(p);
+                        // TODO use multiple workers to warm each part instead of filling the thread pool
+                        executor.execute(new AbstractRunnable() {
+                            @Override
+                            protected void doRun() throws Exception {
+                                ensureOpen();
+
+                                logger.trace("warming cache for [{}] part [{}/{}]", fileName, part, numberOfParts);
+                                final long startTimeInNanos = statsCurrentTimeNanosSupplier.getAsLong();
+
+                                final CachedBlobContainerIndexInput cachedIndexInput = (CachedBlobContainerIndexInput) input.clone();
+                                final int bytesRead = cachedIndexInput.prefetchPart(part); // TODO does not include any rate limitation
+                                assert bytesRead == cacheFile.partBytes(part);
+
+                                logger.trace(
+                                    () -> new ParameterizedMessage(
+                                        "part [{}/{}] of [{}] warmed in [{}] ms",
+                                        part,
+                                        numberOfParts,
+                                        fileName,
+                                        TimeValue.timeValueNanos(statsCurrentTimeNanosSupplier.getAsLong() - startTimeInNanos).millis()
+                                    )
+                                );
+                            }
+
+                            @Override
+                            public void onFailure(Exception e) {
+                                logger.trace(
+                                    () -> new ParameterizedMessage(
+                                        "failed to warm cache for [{}] part [{}/{}]",
+                                        fileName,
+                                        part,
+                                        numberOfParts
+                                    ),
+                                    e
+                                );
+                            }
+
+                            @Override
+                            public void onAfter() {
+                                if (countDown.countDown()) {
+                                    IOUtils.closeWhileHandlingException(input);
+                                }
+                            }
+                        });
+                    }
+                } catch (IOException e) {
+                    logger.trace(() -> new ParameterizedMessage("failed to warm cache for [{}]", fileName), e);
+                }
+            }
+        }
+    }
+
     public static Directory create(
     public static Directory create(
         RepositoriesService repositories,
         RepositoriesService repositories,
         CacheService cache,
         CacheService cache,
         IndexSettings indexSettings,
         IndexSettings indexSettings,
         ShardPath shardPath,
         ShardPath shardPath,
-        LongSupplier currentTimeNanosSupplier
+        LongSupplier currentTimeNanosSupplier,
+        ThreadPool threadPool
     ) throws IOException {
     ) throws IOException {
 
 
         final Repository repository = repositories.repository(SNAPSHOT_REPOSITORY_SETTING.get(indexSettings.getSettings()));
         final Repository repository = repositories.repository(SNAPSHOT_REPOSITORY_SETTING.get(indexSettings.getSettings()));
@@ -371,7 +464,8 @@ public class SearchableSnapshotDirectory extends BaseDirectory {
                 indexSettings.getSettings(),
                 indexSettings.getSettings(),
                 currentTimeNanosSupplier,
                 currentTimeNanosSupplier,
                 cache,
                 cache,
-                cacheDir
+                cacheDir,
+                threadPool
             )
             )
         );
         );
     }
     }

+ 14 - 22
x-pack/plugin/searchable-snapshots/src/main/java/org/elasticsearch/index/store/cache/CacheFile.java

@@ -51,7 +51,6 @@ public class CacheFile {
     private final ReleasableLock readLock;
     private final ReleasableLock readLock;
 
 
     private final SparseFileTracker tracker;
     private final SparseFileTracker tracker;
-    private final int rangeSize;
     private final String description;
     private final String description;
     private final Path file;
     private final Path file;
 
 
@@ -61,12 +60,11 @@ public class CacheFile {
     @Nullable // if evicted, or there are no listeners
     @Nullable // if evicted, or there are no listeners
     private volatile FileChannel channel;
     private volatile FileChannel channel;
 
 
-    public CacheFile(String description, long length, Path file, int rangeSize) {
+    public CacheFile(String description, long length, Path file) {
         this.tracker = new SparseFileTracker(file.toString(), length);
         this.tracker = new SparseFileTracker(file.toString(), length);
         this.description = Objects.requireNonNull(description);
         this.description = Objects.requireNonNull(description);
         this.file = Objects.requireNonNull(file);
         this.file = Objects.requireNonNull(file);
         this.listeners = new HashSet<>();
         this.listeners = new HashSet<>();
-        this.rangeSize = rangeSize;
         this.evicted = false;
         this.evicted = false;
 
 
         final ReentrantReadWriteLock cacheLock = new ReentrantReadWriteLock();
         final ReentrantReadWriteLock cacheLock = new ReentrantReadWriteLock();
@@ -249,41 +247,35 @@ public class CacheFile {
     }
     }
 
 
     CompletableFuture<Integer> fetchRange(
     CompletableFuture<Integer> fetchRange(
-        long position,
+        long start,
+        long end,
         CheckedBiFunction<Long, Long, Integer, IOException> onRangeAvailable,
         CheckedBiFunction<Long, Long, Integer, IOException> onRangeAvailable,
         CheckedBiConsumer<Long, Long, IOException> onRangeMissing
         CheckedBiConsumer<Long, Long, IOException> onRangeMissing
     ) {
     ) {
         final CompletableFuture<Integer> future = new CompletableFuture<>();
         final CompletableFuture<Integer> future = new CompletableFuture<>();
         try {
         try {
-            if (position < 0 || position > tracker.getLength()) {
-                throw new IllegalArgumentException("Wrong read position [" + position + "]");
+            if (start < 0 || start > tracker.getLength() || start > end || end > tracker.getLength()) {
+                throw new IllegalArgumentException(
+                    "Invalid range [start=" + start + ", end=" + end + "] for length [" + tracker.getLength() + ']'
+                );
             }
             }
-
             ensureOpen();
             ensureOpen();
-            final long rangeStart = (position / rangeSize) * rangeSize;
-            final long rangeEnd = Math.min(rangeStart + rangeSize, tracker.getLength());
-
             final List<SparseFileTracker.Gap> gaps = tracker.waitForRange(
             final List<SparseFileTracker.Gap> gaps = tracker.waitForRange(
-                rangeStart,
-                rangeEnd,
+                start,
+                end,
                 ActionListener.wrap(
                 ActionListener.wrap(
-                    rangeReady -> future.complete(onRangeAvailable.apply(rangeStart, rangeEnd)),
+                    rangeReady -> future.complete(onRangeAvailable.apply(start, end)),
                     rangeFailure -> future.completeExceptionally(rangeFailure)
                     rangeFailure -> future.completeExceptionally(rangeFailure)
                 )
                 )
             );
             );
 
 
-            if (gaps.size() > 0) {
-                final SparseFileTracker.Gap range = gaps.get(0);
-                assert gaps.size() == 1 : "expected 1 range to fetch but got " + gaps.size();
-                assert range.start == rangeStart : "range/gap start mismatch (" + range.start + ',' + rangeStart + ')';
-                assert range.end == rangeEnd : "range/gap end mismatch (" + range.end + ',' + rangeEnd + ')';
-
+            for (SparseFileTracker.Gap gap : gaps) {
                 try {
                 try {
                     ensureOpen();
                     ensureOpen();
-                    onRangeMissing.accept(rangeStart, rangeEnd);
-                    range.onResponse(null);
+                    onRangeMissing.accept(gap.start, gap.end);
+                    gap.onResponse(null);
                 } catch (Exception e) {
                 } catch (Exception e) {
-                    range.onFailure(e);
+                    gap.onFailure(e);
                 }
                 }
             }
             }
         } catch (Exception e) {
         } catch (Exception e) {

+ 97 - 10
x-pack/plugin/searchable-snapshots/src/main/java/org/elasticsearch/index/store/cache/CachedBlobContainerIndexInput.java

@@ -14,6 +14,7 @@ import org.apache.lucene.store.IOContext;
 import org.apache.lucene.store.IndexInput;
 import org.apache.lucene.store.IndexInput;
 import org.elasticsearch.common.Nullable;
 import org.elasticsearch.common.Nullable;
 import org.elasticsearch.common.SuppressForbidden;
 import org.elasticsearch.common.SuppressForbidden;
+import org.elasticsearch.common.collect.Tuple;
 import org.elasticsearch.common.io.Channels;
 import org.elasticsearch.common.io.Channels;
 import org.elasticsearch.common.util.concurrent.ReleasableLock;
 import org.elasticsearch.common.util.concurrent.ReleasableLock;
 import org.elasticsearch.index.snapshots.blobstore.BlobStoreIndexShardSnapshot.FileInfo;
 import org.elasticsearch.index.snapshots.blobstore.BlobStoreIndexShardSnapshot.FileInfo;
@@ -28,14 +29,24 @@ import java.nio.ByteBuffer;
 import java.nio.channels.FileChannel;
 import java.nio.channels.FileChannel;
 import java.util.Locale;
 import java.util.Locale;
 import java.util.concurrent.atomic.AtomicReference;
 import java.util.concurrent.atomic.AtomicReference;
+import java.util.function.Predicate;
+import java.util.stream.IntStream;
 
 
 public class CachedBlobContainerIndexInput extends BaseSearchableSnapshotIndexInput {
 public class CachedBlobContainerIndexInput extends BaseSearchableSnapshotIndexInput {
 
 
+    /**
+     * Specific IOContext used for prewarming the cache. This context allows writing
+     * a complete part of the {@link #fileInfo} to the cache at once and should not be
+     * used for anything other than what the {@link #prefetchPart(int)} method does.
+     */
+    public static final IOContext CACHE_WARMING_CONTEXT = new IOContext();
+
     private static final Logger logger = LogManager.getLogger(CachedBlobContainerIndexInput.class);
     private static final Logger logger = LogManager.getLogger(CachedBlobContainerIndexInput.class);
     private static final int COPY_BUFFER_SIZE = 8192;
     private static final int COPY_BUFFER_SIZE = 8192;
 
 
     private final SearchableSnapshotDirectory directory;
     private final SearchableSnapshotDirectory directory;
     private final CacheFileReference cacheFileReference;
     private final CacheFileReference cacheFileReference;
+    private final int defaultRangeSize;
 
 
     // last read position is kept around in order to detect (non)contiguous reads for stats
     // last read position is kept around in order to detect (non)contiguous reads for stats
     private long lastReadPosition;
     private long lastReadPosition;
@@ -46,7 +57,8 @@ public class CachedBlobContainerIndexInput extends BaseSearchableSnapshotIndexIn
         SearchableSnapshotDirectory directory,
         SearchableSnapshotDirectory directory,
         FileInfo fileInfo,
         FileInfo fileInfo,
         IOContext context,
         IOContext context,
-        IndexInputStats stats
+        IndexInputStats stats,
+        int rangeSize
     ) {
     ) {
         this(
         this(
             "CachedBlobContainerIndexInput(" + fileInfo.physicalName() + ")",
             "CachedBlobContainerIndexInput(" + fileInfo.physicalName() + ")",
@@ -56,7 +68,8 @@ public class CachedBlobContainerIndexInput extends BaseSearchableSnapshotIndexIn
             stats,
             stats,
             0L,
             0L,
             fileInfo.length(),
             fileInfo.length(),
-            new CacheFileReference(directory, fileInfo.physicalName(), fileInfo.length())
+            new CacheFileReference(directory, fileInfo.physicalName(), fileInfo.length()),
+            rangeSize
         );
         );
         stats.incrementOpenCount();
         stats.incrementOpenCount();
     }
     }
@@ -69,13 +82,15 @@ public class CachedBlobContainerIndexInput extends BaseSearchableSnapshotIndexIn
         IndexInputStats stats,
         IndexInputStats stats,
         long offset,
         long offset,
         long length,
         long length,
-        CacheFileReference cacheFileReference
+        CacheFileReference cacheFileReference,
+        int rangeSize
     ) {
     ) {
         super(resourceDesc, directory.blobContainer(), fileInfo, context, stats, offset, length);
         super(resourceDesc, directory.blobContainer(), fileInfo, context, stats, offset, length);
         this.directory = directory;
         this.directory = directory;
         this.cacheFileReference = cacheFileReference;
         this.cacheFileReference = cacheFileReference;
         this.lastReadPosition = this.offset;
         this.lastReadPosition = this.offset;
         this.lastSeekPosition = this.offset;
         this.lastSeekPosition = this.offset;
+        this.defaultRangeSize = rangeSize;
     }
     }
 
 
     @Override
     @Override
@@ -85,8 +100,35 @@ public class CachedBlobContainerIndexInput extends BaseSearchableSnapshotIndexIn
         }
         }
     }
     }
 
 
+    private void ensureContext(Predicate<IOContext> predicate) throws IOException {
+        if (predicate.test(context) == false) {
+            assert false : "this method should not be used with this context " + context;
+            throw new IOException("Cannot read the index input using context [context=" + context + ", input=" + this + ']');
+        }
+    }
+
+    private long getDefaultRangeSize() {
+        return (context != CACHE_WARMING_CONTEXT) ? defaultRangeSize : fileInfo.partSize().getBytes();
+    }
+
+    private Tuple<Long, Long> computeRange(long position) {
+        final long rangeSize = getDefaultRangeSize();
+        long start = (position / rangeSize) * rangeSize;
+        long end = Math.min(start + rangeSize, fileInfo.length());
+        return Tuple.tuple(start, end);
+    }
+
+    private CacheFile getCacheFileSafe() throws Exception {
+        final CacheFile cacheFile = cacheFileReference.get();
+        if (cacheFile == null) {
+            throw new AlreadyClosedException("Failed to acquire a non-evicted cache file");
+        }
+        return cacheFile;
+    }
+
     @Override
     @Override
     protected void readInternal(final byte[] buffer, final int offset, final int length) throws IOException {
     protected void readInternal(final byte[] buffer, final int offset, final int length) throws IOException {
+        ensureContext(ctx -> ctx != CACHE_WARMING_CONTEXT);
         final long position = getFilePointer() + this.offset;
         final long position = getFilePointer() + this.offset;
 
 
         int totalBytesRead = 0;
         int totalBytesRead = 0;
@@ -97,14 +139,12 @@ public class CachedBlobContainerIndexInput extends BaseSearchableSnapshotIndexIn
 
 
             int bytesRead = 0;
             int bytesRead = 0;
             try {
             try {
-                final CacheFile cacheFile = cacheFileReference.get();
-                if (cacheFile == null) {
-                    throw new AlreadyClosedException("Failed to acquire a non-evicted cache file");
-                }
-
+                final CacheFile cacheFile = getCacheFileSafe();
                 try (ReleasableLock ignored = cacheFile.fileLock()) {
                 try (ReleasableLock ignored = cacheFile.fileLock()) {
+                    final Tuple<Long, Long> range = computeRange(pos);
                     bytesRead = cacheFile.fetchRange(
                     bytesRead = cacheFile.fetchRange(
-                        pos,
+                        range.v1(),
+                        range.v2(),
                         (start, end) -> readCacheFile(cacheFile.getChannel(), end, pos, buffer, off, len),
                         (start, end) -> readCacheFile(cacheFile.getChannel(), end, pos, buffer, off, len),
                         (start, end) -> writeCacheFile(cacheFile.getChannel(), start, end)
                         (start, end) -> writeCacheFile(cacheFile.getChannel(), start, end)
                     ).get();
                     ).get();
@@ -131,6 +171,50 @@ public class CachedBlobContainerIndexInput extends BaseSearchableSnapshotIndexIn
         lastSeekPosition = lastReadPosition;
         lastSeekPosition = lastReadPosition;
     }
     }
 
 
+    /**
+     * Prefetches a complete part and writes it in cache. This method is used to prewarm the cache.
+     *
+     * @param part the zero-based number of the part to prefetch; must be lower than the number of parts of the file
+     * @return the number of bytes reported for the prefetched range, asserted to be equal to the size of that range
+     * @throws IllegalArgumentException if the part number is negative or is not lower than the number of parts
+     * @throws IOException if this input does not use the cache warming context, or if fetching the range fails
+     */
+    public int prefetchPart(final int part) throws IOException {
+        ensureContext(ctx -> ctx == CACHE_WARMING_CONTEXT);
+        // A negative part number would make the stream below empty and silently prefetch the first part, so reject it too
+        if (part < 0 || part >= fileInfo.numberOfParts()) {
+            throw new IllegalArgumentException("Unexpected part number [" + part + "]");
+        }
+        // The part starts where all the preceding parts end
+        final Tuple<Long, Long> range = computeRange(IntStream.range(0, part).mapToLong(fileInfo::partBytes).sum());
+        assert assertRangeIsAlignedWithPart(range);
+        try {
+            final CacheFile cacheFile = getCacheFileSafe();
+            try (ReleasableLock ignored = cacheFile.fileLock()) {
+                // Nothing is copied to a buffer when warming: the "read" callback only reports the size of the available range
+                final int bytesRead = cacheFile.fetchRange(range.v1(), range.v2(), (start, end) -> {
+                    logger.trace("range [{}-{}] of file [{}] is now available in cache", start, end, fileInfo.physicalName());
+                    return Math.toIntExact(end - start);
+                }, (start, end) -> writeCacheFile(cacheFile.getChannel(), start, end)).get();
+
+                assert bytesRead == (range.v2() - range.v1());
+                return bytesRead;
+            }
+        } catch (final Exception e) {
+            throw new IOException("Failed to prefetch file part in cache", e);
+        }
+    }
+
+    /**
+     * Checks, by means of assertions, that the range of bytes to warm in cache is aligned with the parts of
+     * {@link #fileInfo}: a single-part file must be covered from zero to the file length, otherwise the range
+     * must start on a multiple of the part size and end on a multiple of the part size or at the file length.
+     *
+     * @param range the range (start, end) to verify
+     * @return always {@code true}, so that the method can itself be invoked from an {@code assert} statement
+     */
+    private boolean assertRangeIsAlignedWithPart(Tuple<Long, Long> range) {
+        if (fileInfo.numberOfParts() != 1L) {
+            final long partSize = fileInfo.partSize().getBytes();
+            assert range.v1() % partSize == 0L : "start of range [" + range.v1() + "] is not aligned with part start";
+            assert range.v2() % partSize == 0L || (range.v2() == fileInfo.length()) : "end of range ["
+                + range.v2()
+                + "] is not aligned with part end or with file length";
+            return true;
+        }
+        final long fileLength = fileInfo.length();
+        assert range.v1() == 0L : "start of range [" + range.v1() + "] is not aligned with zero";
+        assert range.v2() == fileLength : "end of range [" + range.v2() + "] is not aligned with file length [" + fileLength + ']';
+        return true;
+    }
+
     private int readCacheFile(FileChannel fc, long end, long position, byte[] buffer, int offset, long length) throws IOException {
     private int readCacheFile(FileChannel fc, long end, long position, byte[] buffer, int offset, long length) throws IOException {
         assert assertFileChannelOpen(fc);
         assert assertFileChannelOpen(fc);
         int bytesRead = Channels.readFromFileChannel(fc, position, buffer, offset, Math.toIntExact(Math.min(length, end - position)));
         int bytesRead = Channels.readFromFileChannel(fc, position, buffer, offset, Math.toIntExact(Math.min(length, end - position)));
@@ -214,7 +298,8 @@ public class CachedBlobContainerIndexInput extends BaseSearchableSnapshotIndexIn
             stats,
             stats,
             this.offset + offset,
             this.offset + offset,
             length,
             length,
-            cacheFileReference
+            cacheFileReference,
+            defaultRangeSize
         );
         );
         slice.isClone = true;
         slice.isClone = true;
         return slice;
         return slice;
@@ -231,6 +316,8 @@ public class CachedBlobContainerIndexInput extends BaseSearchableSnapshotIndexIn
             + length()
             + length()
             + ", position="
             + ", position="
             + getFilePointer()
             + getFilePointer()
+            + ", rangeSize="
+            + getDefaultRangeSize()
             + '}';
             + '}';
     }
     }
 
 

+ 32 - 1
x-pack/plugin/searchable-snapshots/src/main/java/org/elasticsearch/xpack/searchablesnapshots/SearchableSnapshots.java

@@ -21,6 +21,7 @@ import org.elasticsearch.common.settings.Settings;
 import org.elasticsearch.common.settings.SettingsFilter;
 import org.elasticsearch.common.settings.SettingsFilter;
 import org.elasticsearch.common.unit.ByteSizeUnit;
 import org.elasticsearch.common.unit.ByteSizeUnit;
 import org.elasticsearch.common.unit.ByteSizeValue;
 import org.elasticsearch.common.unit.ByteSizeValue;
+import org.elasticsearch.common.unit.TimeValue;
 import org.elasticsearch.common.xcontent.NamedXContentRegistry;
 import org.elasticsearch.common.xcontent.NamedXContentRegistry;
 import org.elasticsearch.env.Environment;
 import org.elasticsearch.env.Environment;
 import org.elasticsearch.env.NodeEnvironment;
 import org.elasticsearch.env.NodeEnvironment;
@@ -42,6 +43,8 @@ import org.elasticsearch.repositories.RepositoriesService;
 import org.elasticsearch.rest.RestController;
 import org.elasticsearch.rest.RestController;
 import org.elasticsearch.rest.RestHandler;
 import org.elasticsearch.rest.RestHandler;
 import org.elasticsearch.script.ScriptService;
 import org.elasticsearch.script.ScriptService;
+import org.elasticsearch.threadpool.ExecutorBuilder;
+import org.elasticsearch.threadpool.ScalingExecutorBuilder;
 import org.elasticsearch.threadpool.ThreadPool;
 import org.elasticsearch.threadpool.ThreadPool;
 import org.elasticsearch.watcher.ResourceWatcherService;
 import org.elasticsearch.watcher.ResourceWatcherService;
 import org.elasticsearch.xpack.core.searchablesnapshots.MountSearchableSnapshotAction;
 import org.elasticsearch.xpack.core.searchablesnapshots.MountSearchableSnapshotAction;
@@ -67,6 +70,7 @@ import java.util.function.Function;
 import java.util.function.Supplier;
 import java.util.function.Supplier;
 
 
 import static org.elasticsearch.xpack.searchablesnapshots.SearchableSnapshotsConstants.SEARCHABLE_SNAPSHOTS_FEATURE_ENABLED;
 import static org.elasticsearch.xpack.searchablesnapshots.SearchableSnapshotsConstants.SEARCHABLE_SNAPSHOTS_FEATURE_ENABLED;
+import static org.elasticsearch.xpack.searchablesnapshots.SearchableSnapshotsConstants.SEARCHABLE_SNAPSHOTS_THREAD_POOL_NAME;
 import static org.elasticsearch.xpack.searchablesnapshots.SearchableSnapshotsConstants.SNAPSHOT_DIRECTORY_FACTORY_KEY;
 import static org.elasticsearch.xpack.searchablesnapshots.SearchableSnapshotsConstants.SNAPSHOT_DIRECTORY_FACTORY_KEY;
 
 
 /**
 /**
@@ -99,6 +103,11 @@ public class SearchableSnapshots extends Plugin implements IndexStorePlugin, Eng
         true,
         true,
         Setting.Property.IndexScope
         Setting.Property.IndexScope
     );
     );
+    // Index-scoped flag controlling whether the snapshot cache is prewarmed (declared with the value false)
+    public static final Setting<Boolean> SNAPSHOT_CACHE_PREWARM_ENABLED_SETTING = Setting.boolSetting(
+        "index.store.snapshot.cache.prewarm.enabled",
+        false,
+        Setting.Property.IndexScope
+    );
     // The file extensions that are excluded from the cache
     // The file extensions that are excluded from the cache
     public static final Setting<List<String>> SNAPSHOT_CACHE_EXCLUDED_FILE_TYPES_SETTING = Setting.listSetting(
     public static final Setting<List<String>> SNAPSHOT_CACHE_EXCLUDED_FILE_TYPES_SETTING = Setting.listSetting(
         "index.store.snapshot.cache.excluded_file_types",
         "index.store.snapshot.cache.excluded_file_types",
@@ -116,6 +125,7 @@ public class SearchableSnapshots extends Plugin implements IndexStorePlugin, Eng
 
 
     private volatile Supplier<RepositoriesService> repositoriesServiceSupplier;
     private volatile Supplier<RepositoriesService> repositoriesServiceSupplier;
     private final SetOnce<CacheService> cacheService = new SetOnce<>();
     private final SetOnce<CacheService> cacheService = new SetOnce<>();
+    private final SetOnce<ThreadPool> threadPool = new SetOnce<>();
     private final Settings settings;
     private final Settings settings;
 
 
     public SearchableSnapshots(final Settings settings) {
     public SearchableSnapshots(final Settings settings) {
@@ -137,6 +147,7 @@ public class SearchableSnapshots extends Plugin implements IndexStorePlugin, Eng
                 SNAPSHOT_SNAPSHOT_ID_SETTING,
                 SNAPSHOT_SNAPSHOT_ID_SETTING,
                 SNAPSHOT_INDEX_ID_SETTING,
                 SNAPSHOT_INDEX_ID_SETTING,
                 SNAPSHOT_CACHE_ENABLED_SETTING,
                 SNAPSHOT_CACHE_ENABLED_SETTING,
+                SNAPSHOT_CACHE_PREWARM_ENABLED_SETTING,
                 SNAPSHOT_CACHE_EXCLUDED_FILE_TYPES_SETTING,
                 SNAPSHOT_CACHE_EXCLUDED_FILE_TYPES_SETTING,
                 SNAPSHOT_UNCACHED_CHUNK_SIZE_SETTING,
                 SNAPSHOT_UNCACHED_CHUNK_SIZE_SETTING,
                 CacheService.SNAPSHOT_CACHE_SIZE_SETTING,
                 CacheService.SNAPSHOT_CACHE_SIZE_SETTING,
@@ -165,6 +176,7 @@ public class SearchableSnapshots extends Plugin implements IndexStorePlugin, Eng
             final CacheService cacheService = new CacheService(settings);
             final CacheService cacheService = new CacheService(settings);
             this.cacheService.set(cacheService);
             this.cacheService.set(cacheService);
             this.repositoriesServiceSupplier = repositoriesServiceSupplier;
             this.repositoriesServiceSupplier = repositoriesServiceSupplier;
+            this.threadPool.set(threadPool);
             return List.of(cacheService);
             return List.of(cacheService);
         } else {
         } else {
             this.repositoriesServiceSupplier = () -> {
             this.repositoriesServiceSupplier = () -> {
@@ -190,7 +202,9 @@ public class SearchableSnapshots extends Plugin implements IndexStorePlugin, Eng
                 assert repositories != null;
                 assert repositories != null;
                 final CacheService cache = cacheService.get();
                 final CacheService cache = cacheService.get();
                 assert cache != null;
                 assert cache != null;
-                return SearchableSnapshotDirectory.create(repositories, cache, indexSettings, shardPath, System::nanoTime);
+                final ThreadPool threadPool = this.threadPool.get();
+                assert threadPool != null;
+                return SearchableSnapshotDirectory.create(repositories, cache, indexSettings, shardPath, System::nanoTime, threadPool);
             });
             });
         } else {
         } else {
             return Map.of();
             return Map.of();
@@ -252,4 +266,21 @@ public class SearchableSnapshots extends Plugin implements IndexStorePlugin, Eng
         }
         }
     }
     }
 
 
+    /**
+     * Provides the custom thread pool of this plugin.
+     *
+     * @param settings the current settings (not used by this implementation)
+     * @return a list containing the builder returned by {@link #executorBuilder()} when the searchable snapshots
+     *         feature is enabled, an empty list otherwise
+     */
+    @Override
+    public List<ExecutorBuilder<?>> getExecutorBuilders(Settings settings) {
+        if (SEARCHABLE_SNAPSHOTS_FEATURE_ENABLED) {
+            return List.of(executorBuilder());
+        } else {
+            return List.of();
+        }
+    }
+
+    /**
+     * Creates the builder of the thread pool named {@code SEARCHABLE_SNAPSHOTS_THREAD_POOL_NAME}, a scaling
+     * executor built with the values 0 and 32, a 30 seconds time value and the
+     * {@code xpack.searchable_snapshots.thread_pool} prefix.
+     */
+    public static ExecutorBuilder<?> executorBuilder() {
+        final TimeValue keepAlive = TimeValue.timeValueSeconds(30L);
+        final String settingsPrefix = "xpack.searchable_snapshots.thread_pool";
+        return new ScalingExecutorBuilder(SEARCHABLE_SNAPSHOTS_THREAD_POOL_NAME, 0, 32, keepAlive, settingsPrefix);
+    }
 }
 }

+ 1 - 1
x-pack/plugin/searchable-snapshots/src/main/java/org/elasticsearch/xpack/searchablesnapshots/cache/CacheService.java

@@ -112,7 +112,7 @@ public class CacheService extends AbstractLifecycleComponent {
             final Path path = cacheDir.resolve(uuid);
             final Path path = cacheDir.resolve(uuid);
             assert Files.notExists(path) : "cache file already exists " + path;
             assert Files.notExists(path) : "cache file already exists " + path;
 
 
-            return new CacheFile(key.toString(), fileLength, path, getRangeSize());
+            return new CacheFile(key.toString(), fileLength, path);
         });
         });
     }
     }
 
 

+ 7 - 1
x-pack/plugin/searchable-snapshots/src/test/java/org/elasticsearch/index/store/SearchableSnapshotDirectoryStatsTests.java

@@ -22,6 +22,8 @@ import org.elasticsearch.index.snapshots.blobstore.BlobStoreIndexShardSnapshot.F
 import org.elasticsearch.index.store.cache.TestUtils;
 import org.elasticsearch.index.store.cache.TestUtils;
 import org.elasticsearch.repositories.IndexId;
 import org.elasticsearch.repositories.IndexId;
 import org.elasticsearch.snapshots.SnapshotId;
 import org.elasticsearch.snapshots.SnapshotId;
+import org.elasticsearch.threadpool.TestThreadPool;
+import org.elasticsearch.threadpool.ThreadPool;
 import org.elasticsearch.xpack.searchablesnapshots.cache.CacheService;
 import org.elasticsearch.xpack.searchablesnapshots.cache.CacheService;
 
 
 import java.io.IOException;
 import java.io.IOException;
@@ -570,6 +572,7 @@ public class SearchableSnapshotDirectoryStatsTests extends ESIndexInputTestCase
         final ShardId shardId = new ShardId("_name", "_uuid", 0);
         final ShardId shardId = new ShardId("_name", "_uuid", 0);
         final AtomicLong fakeClock = new AtomicLong();
         final AtomicLong fakeClock = new AtomicLong();
         final LongSupplier statsCurrentTimeNanos = () -> fakeClock.addAndGet(FAKE_CLOCK_ADVANCE_NANOS);
         final LongSupplier statsCurrentTimeNanos = () -> fakeClock.addAndGet(FAKE_CLOCK_ADVANCE_NANOS);
+        final ThreadPool threadPool = new TestThreadPool(getTestClass().getSimpleName());
 
 
         final Long seekingThreshold = randomBoolean() ? randomLongBetween(1L, fileContent.length) : null;
         final Long seekingThreshold = randomBoolean() ? randomLongBetween(1L, fileContent.length) : null;
 
 
@@ -590,7 +593,8 @@ public class SearchableSnapshotDirectoryStatsTests extends ESIndexInputTestCase
                 indexSettings,
                 indexSettings,
                 statsCurrentTimeNanos,
                 statsCurrentTimeNanos,
                 cacheService,
                 cacheService,
-                createTempDir()
+                createTempDir(),
+                threadPool
             ) {
             ) {
                 @Override
                 @Override
                 protected IndexInputStats createIndexInputStats(long fileLength) {
                 protected IndexInputStats createIndexInputStats(long fileLength) {
@@ -610,6 +614,8 @@ public class SearchableSnapshotDirectoryStatsTests extends ESIndexInputTestCase
             assertThat("BlobContainer should be loaded", directory.blobContainer(), notNullValue());
             assertThat("BlobContainer should be loaded", directory.blobContainer(), notNullValue());
 
 
             test.apply(fileName, fileContent, directory);
             test.apply(fileName, fileContent, directory);
+        } finally {
+            terminate(threadPool);
         }
         }
     }
     }
 }
 }

+ 18 - 5
x-pack/plugin/searchable-snapshots/src/test/java/org/elasticsearch/index/store/SearchableSnapshotDirectoryTests.java

@@ -70,6 +70,7 @@ import org.elasticsearch.test.ESTestCase;
 import org.elasticsearch.test.IndexSettingsModule;
 import org.elasticsearch.test.IndexSettingsModule;
 import org.elasticsearch.threadpool.TestThreadPool;
 import org.elasticsearch.threadpool.TestThreadPool;
 import org.elasticsearch.threadpool.ThreadPool;
 import org.elasticsearch.threadpool.ThreadPool;
+import org.elasticsearch.xpack.searchablesnapshots.SearchableSnapshots;
 import org.elasticsearch.xpack.searchablesnapshots.cache.CacheService;
 import org.elasticsearch.xpack.searchablesnapshots.cache.CacheService;
 import org.hamcrest.Matcher;
 import org.hamcrest.Matcher;
 
 
@@ -92,6 +93,7 @@ import java.util.Map;
 
 
 import static java.util.Collections.emptyMap;
 import static java.util.Collections.emptyMap;
 import static org.elasticsearch.xpack.searchablesnapshots.SearchableSnapshots.SNAPSHOT_CACHE_ENABLED_SETTING;
 import static org.elasticsearch.xpack.searchablesnapshots.SearchableSnapshots.SNAPSHOT_CACHE_ENABLED_SETTING;
+import static org.elasticsearch.xpack.searchablesnapshots.SearchableSnapshots.SNAPSHOT_CACHE_PREWARM_ENABLED_SETTING;
 import static org.hamcrest.Matchers.allOf;
 import static org.hamcrest.Matchers.allOf;
 import static org.hamcrest.Matchers.equalTo;
 import static org.hamcrest.Matchers.equalTo;
 import static org.hamcrest.Matchers.greaterThan;
 import static org.hamcrest.Matchers.greaterThan;
@@ -350,7 +352,7 @@ public class SearchableSnapshotDirectoryTests extends ESTestCase {
                 writer.commit();
                 writer.commit();
             }
             }
 
 
-            final ThreadPool threadPool = new TestThreadPool(getClass().getSimpleName());
+            final ThreadPool threadPool = new TestThreadPool(getTestName(), SearchableSnapshots.executorBuilder());
             releasables.add(() -> terminate(threadPool));
             releasables.add(() -> terminate(threadPool));
 
 
             final Store store = new Store(shardId, indexSettings, directory, new DummyShardLock(shardId));
             final Store store = new Store(shardId, indexSettings, directory, new DummyShardLock(shardId));
@@ -439,10 +441,14 @@ public class SearchableSnapshotDirectoryTests extends ESTestCase {
                         snapshotId,
                         snapshotId,
                         indexId,
                         indexId,
                         shardId,
                         shardId,
-                        Settings.builder().put(SNAPSHOT_CACHE_ENABLED_SETTING.getKey(), randomBoolean()).build(),
+                        Settings.builder()
+                            .put(SNAPSHOT_CACHE_ENABLED_SETTING.getKey(), randomBoolean())
+                            .put(SNAPSHOT_CACHE_PREWARM_ENABLED_SETTING.getKey(), randomBoolean())
+                            .build(),
                         () -> 0L,
                         () -> 0L,
                         cacheService,
                         cacheService,
-                        cacheDir
+                        cacheDir,
+                        threadPool
                     )
                     )
                 ) {
                 ) {
                     final boolean loaded = snapshotDirectory.loadSnapshot();
                     final boolean loaded = snapshotDirectory.loadSnapshot();
@@ -513,6 +519,7 @@ public class SearchableSnapshotDirectoryTests extends ESTestCase {
             final ShardId shardId = new ShardId(new Index("_name", "_id"), 0);
             final ShardId shardId = new ShardId(new Index("_name", "_id"), 0);
 
 
             final Path cacheDir = createTempDir();
             final Path cacheDir = createTempDir();
+            final ThreadPool threadPool = new TestThreadPool(getTestName(), SearchableSnapshots.executorBuilder());
             try (
             try (
                 SearchableSnapshotDirectory directory = new SearchableSnapshotDirectory(
                 SearchableSnapshotDirectory directory = new SearchableSnapshotDirectory(
                     () -> blobContainer,
                     () -> blobContainer,
@@ -520,10 +527,14 @@ public class SearchableSnapshotDirectoryTests extends ESTestCase {
                     snapshotId,
                     snapshotId,
                     indexId,
                     indexId,
                     shardId,
                     shardId,
-                    Settings.builder().put(SNAPSHOT_CACHE_ENABLED_SETTING.getKey(), true).build(),
+                    Settings.builder()
+                        .put(SNAPSHOT_CACHE_ENABLED_SETTING.getKey(), true)
+                        .put(SNAPSHOT_CACHE_PREWARM_ENABLED_SETTING.getKey(), randomBoolean())
+                        .build(),
                     () -> 0L,
                     () -> 0L,
                     cacheService,
                     cacheService,
-                    cacheDir
+                    cacheDir,
+                    threadPool
                 )
                 )
             ) {
             ) {
 
 
@@ -553,6 +564,8 @@ public class SearchableSnapshotDirectoryTests extends ESTestCase {
                         assertListOfFiles(cacheDir, equalTo(0), equalTo(0L));
                         assertListOfFiles(cacheDir, equalTo(0), equalTo(0L));
                     }
                     }
                 }
                 }
+            } finally {
+                terminate(threadPool);
             }
             }
         }
         }
     }
     }

+ 4 - 4
x-pack/plugin/searchable-snapshots/src/test/java/org/elasticsearch/index/store/cache/CacheFileTests.java

@@ -6,8 +6,8 @@
 package org.elasticsearch.index.store.cache;
 package org.elasticsearch.index.store.cache;
 
 
 import org.apache.lucene.util.SetOnce;
 import org.apache.lucene.util.SetOnce;
-import org.elasticsearch.test.ESTestCase;
 import org.elasticsearch.index.store.cache.CacheFile.EvictionListener;
 import org.elasticsearch.index.store.cache.CacheFile.EvictionListener;
+import org.elasticsearch.test.ESTestCase;
 
 
 import java.io.IOException;
 import java.io.IOException;
 import java.nio.channels.FileChannel;
 import java.nio.channels.FileChannel;
@@ -27,7 +27,7 @@ public class CacheFileTests extends ESTestCase {
 
 
     public void testAcquireAndRelease() throws Exception {
     public void testAcquireAndRelease() throws Exception {
         final Path file = createTempDir().resolve("file.cache");
         final Path file = createTempDir().resolve("file.cache");
-        final CacheFile cacheFile = new CacheFile("test", randomLongBetween(1, 100), file, randomIntBetween(1, 100));
+        final CacheFile cacheFile = new CacheFile("test", randomLongBetween(1, 100), file);
 
 
         assertThat("Cache file is not acquired: no channel exists", cacheFile.getChannel(), nullValue());
         assertThat("Cache file is not acquired: no channel exists", cacheFile.getChannel(), nullValue());
         assertThat("Cache file is not acquired: file does not exist", Files.exists(file), is(false));
         assertThat("Cache file is not acquired: file does not exist", Files.exists(file), is(false));
@@ -70,7 +70,7 @@ public class CacheFileTests extends ESTestCase {
 
 
     public void testCacheFileNotAcquired() throws IOException {
     public void testCacheFileNotAcquired() throws IOException {
         final Path file = createTempDir().resolve("file.cache");
         final Path file = createTempDir().resolve("file.cache");
-        final CacheFile cacheFile = new CacheFile("test", randomLongBetween(1, 100), file, randomIntBetween(1, 100));
+        final CacheFile cacheFile = new CacheFile("test", randomLongBetween(1, 100), file);
 
 
         assertThat(Files.exists(file), is(false));
         assertThat(Files.exists(file), is(false));
         assertThat(cacheFile.getChannel(), nullValue());
         assertThat(cacheFile.getChannel(), nullValue());
@@ -94,7 +94,7 @@ public class CacheFileTests extends ESTestCase {
 
 
     public void testDeleteOnCloseAfterLastRelease() throws Exception {
     public void testDeleteOnCloseAfterLastRelease() throws Exception {
         final Path file = createTempDir().resolve("file.cache");
         final Path file = createTempDir().resolve("file.cache");
-        final CacheFile cacheFile = new CacheFile("test", randomLongBetween(1, 100), file, randomIntBetween(1, 100));
+        final CacheFile cacheFile = new CacheFile("test", randomLongBetween(1, 100), file);
 
 
         final List<TestEvictionListener> acquiredListeners = new ArrayList<>();
         final List<TestEvictionListener> acquiredListeners = new ArrayList<>();
         for (int i = 0; i < randomIntBetween(1, 20); i++) {
         for (int i = 0; i < randomIntBetween(1, 20); i++) {

+ 311 - 0
x-pack/plugin/searchable-snapshots/src/test/java/org/elasticsearch/index/store/cache/CachePreWarmingTests.java

@@ -0,0 +1,311 @@
+/*
+ * Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
+ * or more contributor license agreements. Licensed under the Elastic License;
+ * you may not use this file except in compliance with the Elastic License.
+ */
+
+package org.elasticsearch.index.store.cache;
+
+import org.apache.lucene.document.Document;
+import org.apache.lucene.document.Field;
+import org.apache.lucene.document.NumericDocValuesField;
+import org.apache.lucene.document.StringField;
+import org.apache.lucene.document.TextField;
+import org.apache.lucene.index.IndexCommit;
+import org.apache.lucene.index.IndexFileNames;
+import org.apache.lucene.index.IndexWriter;
+import org.apache.lucene.index.IndexWriterConfig;
+import org.apache.lucene.index.SegmentInfos;
+import org.apache.lucene.store.Directory;
+import org.elasticsearch.Version;
+import org.elasticsearch.action.support.PlainActionFuture;
+import org.elasticsearch.cluster.metadata.IndexMetadata;
+import org.elasticsearch.cluster.metadata.RepositoryMetadata;
+import org.elasticsearch.common.Nullable;
+import org.elasticsearch.common.UUIDs;
+import org.elasticsearch.common.blobstore.BlobContainer;
+import org.elasticsearch.common.lease.Releasable;
+import org.elasticsearch.common.lease.Releasables;
+import org.elasticsearch.common.lucene.Lucene;
+import org.elasticsearch.common.settings.Settings;
+import org.elasticsearch.common.unit.ByteSizeUnit;
+import org.elasticsearch.common.xcontent.NamedXContentRegistry;
+import org.elasticsearch.env.Environment;
+import org.elasticsearch.index.IndexSettings;
+import org.elasticsearch.index.seqno.SequenceNumbers;
+import org.elasticsearch.index.shard.ShardId;
+import org.elasticsearch.index.snapshots.IndexShardSnapshotStatus;
+import org.elasticsearch.index.snapshots.blobstore.BlobStoreIndexShardSnapshot;
+import org.elasticsearch.index.snapshots.blobstore.BlobStoreIndexShardSnapshot.FileInfo;
+import org.elasticsearch.index.store.SearchableSnapshotDirectory;
+import org.elasticsearch.index.store.Store;
+import org.elasticsearch.index.translog.Translog;
+import org.elasticsearch.repositories.IndexId;
+import org.elasticsearch.repositories.blobstore.BlobStoreRepository;
+import org.elasticsearch.repositories.blobstore.BlobStoreTestUtil;
+import org.elasticsearch.repositories.fs.FsRepository;
+import org.elasticsearch.snapshots.SnapshotId;
+import org.elasticsearch.snapshots.mockstore.BlobContainerWrapper;
+import org.elasticsearch.test.DummyShardLock;
+import org.elasticsearch.test.ESTestCase;
+import org.elasticsearch.test.IndexSettingsModule;
+import org.elasticsearch.threadpool.TestThreadPool;
+import org.elasticsearch.threadpool.ThreadPool;
+import org.elasticsearch.xpack.searchablesnapshots.SearchableSnapshots;
+import org.elasticsearch.xpack.searchablesnapshots.cache.CacheService;
+
+import java.io.FilterInputStream;
+import java.io.IOException;
+import java.io.InputStream;
+import java.nio.file.Path;
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.TimeUnit;
+import java.util.stream.Collectors;
+
+import static java.util.Collections.emptyMap;
+import static org.elasticsearch.xpack.searchablesnapshots.SearchableSnapshots.SNAPSHOT_CACHE_ENABLED_SETTING;
+import static org.elasticsearch.xpack.searchablesnapshots.SearchableSnapshots.SNAPSHOT_CACHE_PREWARM_ENABLED_SETTING;
+import static org.elasticsearch.xpack.searchablesnapshots.SearchableSnapshotsConstants.SEARCHABLE_SNAPSHOTS_THREAD_POOL_NAME;
+import static org.hamcrest.Matchers.equalTo;
+import static org.hamcrest.Matchers.is;
+
+public class CachePreWarmingTests extends ESTestCase {
+
+    /**
+     * Checks that loading a {@link SearchableSnapshotDirectory} with both the cache and the cache
+     * prewarming settings enabled reads the expected blob parts from the repository.
+     * <p>
+     * The test writes a random Lucene index, snapshots it into a local {@link FsRepository}, then opens
+     * the snapshot through a {@link FilterBlobContainer} that records how many bytes were read per blob.
+     * After the searchable snapshots executor has terminated, the recorded reads are compared with the
+     * snapshot files that are neither excluded from the cache nor filtered out by
+     * {@code hashEqualsContents()}.
+     */
+    public void testCachePreWarming() throws Exception {
+        final IndexSettings indexSettings = IndexSettingsModule.newIndexSettings(
+            "_index",
+            Settings.builder()
+                .put(IndexMetadata.SETTING_INDEX_UUID, UUIDs.randomBase64UUID(random()))
+                .put(IndexMetadata.SETTING_VERSION_CREATED, Version.CURRENT)
+                .build()
+        );
+        final ShardId shardId = new ShardId(indexSettings.getIndex(), randomIntBetween(0, 10));
+        // Resources registered here are all closed in the finally block at the end of the test
+        final List<Releasable> releasables = new ArrayList<>();
+
+        try (Directory directory = newDirectory()) {
+            // Step 1: build a random index (possibly empty) and commit it with the user data set below
+            final IndexWriterConfig indexWriterConfig = newIndexWriterConfig();
+            try (IndexWriter writer = new IndexWriter(directory, indexWriterConfig)) {
+                final int nbDocs = scaledRandomIntBetween(0, 1_000);
+                final List<String> words = List.of("the", "quick", "brown", "fox", "jumps", "over", "the", "lazy", "dog");
+                for (int i = 0; i < nbDocs; i++) {
+                    final Document doc = new Document();
+                    doc.add(new StringField("id", "" + i, Field.Store.YES));
+                    String text = String.join(" ", randomSubsetOf(randomIntBetween(1, words.size()), words));
+                    doc.add(new TextField("text", text, Field.Store.YES));
+                    doc.add(new NumericDocValuesField("rank", i));
+                    writer.addDocument(doc);
+                }
+                if (randomBoolean()) {
+                    writer.flush();
+                }
+                if (randomBoolean()) {
+                    writer.forceMerge(1, true);
+                }
+                final Map<String, String> userData = new HashMap<>(2);
+                userData.put(SequenceNumbers.LOCAL_CHECKPOINT_KEY, "0");
+                userData.put(Translog.TRANSLOG_UUID_KEY, UUIDs.randomBase64UUID(random()));
+                writer.setLiveCommitData(userData.entrySet());
+                writer.commit();
+            }
+
+            final ThreadPool threadPool = new TestThreadPool(getTestName(), SearchableSnapshots.executorBuilder());
+            releasables.add(() -> terminate(threadPool));
+
+            final Store store = new Store(shardId, indexSettings, directory, new DummyShardLock(shardId));
+            store.incRef();
+            releasables.add(store::decRef);
+            try {
+                final SegmentInfos segmentInfos = Lucene.readSegmentInfos(store.directory());
+                final IndexCommit indexCommit = Lucene.getIndexCommit(segmentInfos, store.directory());
+
+                // Step 2: create a file system repository with randomized compression and chunk size
+                Path repositoryPath = createTempDir();
+                Settings.Builder repositorySettings = Settings.builder().put("location", repositoryPath);
+                boolean compress = randomBoolean();
+                if (compress) {
+                    repositorySettings.put("compress", randomBoolean());
+                }
+                if (randomBoolean()) {
+                    repositorySettings.put("chunk_size", randomIntBetween(100, 1000), ByteSizeUnit.BYTES);
+                }
+
+                final String repositoryName = randomAlphaOfLength(10);
+                final RepositoryMetadata repositoryMetadata = new RepositoryMetadata(
+                    repositoryName,
+                    FsRepository.TYPE,
+                    repositorySettings.build()
+                );
+
+                final BlobStoreRepository repository = new FsRepository(
+                    repositoryMetadata,
+                    new Environment(
+                        Settings.builder()
+                            .put(Environment.PATH_HOME_SETTING.getKey(), createTempDir().toAbsolutePath())
+                            .put(Environment.PATH_REPO_SETTING.getKey(), repositoryPath.toAbsolutePath())
+                            .putList(Environment.PATH_DATA_SETTING.getKey(), tmpPaths())
+                            .build(),
+                        null
+                    ),
+                    NamedXContentRegistry.EMPTY,
+                    BlobStoreTestUtil.mockClusterService(repositoryMetadata)
+                ) {
+
+                    @Override
+                    protected void assertSnapshotOrGenericThread() {
+                        // eliminate thread name check as we create repo manually on test/main threads
+                    }
+                };
+                repository.start();
+                releasables.add(repository::stop);
+
+                final SnapshotId snapshotId = new SnapshotId("_snapshot", UUIDs.randomBase64UUID(random()));
+                final IndexId indexId = new IndexId(indexSettings.getIndex().getName(), UUIDs.randomBase64UUID(random()));
+
+                // Step 3: snapshot the shard on a generic thread and wait for the snapshot to complete
+                final PlainActionFuture<String> future = PlainActionFuture.newFuture();
+                threadPool.generic().submit(() -> {
+                    IndexShardSnapshotStatus snapshotStatus = IndexShardSnapshotStatus.newInitializing(null);
+                    repository.snapshotShard(
+                        store,
+                        null,
+                        snapshotId,
+                        indexId,
+                        indexCommit,
+                        null,
+                        snapshotStatus,
+                        Version.CURRENT,
+                        emptyMap(),
+                        future
+                    );
+                    future.actionGet();
+                });
+                future.actionGet();
+
+                final Path cacheDir = createTempDir();
+                final CacheService cacheService = new CacheService(Settings.EMPTY);
+                releasables.add(cacheService);
+                cacheService.start();
+
+                final List<String> excludedFromCache = randomSubsetOf(Arrays.asList("fdt", "fdx", "nvd", "dvd", "tip", "cfs", "dim"));
+
+                final Settings restoredIndexSettings = Settings.builder()
+                    .put(SNAPSHOT_CACHE_ENABLED_SETTING.getKey(), true)
+                    .put(SNAPSHOT_CACHE_PREWARM_ENABLED_SETTING.getKey(), true)
+                    .putList(SearchableSnapshots.SNAPSHOT_CACHE_EXCLUDED_FILE_TYPES_SETTING.getKey(), excludedFromCache)
+                    .build();
+
+                final BlobContainer blobContainer = repository.shardContainer(indexId, shardId.id());
+                final BlobStoreIndexShardSnapshot snapshot = repository.loadShardSnapshot(blobContainer, snapshotId);
+
+                // Files expected to be read: not excluded by extension and hashEqualsContents() == false
+                // NOTE(review): presumably files whose hash equals their contents are served from the
+                // snapshot metadata and never read from the repository - confirm against the directory code
+                final List<FileInfo> expectedPrewarmedBlobs = snapshot.indexFiles()
+                    .stream()
+                    .filter(fileInfo -> fileInfo.metadata().hashEqualsContents() == false)
+                    .filter(fileInfo -> excludedFromCache.contains(IndexFileNames.getExtension(fileInfo.physicalName())) == false)
+                    .collect(Collectors.toList());
+
+                // Step 4: open the snapshot through the recording blob container and load it
+                final FilterBlobContainer filterBlobContainer = new FilterBlobContainer(blobContainer);
+                try (
+                    SearchableSnapshotDirectory snapshotDirectory = new SearchableSnapshotDirectory(
+                        () -> filterBlobContainer,
+                        () -> snapshot,
+                        snapshotId,
+                        indexId,
+                        shardId,
+                        restoredIndexSettings,
+                        () -> 0L,
+                        cacheService,
+                        cacheDir,
+                        threadPool
+                    )
+                ) {
+                    // Nothing must have been read before the snapshot is loaded
+                    assertThat(filterBlobContainer.totalFilesRead(), equalTo(0L));
+                    assertThat(filterBlobContainer.totalBytesRead(), equalTo(0L));
+
+                    final boolean loaded = snapshotDirectory.loadSnapshot();
+                    assertThat("Failed to load snapshot", loaded, is(true));
+
+                    // Wait for the tasks submitted to the searchable snapshots executor; fail explicitly on
+                    // timeout rather than letting the assertions below run against partial results
+                    final ExecutorService executor = threadPool.executor(SEARCHABLE_SNAPSHOTS_THREAD_POOL_NAME);
+                    executor.shutdown();
+                    assertThat(
+                        "Timed out waiting for the searchable snapshots executor to terminate",
+                        executor.awaitTermination(30L, TimeUnit.SECONDS),
+                        is(true)
+                    );
+
+                    assertThat(
+                        filterBlobContainer.totalFilesRead(),
+                        equalTo(expectedPrewarmedBlobs.stream().mapToLong(FileInfo::numberOfParts).sum())
+                    );
+                    assertThat(
+                        filterBlobContainer.totalBytesRead(),
+                        equalTo(expectedPrewarmedBlobs.stream().mapToLong(FileInfo::length).sum())
+                    );
+
+                    // Every part of every expected file must have been read in full
+                    for (FileInfo expectedPrewarmedBlob : expectedPrewarmedBlobs) {
+                        for (int part = 0; part < expectedPrewarmedBlob.numberOfParts(); part++) {
+                            String partName = expectedPrewarmedBlob.partName(part);
+                            assertThat(filterBlobContainer.totalBytesRead(partName), equalTo(expectedPrewarmedBlob.partBytes(part)));
+                        }
+                    }
+                }
+            } finally {
+                Releasables.close(releasables);
+            }
+        }
+    }
+
+    /**
+     * A {@link BlobContainer} wrapper that records, per blob name, the number of bytes read from the
+     * streams returned by {@link #readBlob(String, long, long)}.
+     * <p>
+     * A count is only recorded when the returned stream is closed, and it replaces any count
+     * previously recorded under the same blob name.
+     */
+    private static class FilterBlobContainer extends BlobContainerWrapper {
+
+        /** Bytes read per blob name, populated when the corresponding stream is closed. */
+        private final Map<String, Long> files = new ConcurrentHashMap<>();
+
+        FilterBlobContainer(BlobContainer delegate) {
+            super(delegate);
+        }
+
+        /** @return the number of distinct blob names for which a read has been recorded */
+        public long totalFilesRead() {
+            return files.size();
+        }
+
+        /** @return the sum of the recorded byte counts over all blobs */
+        public long totalBytesRead() {
+            return files.values().stream().mapToLong(bytesRead -> bytesRead).sum();
+        }
+
+        /**
+         * @param name the blob name
+         * @return the byte count recorded for the blob, or {@code null} if no read was recorded for it
+         */
+        @Nullable
+        public Long totalBytesRead(String name) {
+            return files.get(name);
+        }
+
+        /**
+         * Delegates to the wrapped container and wraps the resulting stream so that the bytes
+         * actually returned by its {@code read} methods are counted.
+         */
+        @Override
+        public InputStream readBlob(String blobName, long position, long length) throws IOException {
+            return new FilterInputStream(super.readBlob(blobName, position, length)) {
+                long bytesRead = 0L;
+
+                @Override
+                public int read() throws IOException {
+                    final int result = in.read();
+                    if (result == -1) {
+                        return result;
+                    }
+                    bytesRead += 1L;
+                    return result;
+                }
+
+                @Override
+                public int read(byte[] b, int offset, int len) throws IOException {
+                    final int result = in.read(b, offset, len);
+                    if (result == -1) {
+                        return result;
+                    }
+                    // Count what was actually read: the stream may return fewer bytes than requested
+                    bytesRead += result;
+                    return result;
+                }
+
+                @Override
+                public void close() throws IOException {
+                    // Publish the count on close; overwrites any earlier entry for the same blob name
+                    files.put(blobName, bytesRead);
+                    super.close();
+                }
+            };
+        }
+    }
+}

+ 4 - 2
x-pack/plugin/searchable-snapshots/src/test/java/org/elasticsearch/index/store/cache/CachedBlobContainerIndexInputTests.java

@@ -87,7 +87,8 @@ public class CachedBlobContainerIndexInputTests extends ESIndexInputTestCase {
                         Settings.builder().put(SNAPSHOT_CACHE_ENABLED_SETTING.getKey(), true).build(),
                         Settings.builder().put(SNAPSHOT_CACHE_ENABLED_SETTING.getKey(), true).build(),
                         () -> 0L,
                         () -> 0L,
                         cacheService,
                         cacheService,
-                        cacheDir
+                        cacheDir,
+                        null
                     )
                     )
                 ) {
                 ) {
                     final boolean loaded = directory.loadSnapshot();
                     final boolean loaded = directory.loadSnapshot();
@@ -156,7 +157,8 @@ public class CachedBlobContainerIndexInputTests extends ESIndexInputTestCase {
                     Settings.EMPTY,
                     Settings.EMPTY,
                     () -> 0L,
                     () -> 0L,
                     cacheService,
                     cacheService,
-                    cacheDir
+                    cacheDir,
+                    null
                 )
                 )
             ) {
             ) {
                 final boolean loaded = searchableSnapshotDirectory.loadSnapshot();
                 final boolean loaded = searchableSnapshotDirectory.loadSnapshot();

+ 3 - 0
x-pack/plugin/searchable-snapshots/src/test/java/org/elasticsearch/xpack/searchablesnapshots/SearchableSnapshotsIntegTests.java

@@ -126,6 +126,9 @@ public class SearchableSnapshotsIntegTests extends BaseSearchableSnapshotsIntegT
         Settings.Builder indexSettingsBuilder = Settings.builder()
         Settings.Builder indexSettingsBuilder = Settings.builder()
             .put(SearchableSnapshots.SNAPSHOT_CACHE_ENABLED_SETTING.getKey(), cacheEnabled)
             .put(SearchableSnapshots.SNAPSHOT_CACHE_ENABLED_SETTING.getKey(), cacheEnabled)
             .put(IndexSettings.INDEX_CHECK_ON_STARTUP.getKey(), Boolean.FALSE.toString());
             .put(IndexSettings.INDEX_CHECK_ON_STARTUP.getKey(), Boolean.FALSE.toString());
+        if (cacheEnabled) {
+            indexSettingsBuilder.put(SearchableSnapshots.SNAPSHOT_CACHE_PREWARM_ENABLED_SETTING.getKey(), randomBoolean());
+        }
         final List<String> nonCachedExtensions;
         final List<String> nonCachedExtensions;
         if (randomBoolean()) {
         if (randomBoolean()) {
             nonCachedExtensions = randomSubsetOf(Arrays.asList("fdt", "fdx", "nvd", "dvd", "tip", "cfs", "dim"));
             nonCachedExtensions = randomSubsetOf(Arrays.asList("fdt", "fdx", "nvd", "dvd", "tip", "cfs", "dim"));